Re: [Pytables-users] Problems with flush(): RuntimeError: dictionary changed size during iteration

2012-12-10 Thread Alan Marchiori
I'm continuing to fight this error.  As a sanity check I rewrote my sample
app as a single thread only.  With interleaved read/writes to multiple
tables I still get RuntimeError: dictionary changed size during iteration
in flush.  I still think there is some underlying problem or something I
don't understand about pytables/hdf5.  I'm far from an expert on either of
these so I appreciate any suggestions or even confirmation that I'm not
completely crazy?  The following code should work, right?

import tables
import random
import datetime

# a simple table
class TableValue(tables.IsDescription):
    # Row layout: column 'a' (Int64) followed by column 'b' (UInt32);
    # pos= fixes the column order.
    a = tables.Int64Col(pos=1)
    b = tables.UInt32Col(pos=2)

class Test():
    """Single-threaded reproduction of the File.flush() RuntimeError.

    Creates ``num_groups`` groups under '/' in /data/test.h5, each holding
    ``num_tables`` tables of ``TableValue`` rows with an index on column
    ``a``.  The caller interleaves ``write()`` and ``read()`` calls and the
    counts are kept in ``self.stats``.
    """

    def __init__(self):
        # 'read_error' / 'write_error' are reported by main() but never
        # incremented by this program.
        self.stats = {'read': 0,
                      'write': 0,
                      'read_error': 0,
                      'write_error': 0}
        self.h5 = tables.openFile('/data/test.h5', mode='w')
        self.num_groups = 5
        self.num_tables = 5
        # create num_groups groups directly under the root
        self.groups = [self.h5.createGroup('/', 'group%d' % i)
                       for i in range(self.num_groups)]
        # self.tables[g][t] is table number t inside group number g
        self.tables = []
        for group in self.groups:
            tbls = [self.h5.createTable(group, 'table%d' % i, TableValue)
                    for i in range(self.num_tables)]
            self.tables.append(tbls)
            for table in tbls:
                # add an index for good measure
                table.cols.a.createIndex()

    def _random_table(self):
        """Return a randomly chosen table (group picked first, then table)."""
        group_tables = self.tables[random.randint(0, self.num_groups - 1)]
        return group_tables[random.randint(0, self.num_tables - 1)]

    def write(self):
        """Append one row with random ``a``/``b`` in [0, 100] to a random table.

        Appended rows are not seen by queries until a flush (see read()).
        """
        row = self._random_table().row
        row['a'] = random.randint(0, 100)
        row['b'] = random.randint(0, 100)
        row.append()
        self.stats['write'] += 1

    def read(self):
        """Flush cached writes, then query a random table for rows with a < N."""
        # First flush any cached data so the query sees the latest rows.
        # NOTE: with PyTables 2.3.1/2.4 this whole-file flush was observed to
        # raise "RuntimeError: dictionary changed size during iteration";
        # flushing only the queried table (table.flush()) was reported to
        # avoid it.
        self.h5.flush()
        table = self._random_table()
        # N is drawn uniformly from [0, 100]
        table.readWhere('a < %d' % random.randint(0, 100))
        self.stats['read'] += 1

    def close(self):
        """Close the underlying HDF5 file."""
        self.h5.close()

def main(duration=10):
    """Interleave random reads and writes on a fresh Test file.

    Runs for ``duration`` seconds (10 by default) and then prints the
    operation counters.  The HDF5 file is closed even when a read or write
    raises, e.g. the RuntimeError from File.flush().
    """
    t = Test()
    try:
        start = datetime.datetime.now()
        limit = datetime.timedelta(seconds=duration)

        while datetime.datetime.now() - start < limit:
            # randomly do a read or a write
            if random.random() < 0.5:
                t.write()
            else:
                t.read()

        print(t.stats)
        print('Done')
    finally:
        t.close()

# Run the reproduction only when executed as a script, not on import.
if __name__ == '__main__':
    main()


On Thu, Dec 6, 2012 at 9:55 AM, Alan Marchiori a...@alanmarian.com wrote:

 Josh,

 Thanks for the detailed response.  I would like to avoid going through a
 separate process if at all possible due to the performance penalty.  I have
 also tried your last suggestion to create a dedicated pytables thread and
 send everything through that but still see the same problem (Runtime error
 in flush).  This leads me to believe something strange is going on behind
 the scenes.  ??

 Updated test program with dedicated pytables thread reading an input
 Queue.Queue:

 import tables
 import threading
 import random
 import time
 import Queue

 # a simple table
 class TableValue(tables.IsDescription):
     # Row layout: column 'a' (Int64) followed by column 'b' (UInt32);
     # pos= fixes the column order.
     a = tables.Int64Col(pos=1)
     b = tables.UInt32Col(pos=2)

 class TablesThread(threading.Thread):
 def __init__(self):
 threading.Thread.__init__(self)
 self.name = 'HDF5 io thread'
 # create the dummy HDF5 file
 self.h5 = None
 self.h5 = tables.openFile('/data/test.h5', mode='w')
 self.num_groups = 5
 self.num_tables = 5
 self.groups = [self.h5.createGroup('/', group%d%i) for i in
 range(self.num_groups)]
 self.tables = []
 for group in self.groups:
 tbls = [self.h5.createTable(group, 'table%d'%i, TableValue)
 for i in range(self.num_tables)]
 self.tables.append (tbls)
 for table in tbls:
 # add an index for good measure
 table.cols.a.createIndex()
 self.stopEvt = threading.Event()
 self.stoppedEvt = threading.Event()
 self.inputQ = Queue.Queue()

 def run(self):
 try:
 while not self.stopEvt.is_set():
 # get a command
 try:
 cmd, args, result = self.inputQ.get(timeout = 0.5)
 except Queue.Empty:
 # poll stopEvt so we can shutdown
 continue

 # do the command
 if cmd == 'write':
 x = self.tables[args[0]][args[1]].row
 x['a'] = args[2]
 x['b'] = args[3]
 x.append()
 elif cmd == 'read':
 self.h5.flush()
 table = self.tables[args[0]][args[1]]
 result.value = table.readWhere('a  %d'%(args[2]))
   

Re: [Pytables-users] Problems with flush(): RuntimeError: dictionary changed size during iteration

2012-12-10 Thread Alan Marchiori
I think I have found a viable work around.
Previously, I was flushing the whole HDF5 file:
self.h5.flush()

By replacing this with a flush on just the table of interest:
table = self.tables[random.randint(0, self.num_groups-1)][random.randint(0,
self.num_tables-1)]
table.flush()

The RuntimeError seems to be gone in all versions of my test program (both
single threaded and threaded with locks).  Hope this helps someone else and
eventually maybe someone will figure out what is wrong with File.flush().


On Mon, Dec 10, 2012 at 10:52 AM, Alan Marchiori a...@alanmarian.com wrote:

 I'm continuing to fight this error.  As a sanity check I rewrote my sample
 app as a single thread only.  With interleaved read/writes to multiple
 tables I still get RuntimeError: dictionary changed size during iteration
 in flush.  I still think there is some underlying problem or something I
 don't understand about pytables/hdf5.  I'm far from an expert on either of
 these so I appreciate any suggestions or even confirmation that I'm not
 completely crazy?  The following code should work, right?

 import tables
 import random
 import datetime

 # a simple table
 class TableValue(tables.IsDescription):
     # Row layout: column 'a' (Int64) followed by column 'b' (UInt32);
     # pos= fixes the column order.
     a = tables.Int64Col(pos=1)
     b = tables.UInt32Col(pos=2)

 class Test():
     """Single-threaded reproduction of the File.flush() RuntimeError.

     Creates ``num_groups`` groups under '/' in /data/test.h5, each holding
     ``num_tables`` tables of ``TableValue`` rows with an index on column
     ``a``.  The caller interleaves ``write()`` and ``read()`` calls and the
     counts are kept in ``self.stats``.
     """

     def __init__(self):
         # 'read_error' / 'write_error' are reported by main() but never
         # incremented by this program.
         self.stats = {'read': 0,
                       'write': 0,
                       'read_error': 0,
                       'write_error': 0}
         self.h5 = tables.openFile('/data/test.h5', mode='w')
         self.num_groups = 5
         self.num_tables = 5
         # create num_groups groups directly under the root
         self.groups = [self.h5.createGroup('/', 'group%d' % i)
                        for i in range(self.num_groups)]
         # self.tables[g][t] is table number t inside group number g
         self.tables = []
         for group in self.groups:
             tbls = [self.h5.createTable(group, 'table%d' % i, TableValue)
                     for i in range(self.num_tables)]
             self.tables.append(tbls)
             for table in tbls:
                 # add an index for good measure
                 table.cols.a.createIndex()

     def _random_table(self):
         """Return a randomly chosen table (group picked first, then table)."""
         group_tables = self.tables[random.randint(0, self.num_groups - 1)]
         return group_tables[random.randint(0, self.num_tables - 1)]

     def write(self):
         """Append one row with random ``a``/``b`` in [0, 100] to a random table.

         Appended rows are not seen by queries until a flush (see read()).
         """
         row = self._random_table().row
         row['a'] = random.randint(0, 100)
         row['b'] = random.randint(0, 100)
         row.append()
         self.stats['write'] += 1

     def read(self):
         """Flush cached writes, then query a random table for rows with a < N."""
         # First flush any cached data so the query sees the latest rows.
         # NOTE: with PyTables 2.3.1/2.4 this whole-file flush was observed to
         # raise "RuntimeError: dictionary changed size during iteration";
         # flushing only the queried table (table.flush()) was reported to
         # avoid it.
         self.h5.flush()
         table = self._random_table()
         # N is drawn uniformly from [0, 100]
         table.readWhere('a < %d' % random.randint(0, 100))
         self.stats['read'] += 1

     def close(self):
         """Close the underlying HDF5 file."""
         self.h5.close()

 def main(duration=10):
     """Interleave random reads and writes on a fresh Test file.

     Runs for ``duration`` seconds (10 by default) and then prints the
     operation counters.  The HDF5 file is closed even when a read or write
     raises, e.g. the RuntimeError from File.flush().
     """
     t = Test()
     try:
         start = datetime.datetime.now()
         limit = datetime.timedelta(seconds=duration)

         while datetime.datetime.now() - start < limit:
             # randomly do a read or a write
             if random.random() < 0.5:
                 t.write()
             else:
                 t.read()

         print(t.stats)
         print('Done')
     finally:
         t.close()

 # Run the reproduction only when executed as a script, not on import.
 if __name__ == '__main__':
     main()


 On Thu, Dec 6, 2012 at 9:55 AM, Alan Marchiori a...@alanmarian.com wrote:

 Josh,

 Thanks for the detailed response.  I would like to avoid going through a
 separate process if at all possible due to the performance penalty.  I have
 also tried your last suggestion to create a dedicated pytables thread and
 send everything through that but still see the same problem (Runtime error
 in flush).  This leads me to believe something strange is going on behind
 the scenes.  ??

 Updated test program with dedicated pytables thread reading an input
 Queue.Queue:

 import tables
 import threading
 import random
 import time
 import Queue

 # a simple table
 class TableValue(tables.IsDescription):
     # Row layout: column 'a' (Int64) followed by column 'b' (UInt32);
     # pos= fixes the column order.
     a = tables.Int64Col(pos=1)
     b = tables.UInt32Col(pos=2)

 class TablesThread(threading.Thread):
 def __init__(self):
 threading.Thread.__init__(self)
 self.name = 'HDF5 io thread'
 # create the dummy HDF5 file
 self.h5 = None
 self.h5 = tables.openFile('/data/test.h5', mode='w')
 self.num_groups = 5
 self.num_tables = 5
 self.groups = [self.h5.createGroup('/', group%d%i) for i in
 range(self.num_groups)]
 self.tables = []
 for group in self.groups:
 tbls = [self.h5.createTable(group, 'table%d'%i, TableValue)
 for i in range(self.num_tables)]
 self.tables.append (tbls)
 for table in tbls:
 # add an index for good measure
 table.cols.a.createIndex()
 self.stopEvt = threading.Event()
 self.stoppedEvt = threading.Event()
 self.inputQ = Queue.Queue()

 def run(self):
 try:
 while not self.stopEvt.is_set():
 # get a command
  

[Pytables-users] Problems with flush(): RuntimeError: dictionary changed size during iteration

2012-12-05 Thread Alan Marchiori
I am trying to allow multiple threads read/write access to pytables data
and found it is necessary to call flush() before any read.  If not, the
latest data is not returned.  However, this can cause a RuntimeError.  I
have tried protecting pytables access with both locks and queues as done by
joshayers (
https://github.com/PyTables/PyTables/blob/develop/examples/multiprocess_access_queues.py).
 In either case I still get RuntimeError: dictionary changed size during
iteration when doing the flush.  (incidentally using the Locks appears to
be much faster than using queues in my unscientific tests...)

I have tried versions 2.4 and 2.3.1 with the same results.  Interestingly
this only appears to happen if there are multiple tables/groups in the H5
file.  To investigate this behavior further I created a test program to
illustrate (below).  When run with num_groups = 5 and num_tables = 5 (or
greater) I see the runtime error every time.  When these values are smaller
than this it doesn't (at least in a short test period).

I might be doing something unexpected with pytables, but this seems pretty
straightforward to me.  Any help is appreciated.


import tables
import threading
import random
import time

# Single module-level lock that serializes every PyTables call made through
# the @synchronized(lock) methods below.
lock = threading.Lock()


def synchronized(lock):
    """Return a decorator that runs the wrapped function while holding ``lock``.

    The lock is released whether the wrapped function returns or raises.
    ``lock`` is acquired non-reentrantly if it is a ``threading.Lock``, so a
    synchronized function must not call another function synchronized on the
    same lock.
    """
    import functools  # local import keeps this helper self-contained

    def wrap(f):
        @functools.wraps(f)  # preserve f's name/docstring on the wrapper
        def newFunction(*args, **kw):
            with lock:
                return f(*args, **kw)
        return newFunction
    return wrap

class TableValue(tables.IsDescription):
    # Row layout: column 'a' (Int64) followed by column 'b' (UInt32);
    # pos= fixes the column order.
    a = tables.Int64Col(pos=1)
    b = tables.UInt32Col(pos=2)

class Test():
    """Shared HDF5 fixture for the multi-threaded reproduction.

    Creates ``num_groups`` groups, each with ``num_tables`` tables of
    ``TableValue`` rows indexed on column ``a``, in /data/test.h5.
    ``write``, ``read`` and ``__del__`` are all serialized through the
    module-level ``lock`` via ``@synchronized``.
    """

    def __init__(self):
        # Define h5 before opening so __del__ can test it even if
        # openFile() raises.
        self.h5 = None
        self.h5 = tables.openFile('/data/test.h5', mode='w')
        self.num_groups = 5
        self.num_tables = 5

        self.groups = [self.h5.createGroup('/', 'group%d' % i)
                       for i in range(self.num_groups)]
        # self.tables[g][t] is table number t inside group number g
        self.tables = []
        for group in self.groups:
            tbls = [self.h5.createTable(group, 'table%d' % i, TableValue)
                    for i in range(self.num_tables)]
            self.tables.append(tbls)
            for table in tbls:
                table.cols.a.createIndex()

        self.stats = {'read': 0,
                      'write': 0}

    @synchronized(lock)
    def __del__(self):
        """Close the HDF5 file (at most once) when the instance is finalized."""
        if self.h5 is not None:
            self.h5.close()
            self.h5 = None

    def _random_table(self):
        """Return a randomly chosen table (group picked first, then table).

        Deliberately not @synchronized: it is only called from methods that
        already hold ``lock``, and threading.Lock is not reentrant.
        """
        group_tables = self.tables[random.randint(0, self.num_groups - 1)]
        return group_tables[random.randint(0, self.num_tables - 1)]

    @synchronized(lock)
    def write(self):
        """Append one row with random ``a``/``b`` in [0, 100] to a random table."""
        row = self._random_table().row
        row['a'] = random.randint(0, 100)
        row['b'] = random.randint(0, 100)
        row.append()
        self.stats['write'] += 1

    @synchronized(lock)
    def read(self):
        """Flush, then query a random table for rows with a < N."""
        # Flush so the query sees rows appended by write().
        # NOTE: with PyTables 2.3.1/2.4 this whole-file flush was observed to
        # raise "RuntimeError: dictionary changed size during iteration";
        # flushing only the queried table (table.flush()) was later reported
        # to avoid it.
        self.h5.flush()

        table = self._random_table()
        # N is drawn uniformly from [0, 100]; the result is discarded.
        table.readWhere('a < %d' % random.randint(0, 100))

        self.stats['read'] += 1

class Worker(threading.Thread):
    """Thread that calls ``method()`` repeatedly until ``stop()`` is called.

    Between calls it sleeps a random 0-10 ms so that several workers'
    calls interleave.
    """

    def __init__(self, method):
        threading.Thread.__init__(self)
        self.method = method
        # Set by stop(); checked once per loop iteration in run().
        self.stopEvt = threading.Event()

    def run(self):
        while not self.stopEvt.is_set():
            self.method()
            time.sleep(random.random() / 100.0)

    def stop(self):
        """Ask the thread to exit after its current iteration (non-blocking)."""
        self.stopEvt.set()

def main(duration=5, num_workers=10):
    """Hammer one shared Test with writer and reader threads, then print stats.

    Starts ``num_workers`` threads calling ``t.write`` and ``num_workers``
    threads calling ``t.read``.  It lets them run for ``duration`` seconds,
    then stops and joins them all before printing ``t.stats``.
    """
    t = Test()

    threads = [Worker(t.write) for _i in range(num_workers)]
    threads.extend([Worker(t.read) for _i in range(num_workers)])

    for thread in threads:
        thread.start()

    try:
        time.sleep(duration)
    finally:
        # Stop and join the workers even if the sleep is interrupted
        # (e.g. Ctrl-C): they are ordinary non-daemon threads, so leaving
        # them running would keep the process alive.
        for thread in threads:
            thread.stop()
        for thread in threads:
            thread.join()

    print(t.stats)



# Run the reproduction only when executed as a script, not on import.
if __name__ == '__main__':
    main()
--
LogMeIn Rescue: Anywhere, Anytime Remote support for IT. Free Trial
Remotely access PCs and mobile devices and provide instant support
Improve your efficiency, and focus on delivering more value-add services
Discover what IT Professionals Know. Rescue delivers
http://p.sf.net/sfu/logmein_12329d2d
___
Pytables-users mailing list
Pytables-users@lists.sourceforge.net
https://lists.sourceforge.net/lists/listinfo/pytables-users


Re: [Pytables-users] Problems with flush(): RuntimeError: dictionary changed size during iteration

2012-12-05 Thread Josh Ayers
Alan,

Unfortunately, the underlying HDF5 library isn't thread-safe by default.
It can be built in a thread-safe mode that serializes all API calls, but
still doesn't allow actual parallel access to the disk.  See [1] for more
details.  Here's [2] another interesting discussion concerning whether
multithreaded access is actually beneficial for an I/O limited library like
HDF5.  Ultimately, if one thread can read at the disk's maximum transfer
rate, then multiple threads don't provide any benefit.

Beyond the limitations of HDF5, PyTables also maintains global state in
various module-level variables.  One example is the _open_file cache in the
file.py module.  I made an attempt in the past to work around this to allow
read-only access from multiple threads, but didn't make much progress.

In general, I think your best bet is to serialize all access through a
single process.  There is another example in the PyTables/examples
directory that benchmarks different methods of transferring data from
PyTables to another process [3].  It compares Python's
multiprocessing.Queue, sockets, and memory-mapped files.  In my testing,
the latter two are 5-10x faster than using a queue.

Another option would be to use multiple threads, but handle all access to
the HDF5 file in one thread.  PyTables will release the GIL when making
HDF5 library calls, so the other threads will be able to run.  You could
use a Queue.Queue or some other mechanism to transfer data between
threads.  No actual copying would be needed since their memory is shared,
which should make it faster than the multi-process techniques.

Hope that helps.

Josh Ayers


[1]: http://www.hdfgroup.org/hdf5-quest.html#mthread

[2]:
https://visitbugs.ornl.gov/projects/8/wiki/Multi-threaded_cores_and_HPC-HDF5

[3]:
https://github.com/PyTables/PyTables/blob/develop/examples/multiprocess_access_benchmarks.py


On Wed, Dec 5, 2012 at 2:24 PM, Alan Marchiori a...@alanmarian.com wrote:

 I am trying to allow multiple threads read/write access to pytables data
 and found it is necessary to call flush() before any read.  If not, the
 latest data is not returned.  However, this can cause a RuntimeError.  I
 have tried protecting pytables access with both locks and queues as done by
 joshayers (
 https://github.com/PyTables/PyTables/blob/develop/examples/multiprocess_access_queues.py).
  In either case I still get RuntimeError: dictionary changed size during
 iteration when doing the flush.  (incidentally using the Locks appears to
 be much faster than using queues in my unscientific tests...)

 I have tried versions 2.4 and 2.3.1 with the same results.  Interestingly
 this only appears to happen if there are multiple tables/groups in the H5
 file.  To investigate this behavior further I created a test program to
 illustrate (below).  When run with num_groups = 5 and num_tables = 5 (or
 greater) I see the runtime error every time.  When these values are smaller
 than this it doesn't (at least in a short test period).

 I might be doing something unexpected with pytables, but this seems pretty
 straightforward to me.  Any help is appreciated.


 import tables
 import threading
 import random
 import time

 # Single module-level lock that serializes every PyTables call made through
 # the @synchronized(lock) methods below.
 lock = threading.Lock()


 def synchronized(lock):
     """Return a decorator that runs the wrapped function while holding ``lock``.

     The lock is released whether the wrapped function returns or raises.
     ``lock`` is acquired non-reentrantly if it is a ``threading.Lock``, so a
     synchronized function must not call another function synchronized on the
     same lock.
     """
     import functools  # local import keeps this helper self-contained

     def wrap(f):
         @functools.wraps(f)  # preserve f's name/docstring on the wrapper
         def newFunction(*args, **kw):
             with lock:
                 return f(*args, **kw)
         return newFunction
     return wrap

 class TableValue(tables.IsDescription):
     # Row layout: column 'a' (Int64) followed by column 'b' (UInt32);
     # pos= fixes the column order.
     a = tables.Int64Col(pos=1)
     b = tables.UInt32Col(pos=2)

 class Test():
 def __init__(self):
 self.h5 = None
 self.h5 = tables.openFile('/data/test.h5', mode='w')
 self.num_groups = 5
 self.num_tables = 5

 self.groups = [self.h5.createGroup('/', group%d%i) for i in
 range(self.num_groups)]
 self.tables = []
 for group in self.groups:
 tbls = [self.h5.createTable(group, 'table%d'%i, TableValue)
 for i in range(self.num_tables)]
 self.tables.append (tbls)
 for table in tbls:
 table.cols.a.createIndex()

 self.stats = {'read': 0,
   'write': 0}

 @synchronized(lock)
 def __del__(self):
 if self.h5 != None:
 self.h5.close()
 self.h5 = None

 @synchronized(lock)
 def write(self):
 x = self.tables[random.randint(0,
 self.num_groups-1)][random.randint(0, self.num_tables-1)].row
 x['a'] = random.randint(0, 100)
 x['b'] = random.randint(0, 100)
 x.append()
 self.stats['write'] += 1

 @synchronized(lock)
 def read(self):
 # flush so we can query latest data
 self.h5.flush()

 table = self.tables[random.randint(0,
 self.num_groups-1)][random.randint(0, self.num_tables-1)]
 # do some query
 results = table.readWhere('a  %d'%(random.randint(0, 100)))
 #print 'Query got %d