On 21 November 2016 at 11:40, Francesco Chicchiriccò <[email protected]> wrote: > Hi all, > not sure but it seems that the commit below broke my scheduled import from > mbox:
It won't be that commit; most likely it's the fix for #251: https://github.com/apache/incubator-ponymail/commit/1a3bff403166c917738fd02acefc988b909d4eae#diff-0102373f79eaa72ffaff3ce7675b6a43 . This presumably means the archiver would have fallen over with the same e-mail. Or there is an encoding problem with writing the mail to the mbox - or reading it - so the importer is not seeing the same input as the archiver. It would be useful to know what the message is that causes the issue. If you can find it, I can take a look later. > Exception in thread Thread-1: > Traceback (most recent call last): > File "/usr/lib/python3.5/threading.py", line 914, in _bootstrap_inner > self.run() > File "import-mbox.py", line 297, in run > 'source': message.as_string() > File "/usr/lib/python3.5/email/message.py", line 159, in as_string > g.flatten(self, unixfrom=unixfrom) > File "/usr/lib/python3.5/email/generator.py", line 115, in flatten > self._write(msg) > File "/usr/lib/python3.5/email/generator.py", line 181, in _write > self._dispatch(msg) > File "/usr/lib/python3.5/email/generator.py", line 214, in _dispatch > meth(msg) > File "/usr/lib/python3.5/email/generator.py", line 243, in _handle_text > msg.set_payload(payload, charset) > File "/usr/lib/python3.5/email/message.py", line 316, in set_payload > payload = payload.encode(charset.output_charset) > UnicodeEncodeError: 'ascii' codec can't encode character '\ufffd' in position > 3657: ordinal not in range(128) > > Any hint / workaround? 
> > On 2016-11-21 00:20 (+0100), [email protected] wrote: >> Repository: incubator-ponymail >> Updated Branches: >> refs/heads/master 1a3bff403 -> af1544e7b >> >> >> import-mbox.py messages need the thread number >> >> This fixes #248 >> >> Project: http://git-wip-us.apache.org/repos/asf/incubator-ponymail/repo >> Commit: >> http://git-wip-us.apache.org/repos/asf/incubator-ponymail/commit/af1544e7 >> Tree: http://git-wip-us.apache.org/repos/asf/incubator-ponymail/tree/af1544e7 >> Diff: http://git-wip-us.apache.org/repos/asf/incubator-ponymail/diff/af1544e7 >> >> Branch: refs/heads/master >> Commit: af1544e7b63d81a5998a4b3a1471586d63d72a4e >> Parents: 1a3bff4 >> Author: Sebb <[email protected]> >> Authored: Sun Nov 20 23:19:55 2016 +0000 >> Committer: Sebb <[email protected]> >> Committed: Sun Nov 20 23:19:55 2016 +0000 >> >> ---------------------------------------------------------------------- >> tools/import-mbox.py | 59 +++++++++++++++++++++++++++-------------------- >> 1 file changed, 34 insertions(+), 25 deletions(-) >> ---------------------------------------------------------------------- >> >> >> http://git-wip-us.apache.org/repos/asf/incubator-ponymail/blob/af1544e7/tools/import-mbox.py >> ---------------------------------------------------------------------- >> diff --git a/tools/import-mbox.py b/tools/import-mbox.py >> index 15f09ad..12bc0d1 100755 >> --- a/tools/import-mbox.py >> +++ b/tools/import-mbox.py >> @@ -107,7 +107,9 @@ es = Elasticsearch([ >> rootURL = "" >> >> class BulkThread(Thread): >> - def assign(self, json, xes, dtype = 'mbox', wc = 'quorum'): >> + >> + def assign(self, id, json, xes, dtype = 'mbox', wc = 'quorum'): >> + self.id = id >> self.json = json >> self.xes = xes >> self.dtype = dtype >> @@ -133,17 +135,24 @@ class BulkThread(Thread): >> try: >> helpers.bulk(self.xes, js_arr) >> except Exception as err: >> - print("Warning: Could not bulk insert: %s" % err) >> - #print("Inserted %u entries" % len(js_arr)) >> + print("%d: Warning: 
Could not bulk insert: %s into %s" % >> (self.id,err,self.dtype)) >> +# print("%d: Inserted %u entries into %s" % (self.id, >> len(js_arr),self.dtype)) >> >> >> class SlurpThread(Thread): >> >> + def __init__(self, index): >> + self.id = index >> + super(SlurpThread, self).__init__() >> + >> + def printid(self,message): >> + print("%d: %s" % (self.id, message)) >> + >> def run(self): >> global block, y, es, lists, baddies, config, resendTo, timeout, >> dedupped, dedup >> ja = [] >> jas = [] >> - print("Thread started") >> + self.printid("Thread started") >> mla = None >> ml = "" >> mboxfile = "" >> @@ -152,16 +161,16 @@ class SlurpThread(Thread): >> archie = archiver.Archiver(parseHTML = parseHTML) >> >> while len(lists) > 0: >> - print("%u elements left to slurp" % len(lists)) >> + self.printid("%u elements left to slurp" % len(lists)) >> >> block.acquire() >> try: >> mla = lists.pop(0) >> if not mla: >> - print("Nothing more to do here") >> + self.printid("Nothing more to do here") >> return >> except Exception as err: >> - print("Could not pop list: %s" % err) >> + self.printid("Could not pop list: %s" % err) >> return >> finally: >> block.release() >> @@ -184,7 +193,7 @@ class SlurpThread(Thread): >> tmpname = mla[0] >> filename = mla[0] >> if filename.find(".gz") != -1: >> - print("Decompressing %s..." % filename) >> + self.printid("Decompressing %s..." 
% filename) >> try: >> with open(filename, "rb") as bf: >> bmd = bf.read() >> @@ -197,16 +206,16 @@ class SlurpThread(Thread): >> tmpname = tmpfile.name >> filename = tmpname >> dFile = True # Slated for deletion upon having >> been read >> - print("%s -> %u bytes" % (tmpname, len(bmd))) >> + self.printid("%s -> %u bytes" % (tmpname, >> len(bmd))) >> except Exception as err: >> - print("This wasn't a gzip file: %s" % err ) >> - print("Slurping %s" % filename) >> + self.printid("This wasn't a gzip file: %s" % err ) >> + self.printid("Slurping %s" % filename) >> messages = mailbox.mbox(tmpname) >> >> else: >> ml = mla[0] >> mboxfile = mla[1] >> - print("Slurping %s/%s" % (ml, mboxfile)) >> + self.printid("Slurping %s/%s" % (ml, mboxfile)) >> m = re.match(r"(\d\d\d\d)(\d\d)", mboxfile) >> EY = 1997 >> EM = 1 >> @@ -232,7 +241,7 @@ class SlurpThread(Thread): >> if fromFilter and 'from' in message and >> message['from'].find(fromFilter) == -1: >> continue >> if resendTo: >> - print("Delivering message %s via MTA" % >> message['message-id'] if 'message-id' in message else '??') >> + self.printid("Delivering message %s via MTA" % >> message['message-id'] if 'message-id' in message else '??') >> s = SMTP('localhost') >> try: >> if list_override: >> @@ -245,7 +254,7 @@ class SlurpThread(Thread): >> s.send_message(message, from_addr=None, >> to_addrs=(resendTo)) >> continue >> if (time.time() - stime > timeout): # break out after N >> seconds, it shouldn't take this long..! 
>> - print("Whoa, this is taking way too long, ignoring %s >> for now" % tmpname) >> + self.printid("Whoa, this is taking way too long, >> ignoring %s for now" % tmpname) >> break >> >> json, contents = archie.compute_updates(list_override, >> private, message) >> @@ -271,7 +280,7 @@ class SlurpThread(Thread): >> } >> ) >> if res and len(res['hits']['hits']) > 0: >> - print("Dedupping %s" % json['message-id']) >> + self.printid("Dedupping %s" % json['message-id']) >> dedupped += 1 >> continue >> >> @@ -305,43 +314,43 @@ class SlurpThread(Thread): >> if len(ja) >= 40: >> if not args.dry: >> bulk = BulkThread() >> - bulk.assign(ja, es, 'mbox') >> + bulk.assign(self.id, ja, es, 'mbox') >> bulk.insert() >> ja = [] >> >> if not args.dry: >> bulks = BulkThread() >> - bulks.assign(jas, es, 'mbox_source') >> + bulks.assign(self.id, jas, es, 'mbox_source') >> bulks.insert() >> jas = [] >> else: >> - print("Failed to parse: Return=%s Message-Id=%s" % >> (message.get('Return-Path'), message.get('Message-Id'))) >> + self.printid("Failed to parse: Return=%s Message-Id=%s" >> % (message.get('Return-Path'), message.get('Message-Id'))) >> bad += 1 >> >> if filebased: >> - print("Parsed %u records (failed: %u) from %s" % (count, >> bad, filename)) >> + self.printid("Parsed %u records (failed: %u) from %s" % >> (count, bad, filename)) >> if dFile: >> os.unlink(tmpname) >> elif imap: >> - print("Parsed %u records (failed: %u) from imap" % (count, >> bad)) >> + self.printid("Parsed %u records (failed: %u) from imap" % >> (count, bad)) >> else: >> - print("Parsed %s/%s: %u records (failed: %u) from %s" % >> (ml, mboxfile, count, bad, tmpname)) >> + self.printid("Parsed %s/%s: %u records (failed: %u) from >> %s" % (ml, mboxfile, count, bad, tmpname)) >> os.unlink(tmpname) >> >> y += count >> baddies += bad >> if not args.dry: >> bulk = BulkThread() >> - bulk.assign(ja, es, 'mbox') >> + bulk.assign(self.id, ja, es, 'mbox') >> bulk.insert() >> ja = [] >> >> if not args.dry: >> bulks = 
BulkThread() >> - bulks.assign(jas, es, 'mbox_source') >> + bulks.assign(self.id, jas, es, 'mbox_source') >> bulks.insert() >> jas = [] >> - print("Done, %u elements left to slurp" % len(lists)) >> + self.printid("Done, %u elements left to slurp" % len(lists)) >> >> parser = argparse.ArgumentParser(description='Command line options.') >> parser.add_argument('--source', dest='source', type=str, nargs=1, >> @@ -637,7 +646,7 @@ threads = [] >> cc = min(len(lists), int( multiprocessing.cpu_count() / 2) + 1) >> print("Starting up to %u threads to fetch the %u %s lists" % (cc, >> len(lists), project)) >> for i in range(1,cc+1): >> - t = SlurpThread() >> + t = SlurpThread(i) >> threads.append(t) >> t.start() >> print("Started no. %u" % i) >> >>
