Re: [Pytables-users] Problems with flush(): RuntimeError: dictionary changed size during iteration
Alan,

Unfortunately, the underlying HDF5 library isn't thread-safe by default. It can be built in a thread-safe mode that serializes all API calls, but still doesn't allow actual parallel access to the disk. See [1] for more details. Here's [2] another interesting discussion concerning whether multithreaded access is actually beneficial for an I/O limited library like HDF5. Ultimately, if one thread can read at the disk's maximum transfer rate, then multiple threads don't provide any benefit.

Beyond the limitations of HDF5, PyTables also maintains global state in various module-level variables. One example is the _open_file cache in the file.py module. I made an attempt in the past to work around this to allow read-only access from multiple threads, but didn't make much progress.

In general, I think your best bet is to serialize all access through a single process. There is another example in the PyTables/examples directory that benchmarks different methods of transferring data from PyTables to another process [3]. It compares Python's multiprocessing.Queue, sockets, and memory-mapped files. In my testing, the latter two are 5-10x faster than using a queue.

Another option would be to use multiple threads, but handle all access to the HDF5 file in one thread. PyTables will release the GIL when making HDF5 library calls, so the other threads will be able to run. You could use a Queue.Queue or some other mechanism to transfer data between threads. No actual copying would be needed since their memory is shared, which should make it faster than the multi-process techniques.

Hope that helps.

Josh Ayers

[1]: http://www.hdfgroup.org/hdf5-quest.html#mthread
[2]: https://visitbugs.ornl.gov/projects/8/wiki/Multi-threaded_cores_and_HPC-HDF5
[3]: https://github.com/PyTables/PyTables/blob/develop/examples/multiprocess_access_benchmarks.py

On Wed, Dec 5, 2012 at 2:24 PM, Alan Marchiori wrote:
> I am trying to allow multiple threads read/write access to pytables data
> and found it is necessary to call flush() before any read. If not, the
> latest data is not returned. However, this can cause a RuntimeError. I
> have tried protecting pytables access with both locks and queues as done by
> joshayers (
> https://github.com/PyTables/PyTables/blob/develop/examples/multiprocess_access_queues.py).
> In either case I still get RuntimeError: dictionary changed size during
> iteration when doing the flush. (Incidentally, using the locks appears to
> be much faster than using queues in my unscientific tests...)
>
> I have tried versions 2.4 and 2.3.1 with the same results. Interestingly,
> this only appears to happen if there are multiple tables/groups in the H5
> file. To investigate this behavior further I created a test program to
> illustrate (below). When run with num_groups = 5 and num_tables = 5 (or
> greater) I see the runtime error every time. When these values are smaller
> than this, it doesn't occur (at least in a short test period).
>
> I might be doing something unexpected with pytables, but this seems pretty
> straightforward to me. Any help is appreciated.
>
> import tables
> import threading
> import random
> import time
>
> lock = threading.Lock()
> def synchronized(lock):
>     def wrap(f):
>         def newFunction(*args, **kw):
>             lock.acquire()
>             try:
>                 return f(*args, **kw)
>             finally:
>                 lock.release()
>         return newFunction
>     return wrap
>
> class TableValue(tables.IsDescription):
>     a = tables.Int64Col(pos=1)
>     b = tables.UInt32Col(pos=2)
>
> class Test():
>     def __init__(self):
>         self.h5 = None
>         self.h5 = tables.openFile('/data/test.h5', mode='w')
>         self.num_groups = 5
>         self.num_tables = 5
>
>         self.groups = [self.h5.createGroup('/', "group%d"%i)
>                        for i in range(self.num_groups)]
>         self.tables = []
>         for group in self.groups:
>             tbls = [self.h5.createTable(group, 'table%d'%i, TableValue)
>                     for i in range(self.num_tables)]
>             self.tables.append(tbls)
>             for table in tbls:
>                 table.cols.a.createIndex()
>
>         self.stats = {'read': 0,
>                       'write': 0}
>
>     @synchronized(lock)
>     def __del__(self):
>         if self.h5 != None:
>             self.h5.close()
>             self.h5 = None
>
>     @synchronized(lock)
>     def write(self):
>         x = self.tables[random.randint(0, self.num_groups-1)][random.randint(0, self.num_tables-1)].row
>         x['a'] = random.randint(0, 100)
>         x['b'] = random.randint(0, 100)
>         x.append()
>         self.stats['write'] += 1
>
>     @synchronized(lock)
>     def read(self):
>         # flush so we can query latest data
>         self.h5.flush()
>
>         table = self.tables[random.randint(0, self.num_groups-1)][random.randint(0, self.num_tables-1)]
>         # do some query
>         results = table.readWhere(
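
As a rough illustration of the "handle all access to the HDF5 file in one thread" suggestion in the reply above, here is a minimal Python 3 sketch that funnels every operation through a one-worker concurrent.futures.ThreadPoolExecutor, which is one possible "other mechanism" besides a hand-written queue loop. FakeStore and SerializedStore are hypothetical names and FakeStore is only a stand-in for an open HDF5 file; nothing here calls PyTables. The sketch shows the serialization pattern only and does not by itself address the flush() error.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


class FakeStore:
    """Hypothetical stand-in for an open HDF5 file (not a PyTables class)."""

    def __init__(self):
        self.rows = []
        self.thread_names = set()  # records which threads touched the store

    def append(self, value):
        self.thread_names.add(threading.current_thread().name)
        self.rows.append(value)

    def query(self, threshold):
        self.thread_names.add(threading.current_thread().name)
        return [v for v in self.rows if v > threshold]


class SerializedStore:
    """Runs every store operation on one dedicated worker thread."""

    def __init__(self):
        self._store = FakeStore()
        # With max_workers=1, submitted calls run one at a time, in
        # submission order, always on the same worker thread.
        self._io = ThreadPoolExecutor(max_workers=1, thread_name_prefix="io")

    def append(self, value):
        # .result() blocks the calling thread until the worker is done.
        return self._io.submit(self._store.append, value).result()

    def query(self, threshold):
        return self._io.submit(self._store.query, threshold).result()

    def close(self):
        self._io.shutdown(wait=True)


def demo():
    """Append from 20 caller threads, then query from the main thread."""
    store = SerializedStore()
    callers = [threading.Thread(target=store.append, args=(i,))
               for i in range(20)]
    for t in callers:
        t.start()
    for t in callers:
        t.join()
    hits = store.query(9)
    names = set(store._store.thread_names)
    store.close()
    return sorted(hits), names
```

Calling demo() returns the values greater than 9 together with the set of thread names that actually touched the store; because the executor has a single worker, that set should contain exactly one name no matter how many caller threads were used.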
Re: [Pytables-users] Problems with flush(): RuntimeError: dictionary changed size during iteration
Josh,

Thanks for the detailed response. I would like to avoid going through a separate process if at all possible due to the performance penalty. I have also tried your last suggestion to create a dedicated pytables thread and send everything through that, but I still see the same problem (RuntimeError in flush). This leads me to believe something strange is going on behind the scenes.

Updated test program with a dedicated pytables thread reading an input Queue.Queue:

import tables
import threading
import random
import time
import Queue

# a simple table
class TableValue(tables.IsDescription):
    a = tables.Int64Col(pos=1)
    b = tables.UInt32Col(pos=2)

class TablesThread(threading.Thread):
    def __init__(self):
        threading.Thread.__init__(self)
        self.name = 'HDF5 io thread'
        # create the dummy HDF5 file
        self.h5 = None
        self.h5 = tables.openFile('/data/test.h5', mode='w')
        self.num_groups = 5
        self.num_tables = 5
        self.groups = [self.h5.createGroup('/', "group%d"%i)
                       for i in range(self.num_groups)]
        self.tables = []
        for group in self.groups:
            tbls = [self.h5.createTable(group, 'table%d'%i, TableValue)
                    for i in range(self.num_tables)]
            self.tables.append(tbls)
            for table in tbls:
                # add an index for good measure
                table.cols.a.createIndex()
        self.stopEvt = threading.Event()
        self.stoppedEvt = threading.Event()
        self.inputQ = Queue.Queue()

    def run(self):
        try:
            while not self.stopEvt.is_set():
                # get a command
                try:
                    cmd, args, result = self.inputQ.get(timeout = 0.5)
                except Queue.Empty:
                    # poll stopEvt so we can shutdown
                    continue

                # do the command
                if cmd == 'write':
                    x = self.tables[args[0]][args[1]].row
                    x['a'] = args[2]
                    x['b'] = args[3]
                    x.append()
                elif cmd == 'read':
                    self.h5.flush()
                    table = self.tables[args[0]][args[1]]
                    result.value = table.readWhere('a > %d'%(args[2]))
                else:
                    raise Exception("Command not supported: %s"%(cmd,))

                # signal that the result is ready
                result.event.set()

        finally:
            # shutdown
            self.h5.close()
            self.stoppedEvt.set()

    def stop(self):
        if not self.stoppedEvt.is_set():
            self.stopEvt.set()
            self.stoppedEvt.wait()

class ResultEvent():
    def __init__(self):
        self.event = threading.Event()
        self.value = None

class Test():
    def __init__(self):
        self.tables = TablesThread()
        self.tables.start()
        self.timeout = 5
        self.stats = {'read': 0,
                      'write': 0,
                      'read_error': 0,
                      'write_error': 0}

    def write(self):
        r = ResultEvent()
        self.tables.inputQ.put(('write',
                                (random.randint(0, self.tables.num_groups-1),
                                 random.randint(0, self.tables.num_tables-1),
                                 random.randint(0, 100),
                                 random.randint(0, 100)),
                                r))
        r.event.wait(timeout = self.timeout)
        if r.event.is_set():
            self.stats['write'] += 1
        else:
            self.stats['write_error'] += 1

    def read(self):
        r = ResultEvent()
        self.tables.inputQ.put(('read',
                                (random.randint(0, self.tables.num_groups-1),
                                 random.randint(0, self.tables.num_tables-1),
                                 random.randint(0, 100)),
                                r))
        r.event.wait(timeout = self.timeout)
        if r.event.is_set():
            self.stats['read'] += 1
            #print 'Query got %d hits'%(len(r.value))
        else:
            self.stats['read_error'] += 1

    def close(self):
        self.tables.stop()

    def __del__(self):
        self.close()

class Worker(threading.Thread):
    def __init__(self, method):
        threading.Thread.__init__(self)
        self.method = method
        self.stopEvt = threading.Event()

    def run(self):
        while not self.stopEvt.is_set():
            try:
                self.method()
            except Exception, x:
                print 'Worker thread failed with: %s'%(x,)
            time.sleep(random.random()/100.0)

    def stop(self):
        self.stopEvt.set()

def main():
    t = Test()
    threads = [Worker(t.write) for _i in range(10)]
    threads.extend([Worker(t.read) for _i in range(10)])
    for thread in threads:
        thread.start()
    time.sleep(5)
    for thread in threads:
Re: [Pytables-users] Problems with flush(): RuntimeError: dictionary changed size during iteration
I'm continuing to fight this error. As a sanity check I rewrote my sample app to use a single thread only. With interleaved reads and writes to multiple tables I still get "RuntimeError: dictionary changed size during iteration" in flush. I still think there is some underlying problem, or something I don't understand about pytables/hdf5. I'm far from an expert on either of these, so I appreciate any suggestions, or even confirmation that I'm not completely crazy. The following code should work, right?

import tables
import random
import datetime

# a simple table
class TableValue(tables.IsDescription):
    a = tables.Int64Col(pos=1)
    b = tables.UInt32Col(pos=2)

class Test():
    def __init__(self):
        self.stats = {'read': 0,
                      'write': 0,
                      'read_error': 0,
                      'write_error': 0}
        self.h5 = None
        self.h5 = tables.openFile('/data/test.h5', mode='w')
        self.num_groups = 5
        self.num_tables = 5
        # create num_groups
        self.groups = [self.h5.createGroup('/', "group%d"%i)
                       for i in range(self.num_groups)]
        self.tables = []
        # create num_tables in each group we just created
        for group in self.groups:
            tbls = [self.h5.createTable(group, 'table%d'%i, TableValue)
                    for i in range(self.num_tables)]
            self.tables.append(tbls)
            for table in tbls:
                # add an index for good measure
                table.cols.a.createIndex()

    def write(self):
        # select a random table and write to it
        x = self.tables[random.randint(0, self.num_groups-1)][random.randint(0, self.num_tables-1)].row
        x['a'] = random.randint(0, 100)
        x['b'] = random.randint(0, 100)
        x.append()
        self.stats['write'] += 1

    def read(self):
        # first flush any cached data
        self.h5.flush()
        # then select a random table
        table = self.tables[random.randint(0, self.num_groups-1)][random.randint(0, self.num_tables-1)]
        # and do some random query
        table.readWhere('a > %d'%(random.randint(0, 100)))
        self.stats['read'] += 1

    def close(self):
        self.h5.close()

def main():
    t = Test()

    start = datetime.datetime.now()

    # run for 10 seconds
    while (datetime.datetime.now() - start < datetime.timedelta(seconds=10)):
        # randomly do a read or a write
        if random.random() > 0.5:
            t.write()
        else:
            t.read()

    print t.stats
    print "Done"
    t.close()

if __name__ == "__main__":
    main()
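
For readers unfamiliar with the error message itself, the following Python 3 sketch reproduces "dictionary changed size during iteration" without any threads and without PyTables. It is general Python behavior, not a diagnosis of File.flush(): the registry dict and both function names are hypothetical, and the "visiting a node registers another one" side effect is invented purely to trigger the error. The second function shows the usual remedy of iterating over a snapshot of the keys.

```python
def flush_all_unsafe(registry):
    """Iterate the live dict while the loop body adds keys to it (fails)."""
    for name in registry:
        # Hypothetical side effect: visiting one entry registers another.
        registry[name + "/child"] = object()


def flush_all_snapshot(registry):
    """Iterate a snapshot of the keys, so the dict may change safely."""
    visited = []
    for name in list(registry):  # list() copies the keys up front
        registry[name + "/child"] = object()
        visited.append(name)
    return visited


def demo():
    """Return the error text from the unsafe loop and the safe loop's visits."""
    try:
        flush_all_unsafe({"/group0": object()})
    except RuntimeError as exc:
        message = str(exc)
    else:
        message = None
    visited = flush_all_snapshot({"/group0": object(), "/group1": object()})
    return message, visited
```

The point of the sketch is that this error needs no concurrency at all: a single thread that modifies a dict inside a loop over that same dict is enough, which is consistent with the single-threaded reproduction above.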
Re: [Pytables-users] Problems with flush(): RuntimeError: dictionary changed size during iteration
I think I have found a viable workaround. Previously, I was flushing the whole HDF5 file:

self.h5.flush()

By replacing this with a flush on just the table of interest:

table = self.tables[random.randint(0, self.num_groups-1)][random.randint(0, self.num_tables-1)]
table.flush()

the RuntimeError seems to be gone in all versions of my test program (both single-threaded and threaded with locks). Hope this helps someone else, and maybe eventually someone will figure out what is wrong with File.flush().
Re: [Pytables-users] Problems with flush(): RuntimeError: dictionary changed size during iteration
Alan,

I haven't found the exact problem, but it seems to have something to do with the node cache. Changing the 'NODE_CACHE_SLOTS' parameter to zero (which disables the node cache) or to a negative number (which allows the cache to grow without limit) also eliminates the problem, at least in the unthreaded version of your code. It can be set permanently in the parameters.py file, or passed as a parameter to the tables.openFile function.

I'll open an issue on github about this problem.

Thanks,
Josh
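
The two workarounds mentioned in this thread (flushing only the table of interest, and passing NODE_CACHE_SLOTS when opening the file) can be combined in a short sketch. This is an illustration under assumptions, not a confirmed fix: it assumes the PyTables 3.x spellings open_file, create_table and read_where (the 2.x names used in the thread are openFile, createTable and readWhere), and the function name append_then_query and its parameters are made up for the example. PyTables is imported inside the function so the module itself loads even where PyTables is not installed.

```python
def append_then_query(path, values, threshold, node_cache_slots=0):
    """Append values to one table, flush only that table, then query it.

    Hypothetical helper: `path` is the HDF5 file to create, `values` are
    non-negative integers, and the result is the number of rows whose
    column `a` is greater than `threshold`.
    """
    import tables  # assumption: PyTables 3.x API names

    description = {
        "a": tables.Int64Col(pos=1),
        "b": tables.UInt32Col(pos=2),
    }
    # Extra keyword arguments override entries from parameters.py;
    # 0 is the value the thread reports as disabling the node cache.
    h5 = tables.open_file(path, mode="w", NODE_CACHE_SLOTS=node_cache_slots)
    try:
        table = h5.create_table("/", "table0", description)
        row = table.row
        for value in values:
            row["a"] = value
            row["b"] = value
            row.append()
        # Flush just this table rather than calling h5.flush().
        table.flush()
        return len(table.read_where("a > %d" % threshold))
    finally:
        h5.close()
```

A call such as append_then_query("/tmp/example.h5", range(10), 4) writes ten rows and counts those with a greater than 4; the path here is only a placeholder. Either workaround alone was reported to avoid the error, so using both is belt and braces rather than a requirement.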