Hello.

Please ignore my previous post. The cause of the UPDATE problem is still unknown to me, but it is NOT related to the COPY FROM command in any way. I was just told that we have been unable to perform the initial UPDATE (i.e. from an empty postgres DB to a full one) for two weeks now. The UPDATE code hasn't changed at all. We only added a couple of indexes. Might these cause such catastrophic behaviour?
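If the new indexes really are the culprit, one simple experiment would be to drop them before the initial UPDATE and recreate them afterwards, so postgres does not have to maintain them while the rows are being rewritten. A rough sketch of what I have in mind (the index names, the DSN and the update hook below are only placeholders, NOT our real code):

# -*- coding: utf-8 -*-
# Sketch: run the initial UPDATE without the recently added indexes and
# rebuild them afterwards. Index names and DSN are placeholders.
import psycopg2

NEW_INDEXES = [
    # (index name, statement that recreates it) -- placeholders
    ("ix_partner_name", "CREATE INDEX ix_partner_name ON partner (name)"),
    ("ix_contract_partner_id", "CREATE INDEX ix_contract_partner_id ON contract (partner_id)"),
]

def run_initial_update_without_indexes(dsn, update_fn):
    """Drop NEW_INDEXES, call update_fn(connection) to run the existing UPDATE
    scripts, then recreate the indexes."""
    conn = psycopg2.connect(dsn)
    try:
        cursor = conn.cursor()
        # Drop the new indexes so the UPDATE does not have to maintain them.
        for name, _create in NEW_INDEXES:
            cursor.execute("DROP INDEX IF EXISTS {}".format(name))
        conn.commit()
        update_fn(conn)  # placeholder for the existing UPDATE scripts
        conn.commit()
        # Recreate the indexes once the data is in sync.
        for _name, create in NEW_INDEXES:
            cursor.execute(create)
        conn.commit()
        cursor.close()
    finally:
        conn.close()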
Ladislav Lenart


On 14.3.2013 12:04, Ladislav Lenart wrote:
> Hello.
>
> DISCLAIMER: This is NOT about SQLAlchemy per se. I just seek opinions and ideas
> from people who are familiar with relational databases, especially postgres.
> Sorry for the longish post...
>
> Someone posted a link to the following presentation a few days back:
>
> https://speakerdeck.com/rwarren/a-brief-intro-to-profiling-in-python
>
> It inspired me to try pg's COPY FROM command instead of compiled INSERT
> command(s) to copy a large amount of data from a remote MS SQL database (via
> pyodbc) to the local postgres instance (via psycopg2). According to the
> presentation, this should be approx. 2-3 times faster than my current solution.
>
> ALGORITHM:
> * FILL (populate local postgres tables from the remote MS SQL database):
>   * Drop the tables.
>   * Create them anew.
>   * For each such table:
>     * Query the MS SQL db and insert the result rows into the table.
>   * COMMIT.
> * UPDATE (application downtime period):
>   * Now, all data from the remote MS SQL db are present in local postgres tables.
>   * Execute various insert/update scripts that put the local data in sync.
>   * COMMIT.
>
> Note that the pg_dump of the entire postgres DB in SQL format is about 1GB:
> 500MB for the "temporary" tables with copies of the remote data and 500MB for
> the regular tables.
>
> This algorithm has been running for several weeks now on a daily basis, with
> only minor issues, mostly related to network outages.
>
> I replaced the compiled INSERTs with the COPY FROM command. I use os.pipe. One
> end reads MS SQL query rows, converts them to CSV and writes them to the pipe.
> The other end is a COPY FROM command that processes the CSV lines. It runs in a
> separate thread (threading.Thread). The interesting portion of the code is this:
>
>
> # Snippet of...
> class ExtDbTable:
>     """Logic to populate one local postgres table with data from a remote MS SQL
>     database."""
>
>     def fill(self, cursor):
>         """Perform a SQL query to the external DB and populate the table with
>         its results.
>         The cursor argument represents a (pyodbc) connection to the external DB.
>         The table is guaranteed to be empty.
>         Perform the query to the external DB and insert its results via the
>         postgres COPY FROM command as one huge (but streamed) CSV.
>         """
>         t0 = datetime.utcnow()
>         gen = self._fetch(cursor)
>         self._insert(gen)
>         t1 = datetime.utcnow()
>         if self._echo:
>             info(u"{}.fill\ntotal: {}\n", self.name, t1 - t0)
>
>     def _fetch(self, cursor):
>         """Generator that fetches one result row at a time from the remote MS SQL
>         database."""
>         sql = self._query
>         if self._echo:
>             info(u"\n\n#### SQL ####\nTable: {}\n{}", self.name, sql)
>         t0 = datetime.utcnow()
>         cursor.execute(sql)
>         t1 = datetime.utcnow()
>         if self._echo:
>             info(u"{}._fetch\ntotal: {}", self.name, t1 - t0)
>         each = cursor.fetchone()
>         while each:
>             yield each
>             each = cursor.fetchone()
>         # TODO: Remove the obsolete code.
>         # yield dict(zip([each_desc[0] for each_desc in each.cursor_description], each))
>
>     def _insert(self, gen):
>         """Insert rows into the ext table via the postgres COPY FROM command:
>         * Open a pipe.
>         * Start a new thread that reads CSV lines from the pipe and inserts them
>           into the table via the COPY FROM command. The thread is necessary because
>           cursor.copy_expert() (= the COPY FROM command) reads till EOF. This would
>           block because the pipe has no EOF (yet).
>         * Read SQL result rows from the input generator (one at a time), convert
>           each to CSV format and write it to the pipe.
>
>         NOTE: This should be approx. 2-3x faster than the previous solution that
>         used large INSERT statement(s) escaped with cursor.mogrify(). Maybe even
>         more, because the current implementation SHOULD use much less memory.
>
>         NOTE: The real measurement shows that the new implementation takes about
>         70% of the old one's time (1h15m vs. 1h50m). HOWEVER, the following
>         UPDATE consumes ALL AVAILABLE MEMORY AND DOES NOT FINISH.
>         """
>         t0 = datetime.utcnow()
>         rd, wd = os.pipe()  # d-descriptor
>         rf, wf = os.fdopen(rd), os.fdopen(wd, 'w')  # f-file
>         # NOTE: Session = scoped_session(sessionmaker(bind=engine, autoflush=False))
>         cursor = Session().connection().connection.cursor()
>         null = '_?_'
>
>         def _convert_value(v):
>             if v is None:
>                 return null
>             assert v != null
>             if type(v) is str:
>                 return unicode(v).encode('utf-8')
>             if type(v) is unicode:
>                 return v.encode('utf-8')
>             return str(v)
>
>         # Consumer (COPY FROM command).
>         def _consumer():
>             sql = u"COPY {} FROM STDIN WITH (FORMAT csv, ENCODING 'utf-8', NULL '{}')".format(self.name, null)
>             # NOTE: The default buffer size is 8192 bytes. The value 16*8192 was a
>             # bit faster than the default, a 30s improvement on a 10m test run.
>             # Further increasing the buffer had no impact.
>             cursor.copy_expert(sql, rf, size=131072)
>         consumer = threading.Thread(target=_consumer)
>         consumer.start()
>
>         # Producer (CSV writer).
>         writer = csv.writer(wf, lineterminator='\n')
>         for sql_line in gen:
>             csv_line = [_convert_value(each) for each in sql_line]
>             writer.writerow(csv_line)
>         wf.close()
>
>         # Wait for the consumer to finish.
>         consumer.join()
>         rf.close()
>         cursor.close()
>         t1 = datetime.utcnow()
>         if self._echo:
>             info(u"{}._insert\ntotal: {}\n", self.name, t1 - t0)
>
>
> The FILL runs a bit faster (1h15m vs. 1h50m) and uses much less memory than
> before. Thus the FILL portion has improved.
>
> HOWEVER, the initial UPDATE (on an empty postgres database) does NOT complete
> AT ALL. The postgres process eventually consumes all available memory and all
> swap space and thus brings everything to a halt.
>
> The ONLY change I am aware of is from INSERT to COPY FROM. The UPDATE-related
> code and scripts haven't changed AT ALL. The data in the remote MS SQL are the
> same (I have access to a fixed snapshot).
>
> I am not aware of any resource leak during the FILL. When I monitor the memory
> of the postgres process, it stays constant at 164076 kB (VmSize). It starts to
> grow during the UPDATE.
>
> I have absolutely no idea about the possible cause of this. Does any of you
> know how the COPY FROM command works? The postgres documentation says that when
> it fails, the written data occupy space but are not reachable and one should
> run vacuum to get rid of them. Does this suggest that its internals are
> different from those of a regular INSERT? Do any of you guys have any idea
> about the cause of this? Have you seen something similar when you used COPY
> FROM yourself?
>
>
> Thank you for reading,
>
> Ladislav Lenart
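P.S. Related to the vacuum remark in the quoted post: since the ext tables are dropped and re-created before every FILL, their planner statistics may still be missing right after the COPY (until autovacuum gets to them), which could lead to bad plans in the UPDATE scripts. Running VACUUM ANALYZE on them between the FILL and the UPDATE might therefore be worth a try. A minimal sketch (table names and DSN are placeholders, NOT our real code):

# -*- coding: utf-8 -*-
# Sketch: VACUUM ANALYZE the freshly COPY'd ext tables after the FILL and
# before the UPDATE, so the planner has up-to-date statistics.
# VACUUM cannot run inside a transaction block, hence autocommit.
import psycopg2

def vacuum_analyze_ext_tables(dsn, table_names):
    conn = psycopg2.connect(dsn)
    conn.autocommit = True  # VACUUM must run outside an explicit transaction
    try:
        cursor = conn.cursor()
        for table in table_names:
            cursor.execute("VACUUM ANALYZE {}".format(table))
        cursor.close()
    finally:
        conn.close()

# Example usage with placeholder names:
# vacuum_analyze_ext_tables("dbname=local_pg", ["ext_partner", "ext_contract"])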