On Friday, September 30, 2016 at 7:09:09 PM UTC-4, Mike Bayer wrote:
>
> the bind_expression() hook is here to allow you to re-render the 
> expression.  assuming value-bound bindparam() objects (e.g. not like 
> you'd get with an INSERT or UPDATE usually), the value should be present 
> and you can do this (had to work up a POC): 
>
> from sqlalchemy import * 
> from sqlalchemy.types import UserDefinedType 
>
>
> class T(UserDefinedType): 
>
>      def bind_expression(self, colexpr): 
>          return literal_column(colexpr.value)  # or whatever is needed 
> here 
>
> t = table('t', column('x', T())) 
>
> print t.select().where(t.c.x == 'hi') 
>
>
>
>
> > 
> > Also, is there a way, inside VARRAY.__init__() or some other place that 
> > is called before table creation to specify the sa.event.listen(<table>, 
> > "before_create", self.create_ddl().execute_if(dialect='oracle'))? 
>
>
> look into adding SchemaType as a mixin, it signals to the owning Column 
> that it should receive events.   You can then add the events to your 
> type itself like before_parent_attach which should fire for the Column. 
>


OK, I got it working pretty much as desired.

Adding 

    def bind_expression(self, bindvalue):
        return sa.literal_column(self.process_literal_param(bindvalue.value, 
None), self)

makes insert and update statements work.

I also got the drop/create business working automatically by copying code 
from the Postgresql ENUM implementation, though it seems like an excessive 
amount of boilerplate. To keep the code as-is I had to monkey-patch 
OracleDialect to add a has_type() method -- any chance you'd want to add 
that for 1.1?

import six
import sqlalchemy as sa


# Moneky-patch OracleDialect to have has_type() mehtod
from sqlalchemy.dialects.oracle.base import OracleDialect

def has_type(self, connection, type_name, schema=None):
    if not schema:
        schema = self.default_schema_name
    cursor = connection.execute(
        sa.sql.text("SELECT type_name FROM all_types "
                    "WHERE type_name = :name AND owner = :schema_name"),
        name=self.denormalize_name(type_name),
        schema_name=self.denormalize_name(schema))
    return cursor.first() is not None

OracleDialect.has_type = has_type


class VARRAY(sa.types.UserDefinedType, sa.types.SchemaType):

    def __init__(self, name, size_limit, item_type, nullable=True, 
as_tuple=False,
                 inherit_schema=True, create_type=True, **kw):
        sa.types.UserDefinedType.__init__(self)
        sa.types.SchemaType.__init__(self, name=name, 
inherit_schema=inherit_schema, **kw)
        self.size_limit = size_limit
        self.item_type = item_type
        self.nullable = nullable
        self.as_tuple = as_tuple
        self.create_type = create_type

    def get_col_spec(self):
        return (self.schema + '.' + self.name) if self.schema else self.name

    def compile(self, dialect=None):
        return (self.schema + '.' + self.name) if self.schema else self.name

    def create(self, bind=None, checkfirst=False):
        if not checkfirst or \
                not bind.dialect.has_type(
                    bind, self.name, schema=self.schema):
            sql = "CREATE TYPE {} AS VARRAY({}) OF 
{}".format(self.compile(dialect=bind.dialect),
                                                              self.size_limit,
                                                              
self.item_type.compile(dialect=bind.dialect))
            if not self.nullable:
                sql += " NOT NULL"
            bind.execute(sql)

    def drop(self, bind=None, checkfirst=False):
        if not checkfirst or \
                bind.dialect.has_type(bind, self.name, schema=self.schema):
            bind.execute("DROP TYPE " + self.compile(dialect=bind.dialect))

    def _check_for_name_in_memos(self, checkfirst, kw):
        """Look in the 'ddl runner' for 'memos', then
        note our name in that collection.

        This to ensure a particular named enum is operated
        upon only once within any kind of create/drop
        sequence without relying upon "checkfirst".
        """
        if not self.create_type:
            return True
        if '_ddl_runner' in kw:
            ddl_runner = kw['_ddl_runner']
            if '_oc_varrays' in ddl_runner.memo:
                pg_enums = ddl_runner.memo['_oc_varrays']
            else:
                pg_enums = ddl_runner.memo['_oc_varrays'] = set()
            present = self.name in pg_enums
            pg_enums.add(self.name)
            return present
        else:
            return False

    def _on_table_create(self, target, bind, checkfirst, **kw):
        if checkfirst or (
                not self.metadata and
                not kw.get('_is_metadata_operation', False)) and \
                not self._check_for_name_in_memos(checkfirst, kw):
            self.create(bind=bind, checkfirst=checkfirst)

    def _on_table_drop(self, target, bind, checkfirst, **kw):
        if not self.metadata and \
            not kw.get('_is_metadata_operation', False) and \
                not self._check_for_name_in_memos(checkfirst, kw):
            self.drop(bind=bind, checkfirst=checkfirst)

    def _on_metadata_create(self, target, bind, checkfirst, **kw):
        if not self._check_for_name_in_memos(checkfirst, kw):
            self.create(bind=bind, checkfirst=checkfirst)

    def _on_metadata_drop(self, target, bind, checkfirst, **kw):
        if not self._check_for_name_in_memos(checkfirst, kw):
            self.drop(bind=bind, checkfirst=checkfirst)

    def process_literal_param(self, value, dialect):
        return "{}({})".format(self.compile(dialect=dialect),
                               ','.join("NULL" if x is None else
                                        ("'%s'" % x) if isinstance(x, 
six.string_types) else str(x)
                                        for x in value))

    def literal_processor(self, dialect):
        def processor(value):
            return self.process_literal_param(value, dialect)
        return processor

    def bind_expression(self, bindvalue):
        return sa.literal_column(self.process_literal_param(bindvalue.value, 
None), self)

    def process_result_value(self, value, dialect):
        if self.as_tuple:
            value = tuple(value)
        return value

    def result_processor(self, dialect, coltype):
        def processor(value):
            return self.process_result_value(value, dialect)
        return processor

    def adapt(self, impltype, **kw):
        return sa.types.SchemaType.adapt(self, impltype,
                                         size_limit=self.size_limit,
                                         item_type=self.item_type,
                                         nullable=self.nullable,
                                         as_tuple=self.as_tuple,
                                         **kw)


if __name__ == '__main__':

    uri = "oracle://user:password@host"

    import alchy
    import sqlalchemy.dialects.oracle as oc

    db = alchy.Manager(config={'SQLALCHEMY_DATABASE_URI': uri, 
'SQLALCHEMY_ECHO': True})

    class TestVarray(db.Model):
        __tablename__ = 'test_varray'
        __table_args__ = { 'schema': 'barra' }
        idx = sa.Column(sa.Integer, primary_key=True)
        label = sa.Column(sa.String(20), nullable=False)
        words = sa.Column(VARRAY("tp_test_varray_words", 3000, sa.String(8), 
nullable=True, schema='barra'), nullable=False)
        numbers = sa.Column(VARRAY("tp_test_varray_numbers", 3000, oc.NUMBER(), 
nullable=True, inherit_schema=True), nullable=False)

    db.drop_all()
    db.create_all()

    db.engine.execute(TestVarray.__table__.insert({'idx': 1,
                                                   'label': 'One',
                                                   'words': ['Once', 'upon', 
'a', 'time'],
                                                   'numbers': [1.1, 1.2]}).
                      compile(compile_kwargs={"literal_binds": True}))
    db.engine.execute(TestVarray.__table__.insert({'idx': 2,
                                                   'label': 'Two',
                                                   'words': ['To', 'be', 'or', 
'not'],
                                                   'numbers': [2.1, 2.2]}))
    db.engine.execute(TestVarray.__table__.update().
                      where(TestVarray.__table__.c.idx == 1).
                      values(numbers=[1.1111, 1.2222]))

    print TestVarray.query.all()
    print db.session().query(TestVarray.label, TestVarray.words, 
TestVarray.numbers).all()

[<TestVarray(idx=1, label='One', words=['Once', 'upon', 'a', 'time'], 
numbers=[1.1111, 1.2222])>,
 <TestVarray(idx=2, label='Two', words=['To', 'be', 'or', 'not'], numbers=[2.1, 
2.2])>]
[('One', ['Once', 'upon', 'a', 'time'], [1.1111, 1.2222]),
 ('Two', ['To', 'be', 'or', 'not'], [2.1, 2.2])]




-- 
You received this message because you are subscribed to the Google Groups 
"sqlalchemy" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to sqlalchemy+unsubscr...@googlegroups.com.
To post to this group, send email to sqlalchemy@googlegroups.com.
Visit this group at https://groups.google.com/group/sqlalchemy.
For more options, visit https://groups.google.com/d/optout.

Reply via email to