7 messages in com.googlegroups.boto-users: Re: SimpleDB concurrency
Subject: Re: SimpleDB concurrency
From: mitch (Mitc@gmail.com)
Date: Jan 21, 2008 2:36:14 pm
List: com.googlegroups.boto-users

Here's a quick little hack that seems to work:

import boto
import threading

class SimpleThread(threading.Thread):

    def __init__(self, name, domain_name, item_names):
        threading.Thread.__init__(self, name=name)
        print 'SimpleThread: %s %s' % (name, item_names)
        self.domain_name = domain_name
        self.conn = boto.connect_sdb()
        self.item_names = item_names
        self.items = []

    def run(self):
        for item_name in self.item_names:
            item = self.conn.get_attributes(self.domain_name, item_name)
            self.items.append(item)
            print 'got %s' % item.name

def get_attrs(domain_name, query, num_threads):
    item_names = []
    conn = boto.connect_sdb()
    domain = conn.get_domain(domain_name)
    rs = domain.query(query)
    for item in rs:
        item_names.append(item.name)
    print item_names
    print len(item_names)
    items = []
    threads = []
    n = len(item_names) / num_threads
    for i in range(0, num_threads+1):
        thread = SimpleThread('Thread-%d' % i, domain_name, item_names[n*i:n*(i+1)])
        threads.append(thread)
        thread.start()
    for thread in threads:
        thread.join()
        items = items + thread.items
    return items

So, you call the get_attrs function with the domain_name, a query that you want to perform against that domain (this could be an empty string if you want all items in the domain), and the number of threads you want to start. It then collects all of the item names and fires up the required number of threads, passing each thread a list of item names it needs to retrieve. Each thread then creates its own connection to SDB and gets the items associated with its item_names. Those are collected in a local list and, when the threads are joined, each thread's local list is concatenated to form a list of all items.
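One corner of the slicing above is worth a sketch: because n comes from integer division, a domain with fewer items than threads gives n = 0, and every slice comes out empty. The following is a minimal Python 3 sketch of the same fan-out pattern with an even split. The names fetch_item, split_evenly and Worker are hypothetical, and fetch_item only stands in for the real conn.get_attributes call so that the example runs without SimpleDB.

```python
import threading


def fetch_item(item_name):
    # Hypothetical stand-in for conn.get_attributes(domain_name, item_name).
    return {"name": item_name}


def split_evenly(names, num_threads):
    """Split names into at most num_threads contiguous, non-empty chunks."""
    if not names:
        return []
    num_threads = max(1, min(num_threads, len(names)))
    size, extra = divmod(len(names), num_threads)
    chunks = []
    start = 0
    for i in range(num_threads):
        # The first `extra` chunks each take one leftover item.
        end = start + size + (1 if i < extra else 0)
        chunks.append(names[start:end])
        start = end
    return chunks


class Worker(threading.Thread):
    def __init__(self, item_names):
        threading.Thread.__init__(self)
        self.item_names = item_names
        self.items = []  # written only by this thread

    def run(self):
        for name in self.item_names:
            self.items.append(fetch_item(name))


def get_attrs(item_names, num_threads):
    threads = [Worker(chunk) for chunk in split_evenly(item_names, num_threads)]
    for thread in threads:
        thread.start()
    items = []
    for thread in threads:
        thread.join()
        items.extend(thread.items)  # read only after that worker has finished
    return items
```

With this split, asking for more threads than there are items simply starts one thread per item, and the results come back in the original item order because the chunks are joined in order.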

This code is very rough and will probably break in a number of interesting ways but for my simple tests it does work and, by golly, it's pretty darn fast!

On Jan 18, 3:53 pm, "Herb@gmail.com" <Herb@googlemail.com> wrote:

Hi Sorin & List!

I've now implemented it in a thread-safe way (I believe). I'm not using the Queue, though; instead I wait for all threads to finish before I put their results in a list.

Unfortunately the script still hangs after getting the first item.

http://rafb.net/p/t3u3gi26.html

thanks, Herb

On Jan 17, 11:48 pm, "Sorin Gherman" <sor@gmail.com> wrote:

Herb, I would implement Robert's suggestion: have SimpleThread expose the result (item) through a get_item method, and after all the joins have your main thread call get_item on each thread and add each item to the global list sequentially, to eliminate any race condition on that global list. This should work just fine, unless you need each thread's result as soon as it is ready, which doesn't seem to be the case here.
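A minimal Python 3 sketch of that suggestion might look like the following. It is an illustration rather than code from the thread: fetch_item is a hypothetical stand-in for the SimpleDB lookup, and fetch_all is an assumed helper name.

```python
import threading


def fetch_item(item_name):
    # Hypothetical stand-in for the real SimpleDB lookup.
    return "item:%s" % item_name


class SimpleThread(threading.Thread):
    def __init__(self, item_name):
        threading.Thread.__init__(self, name=item_name)
        self.item_name = item_name
        self._item = None

    def run(self):
        # Each worker writes only to its own attribute, never to shared state.
        self._item = fetch_item(self.item_name)

    def get_item(self):
        return self._item


def fetch_all(item_names):
    threads = [SimpleThread(name) for name in item_names]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    # Only the main thread builds the results list, and only after all joins.
    return [thread.get_item() for thread in threads]
```

Because the list is assembled by a single thread after every join, no lock is needed around it.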

Sorin

On Jan 17, 2008 10:38 PM, Herb@gmail.com <Herb@googlemail.com> wrote:

Well, thanks to everyone's help I was actually able to implement concurrency using the threading module. Thanks Mitchell, Sorin and Robert.

Trouble is, it's a bit unreliable. Sometimes the script hangs, sometimes it just returns two results, sometimes just one. I'll post it in its entirety, so anyone who wants can work off this script. If anyone implements this reliably, please lmk.

<code>
from time import gmtime, strftime, sleep
from boto.sdb.connection import SDBConnection
import threading
import time
import random

# Start time
print 'Start %s' % strftime("%H:%M:%S ", gmtime())
print

# Connect to amazon database
conn = SDBConnection()
items = conn.get_domain('items')

# Results list
results = []

class SimpleThread(threading.Thread):
    def __init__(self, itemname):
        '''Initialize thread by setting the itemname for the aws sdb object we want to get'''
        threading.Thread.__init__(self, name=itemname)
        self.itemname = itemname
    def run(self):
        '''Get item helper'''
        global items, results
        # Get item from database
        item = items.get_item(self.itemname)
        print 'got %s' % str(item)
        # Append item to results
        results.append(item)
        time.sleep(random.random())

# Get all items
threads = []
for itemname in items:
    thread = SimpleThread(itemname)
    thread.start()
    threads.append(thread)
# Now we wait for them to finish.
for thread in threads:
    thread.join()
print "All threads finished"

# End time
print
print 'End %s' % strftime("%H:%M:%S ", gmtime())
</code>

On Jan 17, 6:14 pm, "Robert Brewer" <rwb@gmail.com> wrote:

Herb,

Some ideas: Consider the threading module instead of thread. threading has the join() function Sorin mentions. Consider the thread-safe Queue module instead of a list for your results. You are not appending items onto the list in a strictly thread-safe manner. If you use the threading module, another way to get the result is to store it as an object member (self.result?) which the main thread gets after it joins with each worker.
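As a rough illustration of the Queue idea, here is a minimal Python 3 sketch (the module is named Queue in Python 2 and queue in Python 3). The names fetch_item, worker and fetch_all are hypothetical, and fetch_item merely stands in for the SimpleDB call.

```python
import queue
import threading


def fetch_item(item_name):
    # Hypothetical stand-in for the real SimpleDB lookup.
    return item_name.upper()


def worker(item_name, results):
    # Queue.put is thread-safe, so workers can report results concurrently.
    results.put((item_name, fetch_item(item_name)))


def fetch_all(item_names):
    results = queue.Queue()
    threads = [
        threading.Thread(target=worker, args=(name, results))
        for name in item_names
    ]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    fetched = {}
    # Every worker has put exactly one result, so drain that many entries.
    for _ in item_names:
        name, item = results.get()
        fetched[name] = item
    return fetched
```

Results arrive on the queue in completion order, which is why the sketch keys them by item name instead of relying on position.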

You may need one SDBConnection per thread. I don't know.

-Rob

On Jan 17, 2008 7:37 PM, Herb@gmail.com <Herb@googlemail.com> wrote:

Hi Mitchell & the list,

I need a hand with threads. I think I've implemented it 90% of the way, but something's not quite there yet.

When I run my code, it just prints out the first item, and then hangs.

Can someone take a quick glance at this, and see what I could be doing wrong?

Thanks so much, appreciated, Herb