Hello.

Please ignore my previous post. The cause of the UPDATE problem is still unknown
to me, but it is NOT related to the COPY FROM command in any way. I was just told
that we have been unable to perform the initial UPDATE (i.e. from an empty
postgres DB to a full one) for two weeks now. The UPDATE code hasn't changed at
all. We only added a couple of indexes. Might these cause such catastrophic
behaviour?
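
In case it helps to reproduce or rule this out, this is roughly how I plan to
test the index hypothesis: drop the newly added indexes before the initial
UPDATE and recreate them once it has finished. The sketch below is simplified;
the index names, the DSN and the run_update_scripts callback are placeholders,
not our real schema:

# Hypothetical test: run the initial UPDATE without the new indexes.
# Index names and the DSN are placeholders, not the real schema.
import psycopg2

NEW_INDEXES = {
    # index name -> DDL that recreates it (both made up for illustration)
    'ix_foo_bar_id': "CREATE INDEX ix_foo_bar_id ON foo (bar_id)",
    'ix_baz_foo_id': "CREATE INDEX ix_baz_foo_id ON baz (foo_id)",
}

def update_without_new_indexes(dsn, run_update_scripts):
    conn = psycopg2.connect(dsn)
    try:
        cursor = conn.cursor()
        # Drop the recently added indexes before the heavy UPDATE step.
        for name in NEW_INDEXES:
            cursor.execute("DROP INDEX IF EXISTS {}".format(name))
        conn.commit()
        run_update_scripts(conn)  # the existing, unchanged UPDATE step
        # Recreate the indexes once the UPDATE has finished.
        for ddl in NEW_INDEXES.values():
            cursor.execute(ddl)
        conn.commit()
    finally:
        conn.close()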

Ladislav Lenart


On 14.3.2013 12:04, Ladislav Lenart wrote:
> Hello.
> 
> DISCLAIMER: This is NOT about SQLAlchemy per se. I just seek opinions and ideas
> from people who are familiar with relational databases, especially postgres.
> Sorry for the longish post...
> 
> Someone posted a link to the following presentation a few days back:
> 
>     https://speakerdeck.com/rwarren/a-brief-intro-to-profiling-in-python
> 
> It inspired me to try pg's COPY FROM command instead of compiled INSERT
> command(s) to copy a large amount of data from a remote MS SQL database (via
> pyodbc) to the local postgres instance (via psycopg2). According to the
> presentation, this should be approx. 2-3 times faster than my current solution.
> 
> ALGORITHM (a simplified driver sketch follows the list):
> * FILL (populate local postgres tables from the remote MS SQL database):
>   * Drop the tables.
>   * Create them anew.
>   * For each such table:
>     * Query the MS SQL db and insert the result rows into the table.
>   * COMMIT.
> * UPDATE (application downtime period):
>   * Now, all data from the remote MS SQL are present in local postgres tables.
>   * Execute various insert/update scripts that put the local data in sync.
>   * COMMIT.
> 
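> In code, the whole thing boils down to roughly the following driver (a
> simplified sketch; the drop/create helpers and the script list are
> illustrative only, the real code is spread over several modules):
> 
> # Simplified driver for the algorithm above (names are illustrative only).
> def fill_and_update(pg_session, mssql_cursor, ext_tables, update_scripts):
>     # FILL: recreate the "temporary" tables and copy the remote data over.
>     for table in ext_tables:
>         table.drop(pg_session)
>         table.create(pg_session)
>         table.fill(mssql_cursor)    # see ExtDbTable.fill() below
>     pg_session.commit()
>     # UPDATE (downtime): sync the regular tables from the copied data.
>     for script in update_scripts:
>         pg_session.execute(script)  # plain SQL insert/update scripts
>     pg_session.commit()
> 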
> Note that the pg_dump of the entire postgres DB in SQL format is about 1GB:
> 500MB for the "temporary" tables with copies of the remote data and 500MB for
> the regular tables.
> 
> This algorithm has been working on a daily basis for several weeks now, with
> only minor issues, mostly related to network outages.
> 
> I replaced the compiled INSERTs with the COPY FROM command. I use os.pipe. One
> end reads the MS SQL query rows, converts them to CSV and writes them to the
> pipe. The other end is a COPY FROM command that processes the CSV lines. It
> runs in a separate thread (threading.Thread). The interesting portion of the
> code is this:
> 
> 
> # Snippet of...
> class ExtDbTable:
>     """Logic to populate one local postgres table with data from a remote MS 
> SQL
> database."""
> 
>     def fill(self, cursor):
>         """Perform a SQL query to the external DB and populate the table with
>         its results.
>         The cursor argument represents a (pyodbc) connection to the external DB.
>         The table is guaranteed to be empty.
>         Perform the query to the external DB and insert its results via the
>         postgres COPY FROM command as one huge (but streamed) CSV.
>         """
>         t0 = datetime.utcnow()
>         gen = self._fetch(cursor)
>         self._insert(gen)
>         t1 = datetime.utcnow()
>         if self._echo:
>             info(u"{}.fill\ntotal: {}\n", self.name, t1 - t0)
> 
>     def _fetch(self, cursor):
>         """Generator that fetches one result row at a time from a remote MS 
> SQL
> database."""
>         sql = self._query
>         if self._echo:
>             info(u"\n\n#### SQL ####\nTable: {}\n{}", self.name, sql)
>         t0 = datetime.utcnow()
>         cursor.execute(sql)
>         t1 = datetime.utcnow()
>         if self._echo:
>             info(u"{}._fetch\ntotal: {}", self.name, t1 - t0)
>         each = cursor.fetchone()
>         while each:
>             yield each
>             each = cursor.fetchone()
>             # TODO: Remove the obsolete code.
>             # yield dict(zip([each_desc[0] for each_desc in each.cursor_description],
>             #                each))
> 
>     def _insert(self, gen):
>         """Insert rows to the ext table via postgres COPY FROM command:
>         * Open pipe.
>         * Start new thread that reads CSV lines from the pipe and inserts them
>         into the table via COPY FROM command. The thread is necessary because
>         cursor.copy_expert() (=COPY FROM command) reads till EOF. This would
>         block because the pipe has no EOF (yet).
>         * Read SQL result rows from the input generator (one at a time),
>         convert each to CSV format and write it to the pipe.
> 
>         NOTE: This should be approx. 2-3x faster than the previous solution
>         that used large INSERT statement(s) escaped with cursor.mogrify().
>         Maybe even more, because the current implementation SHOULD be using
>         much less memory.
> 
>         NOTE: The real measurement shows that the new implementation takes
>         about 70% of the old run time (1h15m vs. 1h50m). HOWEVER, the
>         following UPDATE consumes ALL AVAILABLE MEMORY AND DOES NOT FINISH.
>         """
>         t0 = datetime.utcnow()
>         rd, wd = os.pipe() # d-descriptor
>         rf, wf = os.fdopen(rd), os.fdopen(wd, 'w') # f-file
>         # NOTE: Session = scoped_session(
>         #     sessionmaker(bind=engine, autoflush=False))
>         cursor = Session().connection().connection.cursor()
>         null = '_?_'  # sentinel that represents SQL NULL in the CSV stream
> 
>         def _convert_value(v):
>             if v is None:
>                 return null
>             assert v != null
>             if type(v) is str:
>                 return unicode(v).encode('utf-8')
>             if type(v) is unicode:
>                 return v.encode('utf-8')
>             return str(v)
> 
>         # Consumer (COPY FROM command).
>         def _consumer():
>             sql = (u"COPY {} FROM STDIN WITH (FORMAT csv, ENCODING 'utf-8', "
>                    u"NULL '{}')").format(self.name, null)
>             # NOTE: Default buffer size is 8192 bytes. The value 16*8192 was a
>             # bit faster than the default, 30s improvement on a 10m test run.
>             # Further increment of the buffer had no impact.
>             cursor.copy_expert(sql, rf, size=131072)
>         consumer = threading.Thread(target=_consumer)
>         consumer.start()
> 
>         # Producer (CSV writer).
>         writer = csv.writer(wf, lineterminator='\n')
>         for sql_line in gen:
>             csv_line = [_convert_value(each) for each in sql_line]
>             writer.writerow(csv_line)
>         wf.close()
> 
>         # Wait for the consumer to finish.
>         consumer.join()
>         rf.close()
>         cursor.close()
>         t1 = datetime.utcnow()
>         if self._echo:
>             info(u"{}._insert\ntotal: {}\n", self.name, t1 - t0)
> 
> 
> The FILL runs a bit faster (1h15m vs. 1h50m) and uses much less memory than
> before. Thus the FILL portion has improved.
> 
> HOWEVER, the initial UPDATE (on an empty postgres database) does NOT complete
> AT ALL. The postgres process eventually consumes all available memory and all
> swap space and thus brings everything to a halt.
> 
> The ONLY change I am aware of is from INSERT to COPY FROM. UPDATE-related code
> and scripts haven't changed AT ALL. The data in the remote MS SQL are the same
> (I have access to a fixed snapshot).
> 
> I am not aware of any resource leak during the FILL. When I monitor the memory
> of the postgres process, it stays constant during the FILL (VMSize 164076) and
> only starts to grow during the UPDATE.
> 
> I have absolutely no idea about the possible cause of this. Do any of you know
> how the COPY FROM command works? The postgres documentation says that when it
> fails, the written data occupy space but are not reachable, and one should run
> vacuum to get rid of them. Does this suggest that its internals are different
> from those of a regular INSERT? Do any of you have any idea about the cause of
> this? Have you seen something similar when you used COPY FROM yourself?
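> 
> For reference, the INSERT-based version of _insert() that I replaced looked
> roughly like the sketch below (reconstructed and simplified; the batch size
> and the exact statement building are not the real code):
> 
>     def _insert_old(self, gen):
>         """Previous approach: batched INSERTs escaped with cursor.mogrify()."""
>         cursor = Session().connection().connection.cursor()
>         placeholders = None
>         batch = []
>         for sql_line in gen:
>             if placeholders is None:
>                 placeholders = '(' + ', '.join(['%s'] * len(sql_line)) + ')'
>             # mogrify() binds and escapes one row into a "(v1, v2, ...)" string.
>             batch.append(cursor.mogrify(placeholders, tuple(sql_line)))
>             if len(batch) >= 1000:
>                 cursor.execute('INSERT INTO {} VALUES {}'.format(
>                     self.name, ', '.join(batch)))
>                 batch = []
>         if batch:
>             cursor.execute('INSERT INTO {} VALUES {}'.format(
>                 self.name, ', '.join(batch)))
>         cursor.close()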
> 
> 
> Thank you for reading,
> 
> Ladislav Lenart
> 
> 

