Decided to scratch an itch today. I wanted to use PostgreSQL's array_agg() function using an 'ORDER BY' clause. Then of course Python lists aren't hashable so I wrote a TypeDecorator to return the results as a tuple instead.

Sending along just in case someone else needs something similar. Thanks to Conor and Michael Bayer for
their help a while back on a compiler extension for string_agg().

--
David Gardner
Pipeline Tools Programmer
Jim Henson Creature Shop
dgard...@creatureshop.com

--
You received this message because you are subscribed to the Google Groups 
"sqlalchemy" group.
To post to this group, send email to sqlalch...@googlegroups.com.
To unsubscribe from this group, send email to 
sqlalchemy+unsubscr...@googlegroups.com.
For more options, visit this group at 
http://groups.google.com/group/sqlalchemy?hl=en.

from StringIO import StringIO 

from sqlalchemy.sql.expression import ColumnElement, _literal_as_column
from sqlalchemy.ext.compiler import compiles
from sqlalchemy.dialects.postgresql import ARRAY

class StaticArray(sa.types.TypeDecorator):
    impl = sa.types.TypeEngine

    def __init__(self):
        super(StaticArray, self).__init__()
        import sqlalchemy.dialects.postgresql.base as pg
        self.__supported = {pg.PGDialect:pg.PGArray}
        del pg

    def load_dialect_impl(self, dialect):
        if dialect.__class__ in self.__supported:
            return self.__supported[dialect.__class__](sa.String)
        else:
            return dialect.type_descriptor(sa.String)

    def process_bind_param(self, value, dialect):
        return value

    def process_result_value(self, value, dialect):
        return tuple(value)
            
    def is_mutable(self):
        return False

class array_agg(ColumnElement):
    type = StaticArray()
    
    def __init__(self, expr, order_by=None):
        self.expr = _literal_as_column(expr)

        if order_by is not None:
            self.order_by = _literal_as_column(order_by)
        else:
            self.order_by = None


@compiles(array_agg, 'postgresql')
def _compile_array_agg_postgresql(element, compiler, **kw):
    buf = StringIO()
    buf.write('array_agg(')
    buf.write(compiler.process(element.expr))
    if element.order_by is not None:
        buf.write(' ORDER BY ')
        buf.write(compiler.process(element.order_by))
    buf.write(')')
    return buf.getvalue()

Reply via email to