Hello.

DISCLAIMER: This is NOT about SQLAlchemy per se. I just seek opinions and ideas from people who are familiar with relational databases, especially postgres. Sorry for the longish post...
Someone posted a link to the following presentation a few days back:

https://speakerdeck.com/rwarren/a-brief-intro-to-profiling-in-python

It inspired me to try pg's COPY FROM command instead of compiled INSERT command(s) to copy a large amount of data from a remote MS SQL database (via pyodbc) to the local postgres instance (via psycopg2). According to the presentation, this should be approx. 2-3 times faster than my current solution.

ALGORITHM:
* FILL (populate local postgres tables from the remote MS SQL database):
  * Drop the tables.
  * Create them anew.
  * For each such table:
    * Query the MS SQL db and insert the result rows into the table.
  * COMMIT.
* UPDATE (application downtime period):
  * Now, all data from the remote MS SQL are present in local postgres tables.
  * Execute various insert/update scripts that put the local data in sync.
  * COMMIT.

Note that the pg_dump of the entire postgres DB in SQL format is about 1 GB: 500 MB for the "temporary" tables with copies of the remote data and 500 MB for the regular tables. This algorithm has worked on a daily basis for several weeks now, with only minor issues, mostly related to network outages.

I replaced the compiled INSERTs with the COPY FROM command. I use os.pipe. One end reads MS SQL query rows, converts them to CSV and writes them to its output. The other end is a COPY FROM command that processes the CSV lines. It runs in a separate thread (threading.Thread). The interesting portion of the code is this:

# Snippet of...
class ExtDbTable:
    """Logic to populate one local postgres table with data from a remote
    MS SQL database."""

    def fill(self, cursor):
        """Perform a SQL query to the external DB and populate the table
        with its results.

        The cursor argument represents a (pyodbc) connection to the
        external DB. The table is guaranteed to be empty. Perform the query
        to the external DB and insert its results via the postgres
        COPY FROM command as one huge (but streamlined) CSV.
""" t0 = datetime.utcnow() gen = self._fetch(cursor) self._insert(gen) t1 = datetime.utcnow() if self._echo: info(u"{}.fill\ntotal: {}\n", self.name, t1 - t0) def _fetch(self, cursor): """Generator that fetches one result row at a time from a remote MS SQL database.""" sql = self._query if self._echo: info(u"\n\n#### SQL ####\nTable: {}\n{}", self.name, sql) t0 = datetime.utcnow() cursor.execute(sql) t1 = datetime.utcnow() if self._echo: info(u"{}._fetch\ntotal: {}", self.name, t1 - t0) each = cursor.fetchone() while each: yield each each = cursor.fetchone() # TODO: Remove the obsolete code. # yield dict(zip([each_desc[0] for each_desc in each.cursor_description], each)) def _insert(self, gen): """Insert rows to the ext table via postgres COPY FROM command: * Open pipe. * Start new thread that reads CSV lines from the pipe and inserts them into the table via COPY FROM command. The thread is necessary because cursor.copy_expert() (=COPY FROM command) reads till EOF. This would block because the pipe has no EOF (yet). * Read SQL result rows from the input generator (one at a time), convert each to CSV format and write it to the pipe. NOTE: This should be approx. 2-3x faster than the previous solution that used large INSERT statement(s) escaped with cursor.mogrify(). Maybe even more, because the current implementation SHOULD be using much less memory. NOTE: The real measurement shows that the new implementation takes 70% of the old (1h50m vs. 1h15m). HOWEVER, the following update consumes ALL AVAILABLE MEMORY AND DOES NOT FINISH. 
""" t0 = datetime.utcnow() rd, wd = os.pipe() # d-descriptor rf, wf = os.fdopen(rd), os.fdopen(wd, 'w') # f-file # NOTE: Session = scoped_session(sessionmaker(bind=engine, autoflush=False)) cursor = Session().connection().connection.cursor() null = '_?_' def _convert_value(v): if v is None: return null assert v != null if type(v) is str: return unicode(each).encode('utf-8') if type(v) is unicode: return each.encode('utf-8') return str(v) # Consumer (COPY FROM command). def _consumer(): sql = u"COPY {} FROM STDIN WITH (FORMAT csv, ENCODING 'utf-8', NULL '{}')".format(self.name, null) # NOTE: Default buffer size is 8192 bytes. The value 16*8192 was a # bit faster than the default, 30s improvement on a 10m test run. # Further increment of the buffer had no impact. cursor.copy_expert(sql, rf, size=131072) consumer = threading.Thread(target=_consumer) consumer.start() # Producer (CSV writer). writer = csv.writer(wf, lineterminator='\n') for sql_line in gen: csv_line = [_convert_value(each) for each in sql_line] writer.writerow(csv_line) wf.close() # Wait for the consumer to finish. consumer.join() rf.close() cursor.close() t1 = datetime.utcnow() if self._echo: info(u"{}._insert\ntotal: {}\n", self.name, t1 - t0) The FILL runs a bit faster (1h15m vs. 1h50m) and uses much less memory than before. Thus the FILL portion has improved. HOWEVER, the initial UPDATE (on an empty postgres database) does NOT complete AT ALL. The postgres process eventually consumes all available memory and all swap space and thus brings everything to a halt. The ONLY change I am aware of is from INSERT to COPY FROM. UPDATE-related code and scripts haven't changed AT ALL. The data in the remote MS SQL are the same (I have access to a fixed snapshot). I am not aware of any resource leak during the FILL. When I monitor the memory of the postgres process, it is constantly 164076 (VMSize). It starts to grow during the UPDATE. I have absolutely no idea about possible cause of this. 
Does any of you know how the COPY FROM command works internally? The postgres documentation says that when it fails, the written data occupy space but are not reachable, and one should run VACUUM to get rid of them. Does this suggest that its internals are different from those of a regular INSERT? Do any of you have any idea about the cause of this? Have you seen something similar when you used COPY FROM yourself?

Thank you for reading,

Ladislav Lenart