On Tue, 30 Jun 2009 22:52:18 -0300, Mag Gam <magaw...@gmail.com> wrote:

> I am very new to Python and I am in the process of loading a very
> large compressed CSV file into another format. I was wondering if I
> can do this with a multithreaded approach.

Does the format conversion involve significant processing time? If not, the total time is dominated by I/O (reading and writing the file), so it's doubtful you would gain anything from multiple threads.
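One way to answer that question is to time a pass that only decompresses and parses the file against a pass that also converts each row. The sketch below assumes the file is gzip-compressed (the original post only says "compressed"/"zipped"; a .zip archive would need the `zipfile` module instead), and `measure` and `convert` are hypothetical names.

```python
import csv
import gzip
import time

def measure(path, convert):
    """Time one pass that only parses the file and one that also converts.

    Assumes a gzip-compressed CSV; `convert` is any per-row function.
    """
    start = time.perf_counter()
    with gzip.open(path, "rt", newline="") as f:
        for row in csv.reader(f):
            pass  # decompress and parse only
    read_only = time.perf_counter() - start

    start = time.perf_counter()
    with gzip.open(path, "rt", newline="") as f:
        for row in csv.reader(f):
            convert(row)  # same work plus the conversion
    read_and_convert = time.perf_counter() - start
    return read_only, read_and_convert
```

If the two numbers are close, threads have little to speed up. Run it more than once, since the second pass can benefit from the OS file cache. Also keep in mind that in CPython only one thread executes Python bytecode at a time, so threads help mainly when the work is I/O or C code that releases the GIL; pure-Python conversion code does not run in parallel.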

> Here is the pseudocode I was thinking about:
>
> Let T = total number of lines in the file, for example 1000000 (1 million lines)
> Let B = total number of lines in a buffer, for example 10000 lines
>
> Create a thread to read the first B lines
> Create another thread to read the next B lines (so we have 2 threads
> now). But since the file is zipped I have to wait until the first
> thread is completed, unless someone knows of a clever technique.
> Write the content of thread 1 into a numpy array
> Write the content of thread 2 into a numpy array
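Because a compressed stream has to be decompressed sequentially, the buffers of B lines can simply be produced one after another by a single reader. A minimal sketch, assuming a gzip-compressed CSV (`read_chunks` is a hypothetical name):

```python
import csv
import gzip
from itertools import islice

def read_chunks(path, chunk_size=10000):
    """Yield lists of up to chunk_size parsed CSV rows, in file order.

    Decompression is sequential, so one reader produces every chunk.
    """
    with gzip.open(path, "rt", newline="") as f:
        reader = csv.reader(f)
        while True:
            chunk = list(islice(reader, chunk_size))
            if not chunk:  # end of file
                break
            yield chunk
```

With T = 1,000,000 and B = 10,000 this yields 100 chunks. If every column is numeric, each chunk could then be turned into an array with something like `numpy.array(chunk, dtype=float)`.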

Can you process each line independently? Is the record order important? If not (or if at least some local disordering is acceptable), you may use a few worker threads to do the conversion: feed them through a Queue object, have them put the converted lines into another Queue, and let another thread write the results to the destination file.

import csv
import queue  # this module was named Queue in Python 2
import threading

def convert(in_queue, out_queue):
    while True:
        row = in_queue.get()
        if row is None:  # end-of-work marker
            break
        converted_line = ",".join(row)  # placeholder: replace with the real conversion
        out_queue.put(converted_line)

def write_output(out_queue, outfile):
    while True:
        line = out_queue.get()
        if line is None:  # all converter threads have finished
            break
        outfile.write(line + "\n")

in_queue = queue.Queue()
out_queue = queue.Queue()
tlist = []
for _ in range(4):
    t = threading.Thread(target=convert, args=(in_queue, out_queue))
    t.start()
    tlist.append(t)

with open("...", "w") as outfile:
    output_thread = threading.Thread(target=write_output, args=(out_queue, outfile))
    output_thread.start()

    with open("...", newline="") as csvfile:
        reader = csv.reader(csvfile)  # add dialect/format options as needed
        for row in reader:
            in_queue.put(row)

    for t in tlist:
        in_queue.put(None)  # one end-of-work marker per converter thread
    for t in tlist:
        t.join()  # wait until all conversions are done
    out_queue.put(None)
    output_thread.join()  # wait until everything has been written
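If the record order does matter, one way to keep it while still converting in several threads is to number each row before queueing it and let the writer hold back results that arrive early. This is a sketch under those assumptions; `run`, `write_in_order`, `transform` and `write` are hypothetical names.

```python
import queue
import threading

def convert(in_queue, out_queue, transform):
    while True:
        item = in_queue.get()
        if item is None:  # end-of-work marker
            break
        seq, row = item
        out_queue.put((seq, transform(row)))

def write_in_order(out_queue, write):
    pending = {}  # results that arrived before their turn
    next_seq = 0
    while True:
        item = out_queue.get()
        if item is None:  # all converters have finished
            break
        seq, line = item
        pending[seq] = line
        while next_seq in pending:  # flush everything that is now in order
            write(pending.pop(next_seq))
            next_seq += 1

def run(rows, transform, write, nthreads=4):
    # Bounded, so reading cannot run arbitrarily far ahead of conversion.
    in_queue = queue.Queue(maxsize=1000)
    out_queue = queue.Queue()
    workers = [threading.Thread(target=convert, args=(in_queue, out_queue, transform))
               for _ in range(nthreads)]
    for t in workers:
        t.start()
    writer = threading.Thread(target=write_in_order, args=(out_queue, write))
    writer.start()
    for seq, row in enumerate(rows):
        in_queue.put((seq, row))
    for _ in workers:
        in_queue.put(None)
    for t in workers:
        t.join()
    out_queue.put(None)
    writer.join()
```

The `maxsize` limit is a design choice worth considering in the unordered version too: with an unbounded input queue, a fast reader can load most of a very large file into memory before the converters catch up.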

--
Gabriel Genellina

--
http://mail.python.org/mailman/listinfo/python-list