Re: problem with multiprocessing and defaultdict
Robert Kern wrote:
> On 2010-01-11 17:50 PM, wiso wrote:
>
>> The problem now is this:
>> start reading file r1_200909.log
>> start reading file r1_200910.log
>> readen 488832 lines from file r1_200910.log
>> readen 517247 lines from file r1_200909.log
>>
>> with huge files (the real case) the program freezes. Is there a solution
>> to avoid pickling/serialization, ... for example something like this:
>>
>> if __name__ == "__main__":
>>     file_names = ["r1_200909.log", "r1_200910.log"]
>>     pool = multiprocessing.Pool(len(file_names))
>>     childrens = [Container(f) for f in file_names]
>>     pool.map(lambda c: c.read(), childrens)
>>
>> PicklingError: Can't pickle <type 'function'>: attribute lookup
>> __builtin__.function failed
>
> You can't pickle lambda functions.
>
> What information do you actually need back from the workers?

They sent back the object filled with data. The problem is very simple: I
have a container, and the container has a method read(file_name) that reads
a huge file and fills the container with data. I have more than one file to
read, so I want to parallelize this process. The reading method is quite
slow because it involves regexes.
--
http://mail.python.org/mailman/listinfo/python-list
Re: problem with multiprocessing and defaultdict
Robert Kern wrote:
> On 2010-01-11 17:15 PM, wiso wrote:
>
>> I'm using a class to read some data from files:
>>
>> import multiprocessing
>> from collections import defaultdict
>>
>> def SingleContainer():
>>     return list()
>>
>> class Container(defaultdict):
>>     """
>>     this class store odd line in self["odd"] and even line in
>>     self["even"]. It is stupid, but it's only an example.
>>     """
>>     def __init__(self, file_name):
>>         if type(file_name) != str:
>>             raise AttributeError, "%s is not a string" % file_name
>>         defaultdict.__init__(self, SingleContainer)
>>         self.file_name = file_name
>>         self.readen_lines = 0
>>
>>     def read(self):
>>         f = open(self.file_name)
>>         print "start reading file %s" % self.file_name
>>         for line in f:
>>             self.readen_lines += 1
>>             values = line.split()
>>             key = {0: "even", 1: "odd"}[self.readen_lines % 2]
>>             self[key].append(values)
>>         print "readen %d lines from file %s" % (self.readen_lines,
>>                                                 self.file_name)
>>
>> """
>> Now I want to read more than one file per times
>> """
>>
>> def do(file_name):
>>     container = Container(file_name)
>>     container.read()
>>     return container
>>
>> if __name__ == "__main__":
>>     file_names = ["prova_200909.log", "prova_200910.log"]
>>     pool = multiprocessing.Pool(len(file_names))
>>     result = pool.map(do, file_names)
>>     pool.close()
>>     pool.join()
>>     print "Finish"
>>
>> but I got:
>>
>> start reading file prova_200909.log
>> start reading file prova_200910.log
>> readen 142 lines from file prova_200909.log
>> readen 160 lines from file prova_200910.log
>> Exception in thread Thread-2:
>> Traceback (most recent call last):
>>   File "/usr/lib64/python2.6/threading.py", line 522, in __bootstrap_inner
>>     self.run()
>>   File "/usr/lib64/python2.6/threading.py", line 477, in run
>>     self.__target(*self.__args, **self.__kwargs)
>>   File "/usr/lib64/python2.6/multiprocessing/pool.py", line 259, in _handle_results
>>     task = get()
>>   File "main2.py", line 11, in __init__
>>     raise AttributeError, "%s is not a string" % file_name
>> AttributeError: (AttributeError('<function SingleContainer at
>> 0x7f08b253d938> is not a string',), <class '__main__.Container'>,
>> (<function SingleContainer at 0x7f08b253d938>,))
>>
>> the problem is when pool share objects, but why is it calling
>> Container.__init__ with a Container parameter?
>
> When you return the container from do() in the worker process, it must be
> pickled in order to be sent over the socket. You did not override the
> implementation of the .__reduce_ex__() method, so it used defaultdict's
> version which passes the factory function as the first argument to the
> constructor.
>
> I would recommend passing back the container.items() list instead of a
> Container instance as the easiest path forward.

Thank you very much. I changed "return container" to "return
container.items()" and it works:

start reading file prova_200909.log
readen 142 lines from file prova_200909.log
start reading file prova_200910.log
readen 160 lines from file prova_200910.log
Finish

The problem now is this:

start reading file r1_200909.log
start reading file r1_200910.log
readen 488832 lines from file r1_200910.log
readen 517247 lines from file r1_200909.log

with huge files (the real case) the program freezes. Is there a solution to
avoid pickling/serialization, ... for example something like this:

if __name__ == "__main__":
    file_names = ["r1_200909.log", "r1_200910.log"]
    pool = multiprocessing.Pool(len(file_names))
    childrens = [Container(f) for f in file_names]
    pool.map(lambda c: c.read(), childrens)

PicklingError: Can't pickle <type 'function'>: attribute lookup
__builtin__.function failed
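As an alternative to returning container.items(), the subclass can be taught to pickle itself by overriding __reduce__ so that the file name, not the factory function, reaches the constructor. This is a sketch of mine, not from the thread, using a simplified stand-in for the Container class:

```python
import pickle
from collections import defaultdict

class Container(defaultdict):
    """Simplified stand-in for the thread's Container class."""
    def __init__(self, file_name=""):
        defaultdict.__init__(self, list)
        self.file_name = file_name

    def __reduce__(self):
        # Rebuild via our own constructor instead of defaultdict's default,
        # which would pass the factory function as the first argument.
        # The fifth element is an iterator of (key, value) pairs that
        # pickle replays as obj[key] = value on the rebuilt object.
        return (self.__class__, (self.file_name,), None, None,
                iter(self.items()))

c = Container("r1_200909.log")
c["even"].append(["some", "values"])
c2 = pickle.loads(pickle.dumps(c))
print(c2.file_name, c2["even"])
```

With this in place a worker could return the Container directly, though for very large containers the serialization cost that froze the program above would remain.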
problem with multiprocessing and defaultdict
I'm using a class to read some data from files:

import multiprocessing
from collections import defaultdict

def SingleContainer():
    return list()

class Container(defaultdict):
    """
    this class store odd line in self["odd"] and even line in
    self["even"]. It is stupid, but it's only an example.
    """
    def __init__(self, file_name):
        if type(file_name) != str:
            raise AttributeError, "%s is not a string" % file_name
        defaultdict.__init__(self, SingleContainer)
        self.file_name = file_name
        self.readen_lines = 0

    def read(self):
        f = open(self.file_name)
        print "start reading file %s" % self.file_name
        for line in f:
            self.readen_lines += 1
            values = line.split()
            key = {0: "even", 1: "odd"}[self.readen_lines % 2]
            self[key].append(values)
        print "readen %d lines from file %s" % (self.readen_lines,
                                                self.file_name)

"""
Now I want to read more than one file per times
"""

def do(file_name):
    container = Container(file_name)
    container.read()
    return container

if __name__ == "__main__":
    file_names = ["prova_200909.log", "prova_200910.log"]
    pool = multiprocessing.Pool(len(file_names))
    result = pool.map(do, file_names)
    pool.close()
    pool.join()
    print "Finish"

but I got:

start reading file prova_200909.log
start reading file prova_200910.log
readen 142 lines from file prova_200909.log
readen 160 lines from file prova_200910.log
Exception in thread Thread-2:
Traceback (most recent call last):
  File "/usr/lib64/python2.6/threading.py", line 522, in __bootstrap_inner
    self.run()
  File "/usr/lib64/python2.6/threading.py", line 477, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/usr/lib64/python2.6/multiprocessing/pool.py", line 259, in _handle_results
    task = get()
  File "main2.py", line 11, in __init__
    raise AttributeError, "%s is not a string" % file_name
AttributeError: (AttributeError('<function SingleContainer at
0x7f08b253d938> is not a string',), <class '__main__.Container'>,
(<function SingleContainer at 0x7f08b253d938>,))

the problem is when pool share objects, but why is it calling
Container.__init__ with a Container parameter?
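A small demo of mine (not from the post) of why the unpickling misfires: defaultdict's pickle support passes the default factory as the first constructor argument, so on the receiving end __init__ sees a function, not a string, and the type check raises. Using a simplified stand-in for Container:

```python
from collections import defaultdict

class Container(defaultdict):
    """Simplified stand-in for the thread's Container class."""
    def __init__(self, file_name):
        defaultdict.__init__(self, list)
        self.file_name = file_name

c = Container("prova_200909.log")
# Inspect what pickle would use to rebuild the object (protocol 2)
func, args = c.__reduce_ex__(2)[:2]
# args[0] is the default factory, not the original file name, so
# unpickling effectively calls Container(list) and the check fails
print(func.__name__, args)
```

So the parameter in the traceback is not a Container at all but the SingleContainer factory function, which is also why the error surfaces in the parent's result-handler thread rather than in the worker.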
monitor reading file with thread
I'm reading and processing a huge file, so during the execution I want to
know the state of the processing: how many lines are already processed, and
so on. The first approach is:

f = open(filename)
n = 0
for l in f:
    n += 1
    if n % 1000 == 0:
        print "Reading %d lines" % n
    do_something(l)

but I want something different. I want to monitor the progress of the
computation every n seconds. It's the first time I use threading, and I
think mine is not the best solution:

import threading
import time
import Queue

class Reader():
    def __init__(self, filename):
        self.filename = filename
        self.lineno = 0

    def __iter__(self):
        f = open(self.filename)
        for line in f:
            self.lineno += 1
            time.sleep(0.01)  # slow down
            yield line
        f.close()

class Monitor(threading.Thread):
    def __init__(self, reader, stop_queue, interval=2):
        threading.Thread.__init__(self)
        self.interval = interval
        self.reader = reader
        self.stop_queue = stop_queue

    def run(self):
        while True:
            try:
                if self.stop_queue.get(timeout=self.interval) == "stop":
                    break
            except Queue.Empty:
                pass
            print "MONITOR: ", reader.lineno

reader = Reader("r1_200910.log")
q = Queue.Queue()
monitor = Monitor(reader, q)
monitor.start()
for line in reader:
    pass  # do_somethinghard(line)
q.put("stop")
monitor.join()

It works, but I don't like how I'm stopping the thread. In general the
Monitor class can be more complex, for example a progress bar.

Using python 2.6
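A cleaner way to stop the monitor (a sketch of mine, not from the post, in Python 3 spelling) is a threading.Event: its wait(timeout) call doubles as the periodic sleep and the stop check, so no queue or sentinel string is needed. The Reader here is a hypothetical stand-in that counts instead of reading a real file:

```python
import threading

class Monitor(threading.Thread):
    """Report reader.lineno every `interval` seconds until stopped."""
    def __init__(self, reader, interval=0.05):
        threading.Thread.__init__(self)
        self.reader = reader
        self.interval = interval
        self._stop_event = threading.Event()

    def run(self):
        # wait() returns False on timeout, True once the event is set
        while not self._stop_event.wait(self.interval):
            print("MONITOR:", self.reader.lineno)

    def stop(self):
        self._stop_event.set()
        self.join()

class Reader(object):
    """Hypothetical stand-in: yields n items instead of file lines."""
    def __init__(self, n):
        self.n = n
        self.lineno = 0

    def __iter__(self):
        for i in range(self.n):
            self.lineno += 1
            yield i

reader = Reader(1000)
monitor = Monitor(reader)
monitor.start()
for line in reader:
    pass  # do_somethinghard(line)
monitor.stop()
print("done, read", reader.lineno, "lines")
```

The same Event-based skeleton extends naturally to a progress bar: run() just redraws instead of printing.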
Re: Convert month name to month number faster
Antoine Pitrou wrote:
> On Wed, 06 Jan 2010 12:03:36 +0100, wiso wrote:
>
>> from time import time
>> t = time(); xxx=map(to_dict,l); print time() - t  # 0.5
>> t = time(); xxx=map(to_if,l); print time() - t  # 1.0
>
> Don't define your own function just for attribute access. Instead just
> write:
>
> xxx = map(month_dict.__getitem__, l)

t = time(); xxx=map(month_dict.__getitem__,l); print time() - t  # 0.2

month_list = ("","Jan","Feb","Mar","Apr","May","Jun","Jul","Aug","Sep",
              "Oct","Nov","Dec")
t = time(); xxx=map(month_list.index,l); time() - t  # 0.6
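For what it's worth, the lookup table need not be typed by hand at all; this sketch (mine, not from the thread) derives it from the standard calendar module, whose month_abbr sequence matches the English abbreviations used here under the default C locale:

```python
import calendar

# calendar.month_abbr[0] is the empty string, so skip it;
# the result is {"Jan": 1, ..., "Dec": 12}
month_dict = dict((name, num)
                  for num, name in enumerate(calendar.month_abbr)
                  if name)
print(month_dict["Jan"], month_dict["Dec"])
```

Lookup speed is identical to the hand-written dict; the only gain is one less place to mistype a month name.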
Convert month name to month number faster
I'm optimizing the innermost loop of my script. I need to convert month
names to month numbers. I'm using python 2.6 on linux x64.

month_dict = {"Jan":1,"Feb":2,"Mar":3,"Apr":4,"May":5,"Jun":6,
              "Jul":7,"Aug":8,"Sep":9,"Oct":10,"Nov":11,"Dec":12}

def to_dict(name):
    return month_dict[name]

def to_if(name):
    if name == "Jan": return 1
    elif name == "Feb": return 2
    elif name == "Mar": return 3
    elif name == "Apr": return 4
    elif name == "May": return 5
    elif name == "Jun": return 6
    elif name == "Jul": return 7
    elif name == "Aug": return 8
    elif name == "Sep": return 9
    elif name == "Oct": return 10
    elif name == "Nov": return 11
    elif name == "Dec": return 12
    else: raise ValueError

import random
l = [random.choice(month_dict.keys()) for _ in range(100)]

from time import time
t = time(); xxx=map(to_dict,l); print time() - t  # 0.5
t = time(); xxx=map(to_if,l); print time() - t  # 1.0

is there a faster solution? Maybe something with str.translate? The problem
is a little different because I don't read random data, but sorted data.
For example:

l = [x for x in ("Jan","Feb","Mar","Apr","May","Jun","Jul","Aug","Sep",
                 "Oct","Nov","Dec") for _ in range(1000)]
# ["Jan","Jan", ..., "Feb", "Feb", ...]

so maybe the to_if approach will be faster if I write the cases in the best
order. Look:

l = ["Jan"] * 100  # to_if is in the best order for "Jan"
t = time(); xxx=map(to_dict,l); print time() - t  # 0.5
t = time(); xxx=map(to_if,l); print time() - t  # 0.5
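One-shot time() measurements like the ones above are noisy; a timeit-based sketch (mine, not from the post, Python 3 spelling) gives more stable numbers and checks that the candidates agree before comparing them:

```python
import random
import timeit

month_dict = {"Jan": 1, "Feb": 2, "Mar": 3, "Apr": 4, "May": 5, "Jun": 6,
              "Jul": 7, "Aug": 8, "Sep": 9, "Oct": 10, "Nov": 11, "Dec": 12}
names = [random.choice(sorted(month_dict)) for _ in range(1000)]

# Sanity check: both approaches must give identical results
via_method = list(map(month_dict.__getitem__, names))
via_literal = [month_dict[n] for n in names]

# timeit repeats the work, averaging away one-shot noise
t_method = timeit.timeit(
    lambda: list(map(month_dict.__getitem__, names)), number=200)
t_literal = timeit.timeit(
    lambda: [month_dict[n] for n in names], number=200)
print(t_method, t_literal)
```

On sorted input a branch-ordered to_if can only ever match the dict on its best case, while the dict's hash lookup is equally fast for every month, so the dict remains the safe default.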
FileInput too slow
I'm trying the fileinput module, and I like it, but I don't understand why
it's so slow... look:

from time import time
from fileinput import FileInput

file = ['r1_200907.log', 'r1_200908.log', 'r1_200909.log',
        'r1_200910.log', 'r1_200911.log']

def f1():
    n = 0
    for f in file:
        print "new file: %s" % f
        ff = open(f)
        for line in ff:
            n += 1
        ff.close()
    return n

def f2():
    f = FileInput(file)
    for line in f:
        if f.isfirstline():
            print "new file: %s" % f.filename()
    return f.lineno()

def f3():  # f2 simpler
    f = FileInput(file)
    for line in f:
        pass
    return f.lineno()

t = time(); f1(); print time()-t  # 1.0
t = time(); f2(); print time()-t  # 7.0 !!!
t = time(); f3(); print time()-t  # 5.5

I'm using text files; there are 2563150 lines in total.
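FileInput does Python-level bookkeeping (line counting, filename tracking) on every line, which is where the overhead goes. A plain generator that chains the files (a sketch of mine, not from the post, using throwaway temp files with hypothetical names) keeps f1's speed while still hiding the file boundaries from the caller:

```python
import os
import tempfile

# Create small throwaway files for the demo
tmpdir = tempfile.mkdtemp()
paths = []
for i in range(3):
    p = os.path.join(tmpdir, "f%d.log" % i)  # hypothetical file names
    with open(p, "w") as f:
        f.write("line\n" * 10)
    paths.append(p)

def chained_lines(paths):
    """Yield lines from each file in turn, like FileInput but without
    per-line bookkeeping."""
    for p in paths:
        with open(p) as f:
            for line in f:
                yield line

n = sum(1 for _ in chained_lines(paths))
print(n)
```

If the per-file hooks (filename, first-line detection) are needed, they can be done once per file in the outer loop of the generator rather than tested on every line as isfirstline() does.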
google tech talk code (threading module)
I took a little code from a google tech talk. It seems interesting, but it
doesn't work:

import sys, urllib, os, threading, Queue

q = Queue.Queue()

class RetrWorker(threading.Thread):
    def run(self):
        self.setDaemon(True)
        def hook(*a):
            print (fn, a)
        while True:
            url = q.get()
            fn = os.path.basename(url)
            print url, "->", fn
            urllib.urlretrive(url, fn, hook)

for i in range(10):
    RetrWorker().start()
for url in sys.argv[1:]:
    q.put(url)

Exception in thread Thread-10:
Traceback (most recent call last):
  File "/usr/lib64/python2.6/threading.py", line 522, in __bootstrap_inner
    self.run()
  File "wget_m.py", line 7, in run
    self.setDaemon(True)
  File "/usr/lib64/python2.6/threading.py", line 690, in setDaemon
    self.daemon = daemonic
  File "/usr/lib64/python2.6/threading.py", line 683, in daemon
    raise RuntimeError("cannot set daemon status of active thread")
RuntimeError: cannot set daemon status of active thread
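The traceback says it directly: daemon status must be set before start(), not inside run(), which executes after the thread is already active. (The code above also misspells urlretrieve.) A corrected worker-pool skeleton (a sketch of mine in Python 3 spelling, with a generic transform standing in for the network fetch so it runs offline):

```python
import queue
import threading

q = queue.Queue()
results = []
results_lock = threading.Lock()

class Worker(threading.Thread):
    def __init__(self):
        threading.Thread.__init__(self)
        # daemon must be set before start(), not inside run()
        self.daemon = True

    def run(self):
        while True:
            item = q.get()
            if item is None:        # sentinel: shut this worker down
                q.task_done()
                break
            # stand-in for urllib.request.urlretrieve(url, fn, hook)
            with results_lock:
                results.append(item.upper())
            q.task_done()

workers = [Worker() for _ in range(4)]
for w in workers:
    w.start()
for url in ["a", "b", "c"]:        # stand-in for sys.argv[1:]
    q.put(url)
for _ in workers:
    q.put(None)                    # one sentinel per worker
q.join()                           # wait until every task_done() is called
print(sorted(results))
```

Note also that the original script exits immediately after filling the queue: with daemon workers and no q.join(), the main thread falls off the end and the process dies before any download finishes.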