I'm using SQLAlchemy 0.6.4 on OpenBSD with PostgreSQL 8.4.4.
As a first project, I am gathering statistics on the availability of another
Open Source project.  The schema is normalized, and the following SQL query
(which works at the console) to find the latest snapshot is giving me fits
when I try to translate it to Python:

FROM snapshots s
WHERE s.cron_id = (
    SELECT ce.id FROM cron_events ce
    WHERE ce.timestamp = (
        SELECT MAX(ce.timestamp)
        FROM snapshots s
        JOIN cron_events ce ON ce.id = s.cron_id
        JOIN platforms p ON p.id = s.platform_id
        WHERE p.name = 'amd64'))

Leaving the nested subqueries aside, I'm stuck on implementing the innermost
SELECT, which finds the timestamp of the latest recorded snapshot:

SELECT MAX(ce.timestamp)
FROM snapshots s
        JOIN cron_events ce ON ce.id = s.cron_id
        JOIN platforms p ON p.id = s.platform_id
WHERE p.name = 'amd64'
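
For reference, the innermost query can be exercised on its own, outside the
ORM.  What follows is only a minimal sketch: it assumes the standard-library
sqlite3 module as a stand-in for PostgreSQL and uses made-up sample rows.  The
table and column names come from the SQL above; INNER_SQL and
latest_timestamp are hypothetical names used only for this illustration.

```python
import sqlite3

# In-memory database with made-up rows; only the columns the query needs.
conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE cron_events (id INTEGER PRIMARY KEY,
                              timestamp TEXT NOT NULL UNIQUE);
    CREATE TABLE platforms (id INTEGER PRIMARY KEY,
                            cron_id INTEGER,
                            name TEXT NOT NULL UNIQUE);
    CREATE TABLE snapshots (id INTEGER PRIMARY KEY,
                            cron_id INTEGER,
                            platform_id INTEGER);
    INSERT INTO cron_events VALUES (1, '2010-10-29 12:00:00');
    INSERT INTO cron_events VALUES (2, '2010-10-30 12:00:00');
    INSERT INTO cron_events VALUES (3, '2010-10-31 12:00:00');
    INSERT INTO platforms VALUES (1, 1, 'amd64');
    INSERT INTO platforms VALUES (2, 1, 'i386');
    INSERT INTO snapshots VALUES (1, 1, 1);
    INSERT INTO snapshots VALUES (2, 2, 1);
    INSERT INTO snapshots VALUES (3, 3, 2);
""")

# Same shape as the innermost SELECT, with the platform name as a parameter.
INNER_SQL = """
    SELECT MAX(ce.timestamp)
    FROM snapshots s
        JOIN cron_events ce ON ce.id = s.cron_id
        JOIN platforms p ON p.id = s.platform_id
    WHERE p.name = ?
"""


def latest_timestamp(connection, platform_name):
    """Return the newest cron timestamp that has a snapshot for the platform."""
    row = connection.execute(INNER_SQL, (platform_name,)).fetchone()
    # An aggregate always yields exactly one row; its single column is NULL
    # (None in Python) when no snapshot matches.
    return row[0]


print(latest_timestamp(conn, "amd64"))
```

With these sample rows, the amd64 snapshots point at cron events 1 and 2, so
the query should pick the timestamp of event 2 and ignore event 3, which
belongs to i386 only.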

The class structure is as follows:

class Cron(Base):
    """collect various timed values here"""

    __tablename__ = CRON_TABLENAME

    id = Column(Integer, Sequence(CRON_TABLENAME + '_id_seq'),
                primary_key=True)
    timestamp = Column(DateTime, nullable=False, unique=True)
    ftp_time = Column(Interval, nullable=False)
    db_time = Column(Interval, nullable=True)

    platforms = relationship('Platform', order_by='Platform.id',
                             backref='cron')
    snapshots = relationship('Snapshot', order_by='Snapshot.id',
                             backref='cron')

    def __init__(self, timestamp, ftp_time):
        self.timestamp = timestamp
        self.ftp_time = ftp_time

    def __repr__(self):
        return "Cron<'%s','%s','%s','%s'>" % (self.id, self.timestamp,
                                              self.ftp_time, self.db_time)

class Platform(Base):
    """abstraction of platform name & first occurrence"""

    __tablename__ = PLATFORM_TABLENAME

    id = Column(Integer, Sequence(PLATFORM_TABLENAME + '_id_seq'),
                primary_key=True)
    cron_id = Column(Integer, ForeignKey(CRON_TABLENAME + '.id'))
    name = Column(String(32), nullable=False, unique=True)

    def __init__(self, cron_id, name):
        self.cron_id = cron_id
        self.name = name

    def __repr__(self):
        return "Platform<'%s','%s','%s','%s'>" % (self.id, self.cron_id,
                                                  self.cron.timestamp,
                                                  self.name)

class Snapshot(Base):
    """abstraction of individual platform snapshot"""

    __tablename__ = SNAPSHOT_TABLENAME

    id = Column(Integer, Sequence(SNAPSHOT_TABLENAME + '_id_seq'),
                primary_key=True)
    cron_id = Column(Integer, ForeignKey(CRON_TABLENAME + '.id'))
    platform_id = Column(Integer, ForeignKey(PLATFORM_TABLENAME + '.id'))

    def __init__(self, cron_id, platform_id):
        self.cron_id = cron_id
        self.platform_id = platform_id

    def __repr__(self):
        return "Snapshot<'%s','%s','%s','%s'>" % (self.id, self.cron_id,
                                                  self.cron.timestamp,
                                                  self.platform_id)

The following Python code:

for t in session.query(func.max(Cron.timestamp)).\
        join((Snapshot, Cron.id == Snapshot.cron_id),
             (Platform, Platform.id == Snapshot.platform_id)).\
        filter(Platform.id == platform_id):
    print t

...or variations such as:

t = session.query(func.max(Cron.timestamp)).\
        join((Snapshot, Cron.id == Snapshot.cron_id),
             (Platform, Platform.id == Snapshot.platform_id)).\
        filter(Platform.id == platform_id).one()

...all give me the following error.  I suspect I am missing something
obvious.  Any insight would be appreciated.


2010-10-30 14:47:43,783 INFO sqlalchemy.engine.base.Engine.0x...dccL SELECT
max(cron_events.timestamp) AS max_1
FROM cron_events JOIN snapshots ON cron_events.id = snapshots.cron_id JOIN
platforms ON platforms.id = snapshots.platform_id
WHERE platforms.id = %(id_1)s
2010-10-30 14:47:43,790 INFO sqlalchemy.engine.base.Engine.0x...dccL
{'id_1': (1,)}
Traceback (most recent call last):
  File "./snapshots.py", line 138, in <module>
    snapshot_id = get_latest_snapshot(cron_id, platform_id, name)
  File "./snapshots.py", line 110, in get_latest_snapshot
    filter(Platform.id == platform_id):
  File "/usr/local/lib/python2.6/site-packages/sqlalchemy/orm/query.py",
line 1451, in __iter__
    return self._execute_and_instances(context)
  File "/usr/local/lib/python2.6/site-packages/sqlalchemy/orm/query.py",
line 1456, in _execute_and_instances
  File "/usr/local/lib/python2.6/site-packages/sqlalchemy/orm/session.py",
line 737, in execute
    clause, params or {})
  File "/usr/local/lib/python2.6/site-packages/sqlalchemy/engine/base.py",
line 1109, in execute
    return Connection.executors[c](self, object, multiparams, params)
  File "/usr/local/lib/python2.6/site-packages/sqlalchemy/engine/base.py",
line 1186, in _execute_clauseelement
    return self.__execute_context(context)
  File "/usr/local/lib/python2.6/site-packages/sqlalchemy/engine/base.py",
line 1215, in __execute_context
    context.parameters[0], context=context)
  File "/usr/local/lib/python2.6/site-packages/sqlalchemy/engine/base.py",
line 1284, in _cursor_execute
    self._handle_dbapi_exception(e, statement, parameters, cursor, context)
  File "/usr/local/lib/python2.6/site-packages/sqlalchemy/engine/base.py",
line 1282, in _cursor_execute
    self.dialect.do_execute(cursor, statement, parameters, context=context)
  File "/usr/local/lib/python2.6/site-packages/sqlalchemy/engine/default.py",
line 277, in do_execute
    cursor.execute(statement, parameters)
sqlalchemy.exc.ProgrammingError: (ProgrammingError) can't adapt type
'NamedTuple' 'SELECT max(cron_events.timestamp) AS max_1 \nFROM cron_events
JOIN snapshots ON cron_events.id = snapshots.cron_id JOIN platforms ON
platforms.id = snapshots.platform_id \nWHERE platforms.id = %(id_1)s'
{'id_1': (1,)}
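
One detail in the log above may be relevant: the bound parameter is logged as
{'id_1': (1,)}, a one-element tuple rather than a plain integer.  That would
be consistent with platform_id being a whole result row from an earlier query
instead of the value inside it, though this is only an inference from the log
and not confirmed.  A minimal sketch of that situation follows, assuming the
standard-library sqlite3 module as a stand-in for SQLAlchemy and psycopg2.
The sample row and the helper names lookup_name and binding_fails are
hypothetical.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE platforms (id INTEGER PRIMARY KEY,
                            name TEXT NOT NULL UNIQUE);
    INSERT INTO platforms VALUES (1, 'amd64');
""")

# fetchone() returns a row, here the one-element tuple (1,), not the bare id.
row = conn.execute("SELECT id FROM platforms WHERE name = ?",
                   ("amd64",)).fetchone()


def lookup_name(connection, platform_id):
    """Look a platform name up by id; platform_id must be a scalar."""
    found = connection.execute("SELECT name FROM platforms WHERE id = ?",
                               (platform_id,)).fetchone()
    return found[0] if found else None


def binding_fails(connection, platform_id):
    """Return True if the driver refuses to bind platform_id as a parameter."""
    try:
        lookup_name(connection, platform_id)
    except sqlite3.Error:
        # The driver cannot turn a tuple into a single SQL value.
        return True
    return False


print(row)                        # the row as fetched
print(binding_fails(conn, row))   # passing the whole row as the id
print(lookup_name(conn, row[0]))  # passing the unpacked integer instead
```

If something similar is happening in get_latest_snapshot, passing the first
element of the row (for example platform_id[0]) rather than the row itself
may be all that is needed.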
