* mario ruggier wrote on [2007-09-11 17:52:47 +0200]:
> Recently, on the Durus mailing list, Mike announced experimental sqlite and
> postres storages for durus
> [http://mail.mems-exchange.org/durusmail/durus-users/887/]. Iiuc, as they
> are these would anyway not help to meet your requirement (of not having
> another server process), but I wonder if, with some tinkering, there'd be
> any possibilities there.

I've got an improved release of the Postgresql storage /
DBAPIStorage classes - that, and a Sancho test file are attached. 

After switching DBAPI layers away from psycopg2 to something that supports
prepared statements and pg's COPY statement, I rather like this version,
performance wise.

After doing some stress testing with sqlite and Postgresql, for a
(very) large storage, I think I'd probably go with Postgresql over
sqlite if ShelfStorage wasn't the right choice.

One benefit of the Postgresql storage is that pg has a very fast
bulk import mechanism (the COPY statement); if you ever need to load
many (millions) records on anything more than a one-off basis, it can be
a real help.

You also could have a Postgresql process running on another box
entirely, which opens up some other avenues for scalability although
I've not looked at latency or performance in that scenario at all.

Still for me it was primarily a learning experience; I understand
the insides of Durus slightly better (not that I really needed to
for what I do) and it was fun to do. I can't possibly imagine
writing a storage back end for ZODB ;-) so writing one for Durus was
fun in and of itself.

But for the original question, David's limit child approach makes
the most sense. I keep meaning to think more seriously about hosting
issues for QP; perhaps may offer that as a service one of these
days.

Mike
"""
$URL: svn+ssh://svn.mikewatkins.ca/repo/trunk/parlez/lib/pgsql_storage.py $
$Id: pgsql_storage.py 43 2007-09-12 23:25:01Z mw $

PostgresqlStorage isn't a speed demon compared to native file system storages,
but it holds up very well compared to sqlite when the record count is large
(millions of records). Its industrial strength design and proven track record
with many concurrent connections might make it a more comfortable fit for some
applications.

This variant of PostgresqlStorage is for the pgsql DBAPI driver only. pgsql
provides access to Postgresql's very efficient COPY functionality (used in the
bulk_load_records utility function). Get it:

    http://pypi.python.org/pypi/python-pgsql/
    http://people.rpath.com/~gafton/pgsql

Usage:
    import pgsql
    sql_connection = pgsql.connect(database='test')
    connection = Connection(PostgresqlStorage(sql_connection))
    root = connection.get_root()
    ...
"""
from durus.serialize import unpack_record
from durus.utils import int8_to_str, str_to_int8
try:
    from parlez.dbapi_storage import DBAPIStorage # my as-yet unreleased library
except ImportError:
    from dbapi_storage import DBAPIStorage

class PostgresqlStorage(DBAPIStorage):
    """
    Provides a Postgresql storage backend for Durus. Requires a DBAPI
    compatible Postgresql interface; after evaluating several DBAPI interfaces,
    at this time only psycopg2 and pgsql is supported; pgsql is preferred for
    what it offers that psycopg2 does not: pgsql implements true server-side
    cursors, executemany properly, prepared statements and provides a
    convenient method connection.bulkload() for accessing Postgresql's
    extremely fast COPY functionality for importing data. For every day
    operations pgsql is faster provided you make use of prepared statements.

    At 2007-08-24 this storage should be considered experimental and unproven.

    This PostgresqlStorage variant is for pgsql *only*. See psycopg2_storage
    for a variant suitable for that dbapi driver.
    """
    def __init__(self, sql_connection, table_name='durus_records', **kwargs):
        DBAPIStorage.__init__(self, sql_connection, table_name=table_name, **kwargs)
        self._sql_prepared_queries = dict(
            new_oid = self._sql_connection.prepare(
                'INSERT INTO %s DEFAULT VALUES RETURNING p_oid' % self._sql_table_name),
            load = self._sql_connection.prepare(
                'SELECT record FROM %s WHERE p_oid = $1 and record is not NULL' %\
                self._sql_table_name),
            end = self._sql_connection.prepare(
                'UPDATE %s set record=$1, pack=0 WHERE p_oid = $2' % self._sql_table_name),
            )

    def _sql_ensure_tables(self):
        """
        Not portable at all.
        """
        def tables_exist():
            result = self._sql_queryone(
                """
                SELECT tablename from pg_tables
                WHERE schemaname='public'
                AND tablename=$1
                """, (self._sql_table_name,))
            return result and result[0] == self._sql_table_name or False
        def create_tables():
            self._sql_execute(
                """
                CREATE TABLE %s
                    (p_oid SERIAL UNIQUE PRIMARY KEY,
                     pack INTEGER DEFAULT 0,
                     record BYTEA)
                """ % (self._sql_table_name,))
            # SERIAL statement forces creation of a SEQUENCE <tablename>_p_oid_seq
            # Dropping table drops SERIAL created sequence; by default the serial
            # numbering system starts at 1; reset it to start at 0 for ROOT_OID:
            self._sql_execute(
                """
                ALTER SEQUENCE %s_p_oid_seq minvalue 0 restart with 0
                """ % (self._sql_table_name,))
            # pack on large tables will benefit from:
            self._sql_execute(
                """
                CREATE INDEX %s_pack ON %s (pack)
                """ % (self._sql_table_name, self._sql_table_name))
        if not tables_exist():
            create_tables()

    def new_oid(self):
        """() -> oid:str
        Return an unused oid.
        """
        cursor = self._sql_prepared_queries.get('new_oid')
        return int8_to_str(cursor.execute().fetchone()[0])

    def load(self, oid):
        """(oid:str) -> record:str

        Overriding load() to take advantage of pgsql prepared statement capability.
        """
        cursor = self._sql_prepared_queries.get('load')
        cursor.execute((str_to_int8(oid),))
        result = cursor.fetchone()
        if result:
            return result[0]
        else:
            raise KeyError, 'oid %s not in storage' % str_to_int8(oid)

    def end(self, handle_invalidations=None):
        """
        Concludes a Durus commit.
        """
        def gen_transaction():
            for oid, record in self.transaction.iteritems():
                yield (record, str_to_int8(oid))
        cursor = self._sql_prepared_queries.get('end')
        cursor.executemany(gen_transaction())
        self.transaction = None
        self._sql_connection.commit()

    def get_size(self):
        """() -> integer

        Uses Postgresql pg_class system table; answer not accurate in between
        inserts and deletions except when VACUUM or CLUSTER has been run.

        Used for since number is informational, not important. The real time
        alternative: select count() - does a full tablescan because of pg's MVCC system.
        """
        result = self._sql_queryone(
            "SELECT reltuples from pg_class where relname = '%s'" % self._sql_table_name)
        return int(result[0])


def bulk_load_records(records, target_storage):
    """(records:sequence(str), target_storage:PostgresqlStorage) -> None

    Bulk load records into a PostgresqlStorage, preserving existing object ids.
    Raises KeyError if ANY records other than root exist in the target storage.

    WARNING! ALL existing records, including root, in target will be deleted! WARNING!
    """
    from durus.serialize import unpack_record
    assert isinstance(target_storage, PostgresqlStorage)
    sqldb = target_storage._sql_connection
    cursor = sqldb.cursor()
    def check_target_storage():
        cursor.execute('SELECT count(p_oid) FROM %s WHERE p_oid != 0' %\
                       (target_storage._sql_table_name,))
        result = cursor.fetchone()
        if result and result[0] > 1:
            raise SystemError, (
                'No records, other than root, may exist in target storage. '
                'WARNING: target storage; root itself will also be replaced.')
        cursor.execute('DELETE FROM %s' % target_storage._sql_table_name)
        cursor.execute('ALTER SEQUENCE %s_p_oid_seq restart with 0' %\
                       (target_storage._sql_table_name,))
        sqldb.commit()
    def gen_oid_record():
        for record in records:
            oid, data, refdata = unpack_record(record)
            oid = str_to_int8(oid)
            yield (oid, record)
    check_target_storage()
    # pgsql exposes Postgresql's extremely efficient COPY functionality:
    sqldb.bulkload(target_storage._sql_table_name, gen_oid_record(), ('p_oid', 'record'))
    # as bulkload uses COPY; sequences are not updated so we've got to do that
    cursor.execute('SELECT max(p_oid) from %s' % (target_storage._sql_table_name,))
    max_oid = cursor.fetchone()[0]
    assert max_oid is not None
    cursor.execute(
        """
        ALTER SEQUENCE %s_p_oid_seq restart with %d
        """ % (target_storage._sql_table_name, max_oid+1))
    sqldb.commit()

"""
$URL: svn+ssh://svn.mikewatkins.ca/repo/trunk/parlez/lib/dbapi_storage.py $
$Id: dbapi_storage.py 41 2007-09-02 14:08:04Z mw $

Provides an abstract DBAPIStorage; two working implementations are provided
PostgresqlStorage (supporting psycopg2 driver) and SqliteStorage (supporting
Python 2.5's built in sqlite3 DBAPI interface as well as pysqlite2).

Motivation to do this thanks to a sqlite/apsw specific effort by Peter
Wilkinson of Thirdfloor Software Works Pty. Ltd.
"""

from durus.serialize import unpack_record
from durus.connection import ROOT_OID
from durus.utils import str_to_int8
from durus.storage import Storage
from itertools import islice, chain

class DBAPIStorage(Storage):
    """
    This is an abstract class which SQL-based Durus storages can inherit.

    This class assumes a Python DB API compatible connection.
    """
    def __init__(self, sql_connection, table_name='durus_records', **kwargs):
        if self.__class__ is DBAPIStorage:
            raise RuntimeError("DBAPIStorage is abstract")
        self.ROOT_OID = ROOT_OID
        self._sql_connection = sql_connection
        self._sql_table_name = table_name
        self._sql_options = dict(
            pack_increment = 1000,
        )
        self._sql_options.update(kwargs)
        self._sql_ensure_tables()
        self.transaction = []

    def _sql_execute(self, sql, *args):
        """(sql:str, *args) -> DBAPI cursor object

        All DBAPI adapters behave differently upon a cursor.execute() -
        some return the cursor object, some do not. Some return a
        logical True or False value.

        Here we return a cursor regardless of dbapi adapter.
        """
        cursor = self._sql_connection.cursor()
        cursor.execute(sql, *args)
        return cursor

    def _sql_queryone(self, sql, *args):
        """(sql:str, *args) -> SQL query result or None

        Encapsulates cursor(); execute(); fetchone().

        Not all queries return results (i.e. CREATE TABLE, UPDATE, INSERT), and
        DBAPI adapters behave differently on calls to fetchone() - some return
        a ProgrammingError (or other such exception) if there are no results.
        Some return None.

        Use _sql_execute() if you expect no results. This method will not trap
        errors raised by the DBAPI layer if there are no results to return.
        """
        cursor = self._sql_execute(sql, *args)
        return cursor.fetchone()

    def _sql_ensure_tables(self):
        """
        Ensure table(s) required exist, create if not.

        Table existence detection:
        Postgres: SELECT tablename from pg_tables where schemaname='public'
                  AND tablename=<table_name>
        sqlite:   PRAGMA table_info(<table_name>)
        """
        def tables_exist():
            pass
        def create_tables():
            pass
        if not tables_exist():
            create_tables()
        raise NotImplementedError, (
            'Table existence detection and creation is not portable, override '
            'in your subclass.')

    def new_oid(self):
        """() -> str
        Must return return int8_to_str()
        """
        raise NotImplementedError, ('OID generation is not portable')

    def load(self, oid):
        """(oid:str) -> record:str

        One of the few portable queries and can be used as-is if you are
        willing to live without using a proper bind variable. You might also
        want to override if the DPAPI adapter being used happens to provide
        some enhancement that makes sense to take advantage of, such as
        prepared statements.
        """
        result = self._sql_queryone(
            'SELECT record FROM %s WHERE p_oid = %s' % (self._sql_table_name,
                                                        str_to_int8(oid)))
        if result:
            return str(result[0])
        else:
            raise KeyError, 'oid %s not in storage' % str_to_int8(oid)

    def begin(self):
        """() -> None

        Begin a new Durus transaction (a commit).
        """
        self.transaction = {}

    def store(self, oid, record):
        """(oid:str, record:object) -> None

        Adds record to the transaction queue for a commit underway.
        """
        assert len(oid) == 8
        assert oid not in self.transaction
        self.transaction[oid] = record

    def end(self, handle_invalidations=None):
        """
        Conclude a commit.
        """
        raise NotImplementedError, ('Parameter passing to queries not portable')

    def sync(self):
        return []

    def close(self):
        """Close storage database connections"""
        if self._sql_connection:
            self._sql_connection.close()

    def get_packer(self):
        """
        Returns an incremental packer used by StorageServer.

        This implementation of _packer can be used for any DBAPI driver as
        standard python string parameters and dbapi cursors are used.
        """
        def gen_oids():
            for oid, record in self.gen_oid_record(batch_size=pack_increment):
                record_oid, data, refdata = unpack_record(record)
                assert oid == record_oid
                yield str_to_int8(oid)
        def gen_batch(iterable, size=None):
            iterable = iter(iterable)
            while True:
                chunk = islice(iterable, size)
                yield chain([chunk.next()], chunk)
        def packer():
            assert not self.transaction
            self._sql_execute('BEGIN')
            for batch in gen_batch(gen_oids(), size=pack_increment):
                # this is significantly faster than executemany
                sql = "UPDATE %s set pack = 1 WHERE p_oid in (%s)" %\
                               (self._sql_table_name, ','.join(map(str,batch)))
                self._sql_execute(sql)
                yield None
            self._sql_connection.execute(
                "DELETE FROM %s WHERE pack = 0" % self._sql_table_name)
            self._sql_connection.execute(
                "UPDATE %s SET pack = 0" % self._sql_table_name)
            self._sql_execute('COMMIT')
        pack_increment = self._sql_options.get('pack_increment', 1000)
        return packer()

    def pack(self):
        """
        Remove obsolete records.
        """
        for z in self.get_packer():
            pass

    def get_size(self):
        """() -> int

        Many SQL databases impose a large cost on select count(*) from ...
        thus this is not implemented, returning None. Subclasses may have
        db specific performance workarounds to offer...
        """
        return None

    def bulk_load(self, oids):
        """(oids:sequence(oid:str)) -> sequence(record:str)

        This portable approach is approx 3X faster than loading one by one.
        """
        requested_oids = map(str_to_int8, oids)
        requested_oids.sort()
        cursor = self._sql_connection.cursor()
        cursor.execute(
            """
            SELECT p_oid, record from %s
            WHERE p_oid in (%s)
            AND record IS NOT NULL
            """ %\
            (self._sql_table_name, ','.join(map(str, requested_oids))))
        result = cursor.fetchall()
        returned_oids = [int_oid for int_oid, record in result]
        returned_oids.sort()
        if requested_oids != returned_oids:
            # shouldn't occur under normal circumstances
            not_found = []
            for r_oid in requested_oids:
                if r_oid not in returned_oids:
                    not_found += [r_oid]
            raise KeyError('oids %s not in DBAPIStorage' % ', '.join(map(str, not_found)))
        for p_oid, record in result:
            yield record

    def gen_oid_record(self, start_oid=None, batch_size=None):
        # use default algorithm but take advantage of larger batch size
        # which is sensible for a SQL storage
        # note that this won't guarantee all bulk_load() requests are
        # batch_size in length; the traversal of the object graph
        # will determine size. For large collections within lists or mappings
        # a larger batch size should help
        if not batch_size:
            batch_size = self._sql_options.get('pack_increment', 1000)
        return Storage.gen_oid_record(self,
                                      start_oid=start_oid,
                                      batch_size=batch_size)
"""
$URL: svn+ssh://svn.mikewatkins.ca/repo/trunk/parlez/lib/test/utest_pgsql_storage.py $
$Id: utest_pgsql_storage.py 43 2007-09-12 23:25:01Z mw $
"""
from durus.connection import Connection
from durus.serialize import pack_record
from durus.utils import int8_to_str
from parlez.pgsql_storage import PostgresqlStorage
from sancho.utest import UTest, raises
import pgsql

DATABASE_NAME = 'durus'
TABLE_NAME = 'durus_test'

class PgsqlTest(UTest):

    def _pre(self):
        import pgsql
        self.db = pgsql.connect(database=DATABASE_NAME)
        try:
            self.db.execute('drop table %s' % TABLE_NAME)
            self.db.commit()
        except:
            pass

    def if_postgres_running(self):
        # TODO
        # test for postgres in process list
        # make this a function and have the entire test run if True
        pass

    def check_init(self):
        assert self.db
        PostgresqlStorage(self.db, table_name=TABLE_NAME)
        assert self.db.execute('select count(p_oid) from %s' % TABLE_NAME).fetchone()[0] == 0
        con = Connection(PostgresqlStorage(self.db, table_name=TABLE_NAME))
        assert con
        assert self.db.execute('select count(p_oid) from %s' % TABLE_NAME).fetchone()[0] == 1
        assert self.db.execute('select p_oid from %s' % TABLE_NAME).fetchone()[0] == 0

    def check_postgresql_storage(self):
        # lifted from durus utest_storage.py "check_memory_storage"
        b = PostgresqlStorage(self.db, table_name=TABLE_NAME)
        assert b.new_oid() == int8_to_str(0)
        assert b.new_oid() == int8_to_str(1)
        assert b.new_oid() == int8_to_str(2)
        raises(KeyError, b.load, int8_to_str(0))
        record = pack_record(int8_to_str(0), 'ok', '')
        b.begin()
        b.store(int8_to_str(0), record)
        b.end()
        b.sync()
        b.begin()
        b.store(int8_to_str(1), pack_record(int8_to_str(1), 'no', ''))
        b.end()
        assert len(list(b.gen_oid_record())) == 1
        assert record == b.load(int8_to_str(0))
        records = b.bulk_load([int8_to_str(0), int8_to_str(1)])
        assert len(list(records)) == 2
        records = b.bulk_load([int8_to_str(0), int8_to_str(1), int8_to_str(2)])
        raises(KeyError, list, records)

    def check_size(self):
        # won't return an accurate value unless vacuum run; make sure it doesn't barf
        # just the same
        con = Connection(PostgresqlStorage(self.db, table_name=TABLE_NAME))
        con.storage.get_size()
        self.db.execute('vacuum %s' % TABLE_NAME)
        assert con.storage.get_size() == 1

    def check_root(self):
        con = Connection(PostgresqlStorage(self.db, table_name=TABLE_NAME))
        root = con.get_root()
        assert root is not None

    def check_pack(self):
        from dulcinea.news import NewsItem, NewsDatabase
        con = Connection(PostgresqlStorage(self.db, table_name=TABLE_NAME))
        root = con.get_root()
        news = NewsDatabase()
        root['news'] = news
        for i in range(10):
            item = NewsItem()
            item.set_title('%d news item' % i)
            news.add(item)
        con.commit()
        # the object count may fail if NewsItem/NewsDatabase change in the future
        assert self.db.execute('select count(p_oid) from %s' % TABLE_NAME).fetchone()[0] == 15
        con.pack()
        assert self.db.execute('select count(p_oid) from %s' % TABLE_NAME).fetchone()[0] == 15
        del news.mapping[1]
        con.commit()
        assert self.db.execute('select count(p_oid) from %s' % TABLE_NAME).fetchone()[0] == 15
        con.pack()
        assert self.db.execute('select count(p_oid) from %s' % TABLE_NAME).fetchone()[0] == 14
        assert self.db.execute('select count(pack) from %s where pack != 0' %\
                               TABLE_NAME).fetchone()[0] == 0




if __name__ == '__main__':
    PgsqlTest()
_______________________________________________
QP mailing list
[email protected]
http://mail.mems-exchange.org/mailman/listinfo/qp

Reply via email to