Hello.

DISCLAIMER: This is NOT about SQLAlchemy per se. I just seek opinions and ideas
from people who are familiar with relational databases, especially postgres.
Sorry for the longish post...

Someone posted a link to the following presentation a few days back:

    https://speakerdeck.com/rwarren/a-brief-intro-to-profiling-in-python

It inspired me to try pg's COPY FROM command instead of compiled INSERT
command(s) to copy a large amount of data from a remote MS SQL database (via
pyodbc) to the local postgres instance (via psycopg2). According to the
presentation, this should be approx. 2-3 times faster than my current solution.
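For context, the current approach builds one large multi-row INSERT per batch
with values escaped via cursor.mogrify() (as mentioned in the code comments
below). A simplified sketch, not the exact production code; table, columns and
rows are placeholders:

# Simplified sketch of the old compiled-INSERT approach (placeholders only).
# The whole statement is built in memory and executed at once.
def insert_via_mogrify(cursor, table, columns, rows):
    placeholders = "(" + ", ".join(["%s"] * len(columns)) + ")"
    values = [cursor.mogrify(placeholders, tuple(row)) for row in rows]
    sql = "INSERT INTO {} ({}) VALUES {}".format(
        table, ", ".join(columns), ", ".join(values))
    cursor.execute(sql)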

ALGORITHM (a simplified sketch follows the list):
* FILL (populate local postgres tables from the remote MS SQL database):
  * Drop the tables.
  * Create them anew.
  * For each such table:
    * Query MS SQL db and insert result rows to the table.
  * COMMIT.
* UPDATE (application downtime period):
  * Now, all data from the remote MS SQL are present in local postgres tables.
  * Execute various insert/update scripts that put local data in sync.
  * COMMIT.
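In (simplified) code the daily job looks roughly like this; drop()/create() and
the other names are placeholders, not the real implementation:

# Simplified sketch of the daily job (names are placeholders).
def daily_sync(ext_tables, mssql_cursor, session, update_scripts):
    # FILL: rebuild the local copies of the remote MS SQL data.
    for table in ext_tables:
        table.drop()              # drop the table
        table.create()            # create it anew
        table.fill(mssql_cursor)  # query MS SQL and insert the result rows
    session.commit()
    # UPDATE (application downtime): sync the regular tables from the copies.
    for script in update_scripts:
        session.execute(script)
    session.commit()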

Note that the pg_dump of the entire postgres DB in SQL format is about 1 GB:
500 MB for the "temporary" tables with copies of the remote data and 500 MB for
the regular tables.

This algorithm has been running daily for several weeks now with only minor
issues, mostly related to network outages.

I replaced the compiled INSERTs with a COPY FROM command. I use os.pipe: the
producer reads MS SQL query rows, converts them to CSV and writes them to the
pipe, while the consumer is a COPY FROM command that processes the CSV lines.
The consumer runs in a separate thread (threading.Thread). The interesting
portion of the code is this:


# Snippet of...
import csv
import os
import threading
from datetime import datetime


class ExtDbTable:
    """Logic to populate one local postgres table with data from a remote
    MS SQL database."""

    def fill(self, cursor):
        """Perform a SQL query to the external DB and populate the table with
        its results.
        The cursor argument represents a (pyodbc) connection to the external DB.
        The table is guaranteed to be empty.
        Perform the query to the external DB and insert its results via the
        postgres COPY FROM command as one huge (but streamed) CSV.
        """
        t0 = datetime.utcnow()
        gen = self._fetch(cursor)
        self._insert(gen)
        t1 = datetime.utcnow()
        if self._echo:
            info(u"{}.fill\ntotal: {}\n", self.name, t1 - t0)

    def _fetch(self, cursor):
        """Generator that fetches one result row at a time from a remote MS SQL
database."""
        sql = self._query
        if self._echo:
            info(u"\n\n#### SQL ####\nTable: {}\n{}", self.name, sql)
        t0 = datetime.utcnow()
        cursor.execute(sql)
        t1 = datetime.utcnow()
        if self._echo:
            info(u"{}._fetch\ntotal: {}", self.name, t1 - t0)
        each = cursor.fetchone()
        while each:
            yield each
            each = cursor.fetchone()
            # TODO: Remove the obsolete code.
            # yield dict(zip([each_desc[0] for each_desc in each.cursor_description],
            #                each))

    def _insert(self, gen):
        """Insert rows to the ext table via postgres COPY FROM command:
        * Open pipe.
        * Start new thread that reads CSV lines from the pipe and inserts them
        into the table via COPY FROM command. The thread is necessary because
        cursor.copy_expert() (=COPY FROM command) reads till EOF. This would
        block because the pipe has no EOF (yet).
        * Read SQL result rows from the input generator (one at a time),
        convert each to CSV format and write it to the pipe.

        NOTE: This should be approx. 2-3x faster than the previous solution
        that used large INSERT statement(s) escaped with cursor.mogrify().
        Maybe even more, because the current implementation SHOULD be using
        much less memory.

        NOTE: The real measurement shows that the new implementation takes
        about 70% of the old time (1h15m vs. 1h50m). HOWEVER, the subsequent
        UPDATE consumes ALL AVAILABLE MEMORY AND DOES NOT FINISH.
        """
        t0 = datetime.utcnow()
        rd, wd = os.pipe() # d-descriptor
        rf, wf = os.fdopen(rd), os.fdopen(wd, 'w') # f-file
        # NOTE: Session = scoped_session(sessionmaker(bind=engine,
        #                                             autoflush=False))
        cursor = Session().connection().connection.cursor()
        null = '_?_'

        def _convert_value(v):
            if v is None:
                return null
            assert v != null
            if type(v) is str:
                return unicode(v).encode('utf-8')
            if type(v) is unicode:
                return v.encode('utf-8')
            return str(v)

        # Consumer (COPY FROM command).
        def _consumer():
            sql = u"COPY {} FROM STDIN WITH (FORMAT csv, ENCODING 'utf-8', NULL
'{}')".format(self.name, null)
            # NOTE: Default buffer size is 8192 bytes. The value 16*8192 was a
            # bit faster than the default, 30s improvement on a 10m test run.
            # Further increment of the buffer had no impact.
            cursor.copy_expert(sql, rf, size=131072)
        consumer = threading.Thread(target=_consumer)
        consumer.start()

        # Producer (CSV writer).
        writer = csv.writer(wf, lineterminator='\n')
        for sql_line in gen:
            csv_line = [_convert_value(each) for each in sql_line]
            writer.writerow(csv_line)
        wf.close()

        # Wait for the consumer to finish.
        consumer.join()
        rf.close()
        cursor.close()
        t1 = datetime.utcnow()
        if self._echo:
            info(u"{}._insert\ntotal: {}\n", self.name, t1 - t0)


The FILL runs a bit faster (1h15m vs. 1h50m) and uses much less memory than
before. Thus the FILL portion has improved.

HOWEVER, the initial UPDATE (on an empty postgres database) does NOT complete AT
ALL. The postgres process eventually consumes all available memory and all swap
space and thus brings everything to a halt.

The ONLY change I am aware of is from INSERT to COPY FROM. UPDATE-related code
and scripts haven't changed AT ALL. The data in the remote MS SQL are the same
(I have access to a fixed snapshot).

I am not aware of any resource leak during the FILL. When I monitor the memory
of the postgres process, its VmSize stays constant at 164076. It only starts to
grow during the UPDATE.
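(For completeness, VmSize here is the value reported in /proc/<pid>/status for
the postgres backend; a minimal way to sample it looks like the snippet below.
This is just an illustration, not part of the application code.)

# Illustration only: sample VmSize of a process from /proc.
def vm_size_kb(pid):
    with open("/proc/{}/status".format(pid)) as f:
        for line in f:
            if line.startswith("VmSize:"):
                return int(line.split()[1])  # /proc reports the value in kB
    return None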

I have absolutely no idea about the possible cause of this. Do any of you know
how the COPY FROM command works internally? The postgres documentation says
that when it fails, the written data occupy space but are not reachable, and
one should run VACUUM to get rid of them. Does this suggest that its internals
differ from those of a regular INSERT? Do any of you have an idea about the
cause? Have you seen something similar when you used COPY FROM yourself?


Thank you for reading,

Ladislav Lenart

