Hi all, 

I could really use some help with a problem I'm having.
I wrote a function that takes a pattern of actions and applies it to the
filesystem.
It takes a list of starting paths and a pattern like this:

pattern = {
            InGlob('Test/**'):{
                MatchRemove('DS_Store'):[],
                NoMatchAdd('(alphaID_)|(DS_Store)','warnings'):[],
                MatchAdd('alphaID_','alpha_found'):[],
                InDir('alphaID_'):{
                    NoMatchAdd('(betaID_)|(DS_Store)','warnings'):[],
                    InDir('betaID_'):{
                        NoMatchAdd('(gammaID_)|(DS_Store)','warnings'):[],
                        MatchAdd('gammaID_','gamma_found'):[] }}}}

So if you run evalFSPattern(['/Volumes/**'],pattern) it returns a dictionary
where:

dict['gamma_found'] = [list of paths that matched] (e.g.
'/Volumes/HD1/Test/alphaID_3382/betaID_38824/gammaID_848384')
dict['warnings'] = [list of paths that failed to match] (e.g.
'/Volumes/HD1/Test/alphaID_3382/gammaID_47383')

Since some of these volumes are on network shares I also wanted to parallelize 
this so that it would not block on IO.  I started the parallelization by using 
multiprocessing.Pool and got it to work if I ran the fsparser from the 
interpreter.  It ran in *much* less time and produced correct output that 
matched the non-parallelized version.  The problem begins when I call the
parallelized function from other code.

For example, I wrote a class whose instances are created around valid FS paths,
which are cached to reduce expensive FS lookups.

class Experiment(object):

    SlidePaths = None

    @classmethod
    def getSlidePaths(cls):
        # Scan the filesystem once and cache the result on the class.
        if cls.SlidePaths is None:
            cls.SlidePaths = fsparser(['/Volumes/**'],pattern)
        return cls.SlidePaths

    @classmethod
    def lookupPathWithGammaID(cls,id):
        paths = cls.getSlidePaths()
        ...
        return paths[selected]

    @classmethod
    def fromGammaID(cls,id):
        path = cls.lookupPathWithGammaID(id)
        return cls(path)

    def __init__(self,path):
        self.Path = path
        ...

    ...

If I do the following from the interpreter it works:

>>> from experiment import Experiment
>>> expt = Experiment.fromGammaID(10102)

but if I write a script called test.py:

from experiment import Experiment
expt1 = Experiment.fromGammaID(10102)
expt2 = Experiment.fromGammaID(10103)
comparison = expt1.compareTo(expt2)

it hangs, whether I import it from the interpreter or run it from a bash prompt:

>>> from test import comparison (hangs forever)
$ python test.py (hangs forever)

I would really like some help figuring this out.  I thought it should work
easily, since the spawned processes don't share any data or state (their
results are merged in the main thread).  The classes used in the pattern are
also simple Python objects (they only use Python primitives).


These are the main functions:

def mapAction(pool,paths,action):
    merge = {'next':[]}
    for result in pool.map(action,paths):
        if result is None:
            continue
        merge = mergeDicts(merge,result)
    return merge


def mergeDicts(d1,d2):
    for key in d2:
        if key not in d1:
            d1[key] = d2[key]
        else:
            d1[key] += d2[key]
    return d1


def evalFSPattern(paths,pattern):
    pool = Pool(10)
    results = {}
    for action in pattern:
        tomerge1 = mapAction(pool,paths,action)
        tomerge2 = evalFSPattern(tomerge1['next'],pattern[action])
        del tomerge1['next']
        results = mergeDicts(results,tomerge1)
        results = mergeDicts(results,tomerge2)
    return results

The classes used in the pattern (InGlob, NoMatchAdd, etc.) are callable classes
that take a single parameter (a path) and return either a result dict or None,
which makes them trivial to adapt to Pool.map.
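
To make that description concrete, here is a minimal sketch of what one such callable class could look like. It is a hypothetical stand-in, not the real MatchAdd: the constructor arguments follow the pattern shown earlier, but matching against the last path component and the exact shape of the returned dict are assumptions. Pool.map pickles both the callable and each path to send them to a worker, so the instance only holds plain strings.

```python
import re


class MatchAdd(object):
    """Hypothetical stand-in: file a path under `key` when its name matches."""

    def __init__(self, regex, key):
        # Plain strings only, so an instance can be pickled and sent to a worker.
        self.regex = regex
        self.key = key

    def __call__(self, path):
        # Assumption: the regex is tested against the last path component.
        name = path.rstrip('/').split('/')[-1]
        if re.search(self.regex, name):
            return {self.key: [path]}
        # No match: return None so the caller can skip this path.
        return None
```

An instance built as MatchAdd('gammaID_', 'gamma_found') can then be called directly on a path or handed to pool.map(action, paths).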

Note that if I change the mapAction function to:

def mapAction(pool,paths,action):
    merge = {'next':[]}
    for path in paths:
        result = action(path)
        if result is None:
            continue
        merge = mergeDicts(merge,result)
    return merge

everything works just fine.
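
One detail of evalFSPattern as posted: every recursive call constructs its own Pool(10), and none of those pools is ever closed. For comparison, here is a minimal sketch of the same recursion with a single pool created once and passed down. The names eval_fs_pattern, map_action, merge_dicts and run are illustrative, and whether the extra pools have anything to do with the hang is not established; this only shows the alternative structure.

```python
from multiprocessing import Pool


def merge_dicts(d1, d2):
    """Merge d2 into d1, concatenating list values; returns d1."""
    for key, value in d2.items():
        # Build a new list so d1 never aliases a list owned by d2.
        d1[key] = d1.get(key, []) + list(value)
    return d1


def map_action(pool, paths, action):
    """Apply one action to every path via pool.map and merge the results."""
    merged = {'next': []}
    for result in pool.map(action, paths):
        if result is not None:
            merge_dicts(merged, result)
    return merged


def eval_fs_pattern(pool, paths, pattern):
    """Walk the nested pattern, reusing the same pool at every level."""
    results = {}
    for action, subpattern in pattern.items():
        found = map_action(pool, paths, action)
        next_paths = found.pop('next')
        merge_dicts(results, found)
        # Leaf actions map to [] in the pattern, so there is nothing to recurse into.
        if subpattern and next_paths:
            merge_dicts(results, eval_fs_pattern(pool, next_paths, subpattern))
    return results


def run(paths, pattern, workers=10):
    """Create the pool once; leaving the with block terminates the workers."""
    with Pool(workers) as pool:
        return eval_fs_pattern(pool, paths, pattern)
```

With this layout a call such as run(['/Volumes/**'], pattern) starts at most `workers` processes for the whole traversal, instead of ten more for each nested level of the pattern.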


Thanks.


-- 
http://mail.python.org/mailman/listinfo/python-list
