I'm continuing to fight this error. As a sanity check I rewrote my sample
app to use a single thread only. With interleaved reads and writes to multiple
tables I still get "RuntimeError: dictionary changed size during iteration"
in flush(). I still think there is some underlying problem, or something I
don't understand about PyTables/HDF5. I'm far from an expert on either of
these, so I'd appreciate any suggestions, or even confirmation that I'm not
completely crazy. The following code should work, right?
import tables
import random
import datetime

# a simple table
class TableValue(tables.IsDescription):
    a = tables.Int64Col(pos=1)
    b = tables.UInt32Col(pos=2)

class Test():
    def __init__(self):
        self.stats = {'read': 0,
                      'write': 0,
                      'read_error': 0,
                      'write_error': 0}
        self.h5 = None
        self.h5 = tables.openFile('/data/test.h5', mode='w')
        self.num_groups = 5
        self.num_tables = 5
        # create num_groups
        self.groups = [self.h5.createGroup('/', "group%d" % i)
                       for i in range(self.num_groups)]
        self.tables = []
        # create num_tables in each group we just created
        for group in self.groups:
            tbls = [self.h5.createTable(group, 'table%d' % i, TableValue)
                    for i in range(self.num_tables)]
            self.tables.append(tbls)
            for table in tbls:
                # add an index for good measure
                table.cols.a.createIndex()

    def write(self):
        # select a random table and write to it
        x = self.tables[random.randint(0, self.num_groups - 1)][
            random.randint(0, self.num_tables - 1)].row
        x['a'] = random.randint(0, 100)
        x['b'] = random.randint(0, 100)
        x.append()
        self.stats['write'] += 1

    def read(self):
        # first flush any cached data
        self.h5.flush()
        # then select a random table
        table = self.tables[random.randint(0, self.num_groups - 1)][
            random.randint(0, self.num_tables - 1)]
        # and do some random query
        table.readWhere('a > %d' % (random.randint(0, 100)))
        self.stats['read'] += 1

    def close(self):
        self.h5.close()

def main():
    t = Test()
    start = datetime.datetime.now()
    # run for 10 seconds
    while (datetime.datetime.now() - start <
           datetime.timedelta(seconds=10)):
        # randomly do a read or a write
        if random.random() > 0.5:
            t.write()
        else:
            t.read()
    print t.stats
    print "Done"
    t.close()

if __name__ == "__main__":
    main()
On Thu, Dec 6, 2012 at 9:55 AM, Alan Marchiori <a...@alanmarian.com> wrote:
> Josh,
>
> Thanks for the detailed response. I would like to avoid going through a
> separate process if at all possible due to the performance penalty. I have
> also tried your last suggestion to create a dedicated PyTables thread and
> send everything through that, but I still see the same problem (RuntimeError
> in flush). This leads me to believe something strange is going on behind
> the scenes.
>
> Updated test program with dedicated pytables thread reading an input
> Queue.Queue:
>
> import tables
> import threading
> import random
> import time
> import Queue
>
> # a simple table
> class TableValue(tables.IsDescription):
>     a = tables.Int64Col(pos=1)
>     b = tables.UInt32Col(pos=2)
>
> class TablesThread(threading.Thread):
>     def __init__(self):
>         threading.Thread.__init__(self)
>         self.name = 'HDF5 io thread'
>         # create the dummy HDF5 file
>         self.h5 = None
>         self.h5 = tables.openFile('/data/test.h5', mode='w')
>         self.num_groups = 5
>         self.num_tables = 5
>         self.groups = [self.h5.createGroup('/', "group%d" % i)
>                        for i in range(self.num_groups)]
>         self.tables = []
>         for group in self.groups:
>             tbls = [self.h5.createTable(group, 'table%d' % i, TableValue)
>                     for i in range(self.num_tables)]
>             self.tables.append(tbls)
>             for table in tbls:
>                 # add an index for good measure
>                 table.cols.a.createIndex()
>         self.stopEvt = threading.Event()
>         self.stoppedEvt = threading.Event()
>         self.inputQ = Queue.Queue()
>
>     def run(self):
>         try:
>             while not self.stopEvt.is_set():
>                 # get a command
>                 try:
>                     cmd, args, result = self.inputQ.get(timeout=0.5)
>                 except Queue.Empty:
>                     # poll stopEvt so we can shutdown
>                     continue
>
>                 # do the command
>                 if cmd == 'write':
>                     x = self.tables[args[0]][args[1]].row
>                     x['a'] = args[2]
>                     x['b'] = args[3]
>                     x.append()
>                 elif cmd == 'read':
>                     self.h5.flush()
>                     table = self.tables[args[0]][args[1]]
>                     result.value = table.readWhere('a > %d' % (args[2]))
>                 else:
>                     raise Exception("Command not supported: %s" % (cmd,))
>
>                 # signal that the result is ready
>                 result.event.set()
>
>         finally:
>             # shutdown
>             self.h5.close()
>             self.stoppedEvt.set()
>
>     def stop(self):
>         if not self.stoppedEvt.is_set():
>             self.stopEvt.set()
>             self.stoppedEvt.wait()
>
> class ResultEvent():
>     def __init__(self):
>         self.event = threading.Event()
>         self.value = None
>
> class Test():
>     def __init__(self):
>         self.tables = TablesThread()
>         self.tables.start()
>         self.timeout = 5
>         self.stats = {'read': 0,
>                       'write': 0,
>                       'read_error': 0,
>                       'write_error': 0}
>
>     def write(self):
>         r = ResultEvent()
>         self.tables.inputQ.put(('write',
>                                 (random.randint(0, self.tables.num_groups - 1),
>                                  random.randint(0, self.tables.num_tables - 1),
>                                  random.randint(0, 100),
>                                  random.randint(0, 100)),
>                                 r))
>         r.event.wait(timeout=self.timeout)
>         if r.event.is_set():
>             self.stats['write'] += 1
>         else:
>             self.stats['write_error'] += 1
>
>     def read(self):
>         r = ResultEvent()
>         self.tables.inputQ.put(('read',
>                                 (random.randint(0, self.tables.num_groups - 1),
>                                  random.randint(0, self.tables.num_tables - 1),
>                                  random.randint(0, 100)),
>                                 r))
>         r.event.wait(timeout=self.timeout)
>         if r.event.is_set():
>             self.stats['read'] += 1
>             #print 'Query got %d hits' % (len(r.value))
>         else:
>             self.stats['read_error'] += 1
>
>     def close(self):
>         self.tables.stop()
>
>     def __del__(self):
>         self.close()
>
> class Worker(threading.Thread):
>     def __init__(self, method):
>         threading.Thread.__init__(self)
>         self.method = method
>         self.stopEvt = threading.Event()
>
>     def run(self):
>         while not self.stopEvt.is_set():
>             try:
>                 self.method()
>             except Exception, x:
>                 print 'Worker thread failed with: %s' % (x,)
>             time.sleep(random.random() / 100.0)
>
>     def stop(self):
>         self.stopEvt.set()
>
> def main():
>     t = Test()
>
>     threads = [Worker(t.write) for _i in range(10)]
>     threads.extend([Worker(t.read) for _i in range(10)])
>
>     for thread in threads:
>         thread.start()
>
>     time.sleep(5)
>
>     for thread in threads:
>         thread.stop()
>
>     for thread in threads:
>         thread.join()
>
>     t.close()
>
>     print t.stats
>
> if __name__ == "__main__":
>     main()
>
>
> On Wed, Dec 5, 2012 at 10:52 PM, Josh Ayers <josh.ay...@gmail.com> wrote:
>
>> Alan,
>>
>> Unfortunately, the underlying HDF5 library isn't thread-safe by default.
>> It can be built in a thread-safe mode that serializes all API calls, but
>> still doesn't allow actual parallel access to the disk. See [1] for more
>> details. Here's [2] another interesting discussion concerning whether
>> multithreaded access is actually beneficial for an I/O limited library like
>> HDF5. Ultimately, if one thread can read at the disk's maximum transfer
>> rate, then multiple threads don't provide any benefit.
>>
>> Beyond the limitations of HDF5, PyTables also maintains global state in
>> various module-level variables. One example is the _open_file cache in the
>> file.py module. I made an attempt in the past to work around this to allow
>> read-only access from multiple threads, but didn't make much progress.
>>
>> In general, I think your best bet is to serialize all access through a
>> single process. There is another example in the PyTables/examples
>> directory that benchmarks different methods of transferring data from
>> PyTables to another process [3]. It compares Python's
>> multiprocessing.Queue, sockets, and memory-mapped files. In my testing,
>> the latter two are 5-10x faster than using a queue.
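>>
>> A minimal sketch of that single-process pattern, using a
>> multiprocessing.Queue to carry the commands (the file name, the 'data'
>> table, and the TableValue description below are just placeholders, not
>> anything from your code):
>>
>> import multiprocessing
>> import tables
>>
>> class TableValue(tables.IsDescription):
>>     a = tables.Int64Col(pos=1)
>>     b = tables.UInt32Col(pos=2)
>>
>> def hdf5_worker(cmd_q, result_q):
>>     # only this process ever touches the HDF5 file
>>     h5 = tables.openFile('/data/test_mp.h5', mode='w')
>>     table = h5.createTable('/', 'data', TableValue)
>>     while True:
>>         cmd, args = cmd_q.get()
>>         if cmd == 'stop':
>>             break
>>         elif cmd == 'write':
>>             table.append([args])      # args is an (a, b) tuple
>>         elif cmd == 'read':
>>             table.flush()             # make pending rows visible to the query
>>             result_q.put(table.readWhere('a > %d' % args))
>>     h5.close()
>>
>> if __name__ == '__main__':
>>     cmd_q = multiprocessing.Queue()
>>     result_q = multiprocessing.Queue()
>>     worker = multiprocessing.Process(target=hdf5_worker,
>>                                      args=(cmd_q, result_q))
>>     worker.start()
>>     cmd_q.put(('write', (60, 2)))
>>     cmd_q.put(('read', 50))
>>     print result_q.get()              # rows with a > 50
>>     cmd_q.put(('stop', None))
>>     worker.join()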
>>
>> Another option would be to use multiple threads, but handle all access to
>> the HDF5 file in one thread. PyTables will release the GIL when making
>> HDF5 library calls, so the other threads will be able to run. You could
>> use a Queue.Queue or some other mechanism to transfer data between
>> threads. No actual copying would be needed since their memory is shared,
>> which should make it faster than the multi-process techniques.
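>>
>> And a bare-bones sketch of that dedicated-thread variant (again with
>> placeholder names; the only point is that a single thread owns the file
>> and everything else just posts requests to it):
>>
>> import threading
>> import Queue
>> import tables
>>
>> class TableValue(tables.IsDescription):
>>     a = tables.Int64Col(pos=1)
>>     b = tables.UInt32Col(pos=2)
>>
>> requests = Queue.Queue()
>>
>> def hdf5_thread():
>>     # only this thread ever calls into PyTables/HDF5
>>     h5 = tables.openFile('/data/test_thread.h5', mode='w')
>>     table = h5.createTable('/', 'data', TableValue)
>>     while True:
>>         cmd, args, reply = requests.get()
>>         if cmd == 'stop':
>>             break
>>         elif cmd == 'write':
>>             row = table.row
>>             row['a'], row['b'] = args
>>             row.append()
>>         elif cmd == 'read':
>>             table.flush()
>>             reply.put(table.readWhere('a > %d' % args))
>>     h5.close()
>>
>> worker = threading.Thread(target=hdf5_thread)
>> worker.start()
>> requests.put(('write', (60, 2), None))
>> reply = Queue.Queue()
>> requests.put(('read', 50, reply))
>> print len(reply.get())                # number of rows with a > 50
>> requests.put(('stop', None, None))
>> worker.join()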
>>
>> Hope that helps.
>>
>> Josh Ayers
>>
>>
>> [1]: http://www.hdfgroup.org/hdf5-quest.html#mthread
>>
>> [2]:
>> https://visitbugs.ornl.gov/projects/8/wiki/Multi-threaded_cores_and_HPC-HDF5
>>
>> [3]:
>> https://github.com/PyTables/PyTables/blob/develop/examples/multiprocess_access_benchmarks.py
>>
>>
>> On Wed, Dec 5, 2012 at 2:24 PM, Alan Marchiori <a...@alanmarian.com>wrote:
>>
>>> I am trying to allow multiple threads read/write access to pytables data
>>> and found it is necessary to call flush() before any read. If not, the
>>> latest data is not returned. However, this can cause a RuntimeError. I
>>> have tried protecting pytables access with both locks and queues as done by
>>> joshayers (
>>> https://github.com/PyTables/PyTables/blob/develop/examples/multiprocess_access_queues.py).
>>> In either case I still get "RuntimeError: dictionary changed size during
>>> iteration" when doing the flush. (Incidentally, using locks appears to
>>> be much faster than using queues in my unscientific tests...)
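>>>
>>> (For completeness, the lock variant is roughly the sketch below: every
>>> PyTables call is wrapped in one shared threading.Lock, and it assumes the
>>> group0/table0 layout created by the test program.)
>>>
>>> import threading
>>> import tables
>>>
>>> h5_lock = threading.Lock()
>>> h5 = tables.openFile('/data/test.h5', mode='a')
>>> table = h5.root.group0.table0
>>>
>>> def locked_write(a, b):
>>>     # writers and readers share the same lock
>>>     with h5_lock:
>>>         row = table.row
>>>         row['a'] = a
>>>         row['b'] = b
>>>         row.append()
>>>
>>> def locked_read(limit):
>>>     with h5_lock:
>>>         h5.flush()                    # flush so the query sees fresh rows
>>>         return table.readWhere('a > %d' % limit)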
>>>
>>> I have tried versions 2.4 and 2.3.1 with the same results.
>>> Interestingly, this only appears to happen if there are multiple
>>> tables/groups in the H5 file. To investigate this behavior further I
>>> created a test program to illustrate (below). When run with num_groups = 5
>>> and num_tables = 5 (or greater) I see the runtime error every time. When
>>> these values are smaller it doesn't happen (at least in a short test
>>> period).
>>>
>>> I might be doing something unexpected with PyTables, but this seems
>>> pretty straightforward to me. Any help is appreciated.
>>>
>>>
>>>