I'm still fighting this error.  As a sanity check I rewrote my sample
app to use a single thread only.  With interleaved reads and writes to
multiple tables I still get "RuntimeError: dictionary changed size during
iteration" in flush.  I still think there is some underlying problem, or
something I don't understand about pytables/hdf5.  I'm far from an expert on
either of these, so I would appreciate any suggestions, or even confirmation
that I'm not completely crazy.  The following code should work, right?

import tables
import random
import datetime

# a simple table
class TableValue(tables.IsDescription):
    a = tables.Int64Col(pos=1)
    b = tables.UInt32Col(pos=2)

class Test():
    def __init__(self):
        self.stats = {'read': 0,
                      'write': 0,
                      'read_error': 0,
                      'write_error': 0}
        self.h5 = None
        self.h5 = tables.openFile('/data/test.h5', mode='w')
        self.num_groups = 5
        self.num_tables = 5
        # create num_groups
        self.groups = [self.h5.createGroup('/', "group%d"%i)
                       for i in range(self.num_groups)]
        self.tables = []
        # create num_tables in each group we just created
        for group in self.groups:
            tbls = [self.h5.createTable(group, 'table%d'%i, TableValue)
                    for i in range(self.num_tables)]
            self.tables.append (tbls)
            for table in tbls:
                # add an index for good measure
                table.cols.a.createIndex()

    def write(self):
        # select a random table and write to it
        x = self.tables[random.randint(0, self.num_groups-1)][
            random.randint(0, self.num_tables-1)].row
        x['a'] = random.randint(0, 100)
        x['b'] = random.randint(0, 100)
        x.append()
        self.stats['write'] += 1

    def read(self):
        # first flush any cached data
        self.h5.flush()
        # then select a random table
        table = self.tables[random.randint(0, self.num_groups-1)][
            random.randint(0, self.num_tables-1)]
        # and do some random query
        table.readWhere('a > %d'%(random.randint(0, 100)))
        self.stats['read'] += 1

    def close(self):
        self.h5.close()

def main():
    t = Test()

    start = datetime.datetime.now()

    # run for 10 seconds
    while (datetime.datetime.now() - start <
           datetime.timedelta(seconds=10)):
        # randomly do a read or a write
        if random.random() > 0.5:
            t.write()
        else:
            t.read()

    print t.stats
    print "Done"
    t.close()

if __name__ == "__main__":
    main()
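
For reference, the exception itself is ordinary Python behaviour: it is
raised whenever a dict gains or loses keys while something is iterating over
it.  The sketch below reproduces the same message without PyTables.  The
function names are made up for illustration, and it is only an assumption
that something similar happens inside flush; nothing here is taken from the
PyTables source.

```python
def grow_while_iterating(d):
    """Add a key to d while looping over it; return the error message raised."""
    try:
        for key in d:
            # Inserting a new key changes the dict's size mid-iteration.
            d[str(key) + '_copy'] = d[key]
    except RuntimeError as err:
        return str(err)
    return None


def grow_over_snapshot(d):
    """Same loop, but iterate over a snapshot of the keys taken up front."""
    for key in list(d):
        d[str(key) + '_copy'] = d[key]
    return d


message = grow_while_iterating({'a': 1})
# message is 'dictionary changed size during iteration'
snapshot_result = grow_over_snapshot({'a': 1})
# snapshot_result is {'a': 1, 'a_copy': 1}
```

Iterating over list(d) is the usual way to make such a loop safe, because
the loop no longer looks at the live dict while it is being modified.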

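One experiment that might narrow this down is to flush only the table about
to be queried (Table.flush()) instead of the whole file (File.flush()).  The
sketch below is untested against a real HDF5 file, and whether it avoids the
error is not known.  flush_then_query is a hypothetical helper, and FakeTable
is a stand-in written only so the sketch runs on its own; it is not a
PyTables class.

```python
def flush_then_query(table, condition):
    """Flush only this table's buffered rows, then run the query on it."""
    table.flush()                      # per-table flush, not File.flush()
    return table.readWhere(condition)  # PyTables 2.x spelling, as used above


class FakeTable(object):
    """Stand-in with the two methods used above; NOT a PyTables class."""

    def __init__(self):
        self.buffered = []  # rows appended but not yet flushed
        self.stored = []    # rows visible to queries

    def append(self, a):
        self.buffered.append(a)

    def flush(self):
        self.stored.extend(self.buffered)
        self.buffered = []

    def readWhere(self, condition):
        # Only understands conditions of the form 'a > N'.
        threshold = int(condition.split('>')[1])
        return [a for a in self.stored if a > threshold]


fake = FakeTable()
fake.append(5)
fake.append(50)
hits = flush_then_query(fake, 'a > %d' % 10)
```

In the test program this would mean dropping self.h5.flush() from read() and
calling flush_then_query(table, ...) on the selected table.  If the error
goes away, that would point at the file-level flush rather than the query.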

On Thu, Dec 6, 2012 at 9:55 AM, Alan Marchiori <a...@alanmarian.com> wrote:

> Josh,
>
> Thanks for the detailed response.  I would like to avoid going through a
> separate process if at all possible due to the performance penalty.  I have
> also tried your last suggestion to create a dedicated pytables thread and
> send everything through that, but I still see the same problem (RuntimeError
> in flush).  This leads me to believe something strange is going on behind
> the scenes.
>
> Updated test program with dedicated pytables thread reading an input
> Queue.Queue:
>
> import tables
> import threading
> import random
> import time
> import Queue
>
> # a simple table
> class TableValue(tables.IsDescription):
>     a = tables.Int64Col(pos=1)
>     b = tables.UInt32Col(pos=2)
>
> class TablesThread(threading.Thread):
>     def __init__(self):
>         threading.Thread.__init__(self)
>         self.name = 'HDF5 io thread'
>         # create the dummy HDF5 file
>         self.h5 = None
>         self.h5 = tables.openFile('/data/test.h5', mode='w')
>         self.num_groups = 5
>         self.num_tables = 5
>         self.groups = [self.h5.createGroup('/', "group%d"%i)
>                        for i in range(self.num_groups)]
>         self.tables = []
>         for group in self.groups:
>             tbls = [self.h5.createTable(group, 'table%d'%i, TableValue)
>                     for i in range(self.num_tables)]
>             self.tables.append (tbls)
>             for table in tbls:
>                 # add an index for good measure
>                 table.cols.a.createIndex()
>         self.stopEvt = threading.Event()
>         self.stoppedEvt = threading.Event()
>         self.inputQ = Queue.Queue()
>
>     def run(self):
>         try:
>             while not self.stopEvt.is_set():
>                 # get a command
>                 try:
>                     cmd, args, result = self.inputQ.get(timeout = 0.5)
>                 except Queue.Empty:
>                     # poll stopEvt so we can shutdown
>                     continue
>
>                 # do the command
>                 if cmd == 'write':
>                     x = self.tables[args[0]][args[1]].row
>                     x['a'] = args[2]
>                     x['b'] = args[3]
>                     x.append()
>                 elif cmd == 'read':
>                     self.h5.flush()
>                     table = self.tables[args[0]][args[1]]
>                     result.value = table.readWhere('a > %d'%(args[2]))
>                 else:
>                     raise Exception("Command not supported: %s"%(cmd,))
>
>                 # signal that the result is ready
>                 result.event.set()
>
>         finally:
>             # shutdown
>             self.h5.close()
>             self.stoppedEvt.set()
>
>     def stop(self):
>         if not self.stoppedEvt.is_set():
>             self.stopEvt.set()
>             self.stoppedEvt.wait()
>
> class ResultEvent():
>     def __init__(self):
>         self.event = threading.Event()
>         self.value = None
>
> class Test():
>     def __init__(self):
>         self.tables = TablesThread()
>         self.tables.start()
>         self.timeout = 5
>         self.stats = {'read': 0,
>                       'write': 0,
>                       'read_error': 0,
>                       'write_error': 0}
>
>     def write(self):
>         r = ResultEvent()
>         self.tables.inputQ.put(('write',
>                                 (random.randint(0, self.tables.num_groups-1),
>                                  random.randint(0, self.tables.num_tables-1),
>                                  random.randint(0, 100),
>                                  random.randint(0, 100)),
>                                 r))
>         r.event.wait(timeout = self.timeout)
>         if r.event.is_set():
>             self.stats['write'] += 1
>         else:
>             self.stats['write_error'] += 1
>
>     def read(self):
>         r = ResultEvent()
>         self.tables.inputQ.put(('read',
>                                 (random.randint(0, self.tables.num_groups-1),
>                                  random.randint(0, self.tables.num_tables-1),
>                                  random.randint(0, 100)),
>                                 r))
>         r.event.wait(timeout = self.timeout)
>         if r.event.is_set():
>             self.stats['read'] += 1
>             #print 'Query got %d hits'%(len(r.value))
>         else:
>             self.stats['read_error'] += 1
>
>
>     def close(self):
>         self.tables.stop()
>
>     def __del__(self):
>         self.close()
>
> class Worker(threading.Thread):
>     def __init__(self, method):
>         threading.Thread.__init__(self)
>         self.method = method
>         self.stopEvt = threading.Event()
>
>     def run(self):
>         while not self.stopEvt.is_set():
>             try:
>                 self.method()
>             except Exception, x:
>                 print 'Worker thread failed with: %s'%(x,)
>             time.sleep(random.random()/100.0)
>
>     def stop(self):
>         self.stopEvt.set()
>
> def main():
>     t = Test()
>
>     threads = [Worker(t.write) for _i in range(10)]
>     threads.extend([Worker(t.read) for _i in range(10)])
>
>     for thread in threads:
>         thread.start()
>
>     time.sleep(5)
>
>     for thread in threads:
>         thread.stop()
>
>     for thread in threads:
>         thread.join()
>
>     t.close()
>
>     print t.stats
>
> if __name__ == "__main__":
>     main()
>
>
> On Wed, Dec 5, 2012 at 10:52 PM, Josh Ayers <josh.ay...@gmail.com> wrote:
>
>> Alan,
>>
>> Unfortunately, the underlying HDF5 library isn't thread-safe by default.
>> It can be built in a thread-safe mode that serializes all API calls, but
>> that still doesn't allow actual parallel access to the disk.  See [1] for more
>> details.  Here's [2] another interesting discussion concerning whether
>> multithreaded access is actually beneficial for an I/O limited library like
>> HDF5.  Ultimately, if one thread can read at the disk's maximum transfer
>> rate, then multiple threads don't provide any benefit.
>>
>> Beyond the limitations of HDF5, PyTables also maintains global state in
>> various module-level variables.  One example is the _open_file cache in the
>> file.py module.  I made an attempt in the past to work around this to allow
>> read-only access from multiple threads, but didn't make much progress.
>>
>> In general, I think your best bet is to serialize all access through a
>> single process.  There is another example in the PyTables/examples
>> directory that benchmarks different methods of transferring data from
>> PyTables to another process [3].  It compares Python's
>> multiprocessing.Queue, sockets, and memory-mapped files.  In my testing,
>> the latter two are 5-10x faster than using a queue.
>>
>> Another option would be to use multiple threads, but handle all access to
>> the HDF5 file in one thread.  PyTables will release the GIL when making
>> HDF5 library calls, so the other threads will be able to run.  You could
>> use a Queue.Queue or some other mechanism to transfer data between
>> threads.  No actual copying would be needed since their memory is shared,
>> which should make it faster than the multi-process techniques.
>>
>> Hope that helps.
>>
>> Josh Ayers
>>
>>
>> [1]: http://www.hdfgroup.org/hdf5-quest.html#mthread
>>
>> [2]:
>> https://visitbugs.ornl.gov/projects/8/wiki/Multi-threaded_cores_and_HPC-HDF5
>>
>> [3]:
>> https://github.com/PyTables/PyTables/blob/develop/examples/multiprocess_access_benchmarks.py
>>
>>
>> On Wed, Dec 5, 2012 at 2:24 PM, Alan Marchiori <a...@alanmarian.com> wrote:
>>
>>> I am trying to allow multiple threads read/write access to pytables data
>>> and found it is necessary to call flush() before any read.  If not, the
>>> latest data is not returned.  However, this can cause a RuntimeError.  I
>>> have tried protecting pytables access with both locks and queues, as done by
>>> joshayers (
>>> https://github.com/PyTables/PyTables/blob/develop/examples/multiprocess_access_queues.py).
>>> In either case I still get "RuntimeError: dictionary changed size during
>>> iteration" when doing the flush.  (Incidentally, using the locks appears to
>>> be much faster than using queues in my unscientific tests...)
>>>
>>> I have tried versions 2.4 and 2.3.1 with the same results.
>>> Interestingly, this only appears to happen if there are multiple
>>> tables/groups in the H5 file.  To investigate this behavior further I
>>> created a test program to illustrate (below).  When run with num_groups =
>>> 5 and num_tables = 5 (or greater) I see the runtime error every time.  When
>>> these values are smaller the error doesn't occur (at least in a short test
>>> period).
>>>
>>> I might be doing something unexpected with pytables, but this seems
>>> pretty straightforward to me.  Any help is appreciated.
>>>
>>>
>>>