On Friday, September 30, 2016 at 7:09:09 PM UTC-4, Mike Bayer wrote: > > the bind_expression() hook is here to allow you to re-render the > expression. assuming value-bound bindparam() objects (e.g. not like > you'd get with an INSERT or UPDATE usually), the value should be present > and you can do this (had to work up a POC): > > from sqlalchemy import * > from sqlalchemy.types import UserDefinedType > > > class T(UserDefinedType): > > def bind_expression(self, colexpr): > return literal_column(colexpr.value) # or whatever is needed > here > > t = table('t', column('x', T())) > > print t.select().where(t.c.x == 'hi') > > > > > > > > Also, is there a way, inside VARRAY.__init__() or some other place that > > is called before table creation to specify the sa.event.listen(<table>, > > "before_create", self.create_ddl().execute_if(dialect='oracle'))? > > > look into adding SchemaType as a mixin, it signals to the owning Column > that it should receive events. You can then add the events to your > type itself like before_parent_attach which should fire for the Column. >
OK, I got it working pretty much as desired. Adding def bind_expression(self, bindvalue): return sa.literal_column(self.process_literal_param(bindvalue.value, None), self) makes insert and update statements work. I also got the drop/create business working automatically by copying code from the Postgresql ENUM implementation, though it seems like an excessive amount of boilerplate. To keep the code as-is I had to monkey-patch OracleDialect to add a has_type() method -- any chance you'd want to add that for 1.1? import six import sqlalchemy as sa # Moneky-patch OracleDialect to have has_type() mehtod from sqlalchemy.dialects.oracle.base import OracleDialect def has_type(self, connection, type_name, schema=None): if not schema: schema = self.default_schema_name cursor = connection.execute( sa.sql.text("SELECT type_name FROM all_types " "WHERE type_name = :name AND owner = :schema_name"), name=self.denormalize_name(type_name), schema_name=self.denormalize_name(schema)) return cursor.first() is not None OracleDialect.has_type = has_type class VARRAY(sa.types.UserDefinedType, sa.types.SchemaType): def __init__(self, name, size_limit, item_type, nullable=True, as_tuple=False, inherit_schema=True, create_type=True, **kw): sa.types.UserDefinedType.__init__(self) sa.types.SchemaType.__init__(self, name=name, inherit_schema=inherit_schema, **kw) self.size_limit = size_limit self.item_type = item_type self.nullable = nullable self.as_tuple = as_tuple self.create_type = create_type def get_col_spec(self): return (self.schema + '.' + self.name) if self.schema else self.name def compile(self, dialect=None): return (self.schema + '.' + self.name) if self.schema else self.name def create(self, bind=None, checkfirst=False): if not checkfirst or \ not bind.dialect.has_type( bind, self.name, schema=self.schema): sql = "CREATE TYPE {} AS VARRAY({}) OF {}".format(self.compile(dialect=bind.dialect), self.size_limit, self.item_type.compile(dialect=bind.dialect)) if not self.nullable: sql += " NOT NULL" bind.execute(sql) def drop(self, bind=None, checkfirst=False): if not checkfirst or \ bind.dialect.has_type(bind, self.name, schema=self.schema): bind.execute("DROP TYPE " + self.compile(dialect=bind.dialect)) def _check_for_name_in_memos(self, checkfirst, kw): """Look in the 'ddl runner' for 'memos', then note our name in that collection. This to ensure a particular named enum is operated upon only once within any kind of create/drop sequence without relying upon "checkfirst". """ if not self.create_type: return True if '_ddl_runner' in kw: ddl_runner = kw['_ddl_runner'] if '_oc_varrays' in ddl_runner.memo: pg_enums = ddl_runner.memo['_oc_varrays'] else: pg_enums = ddl_runner.memo['_oc_varrays'] = set() present = self.name in pg_enums pg_enums.add(self.name) return present else: return False def _on_table_create(self, target, bind, checkfirst, **kw): if checkfirst or ( not self.metadata and not kw.get('_is_metadata_operation', False)) and \ not self._check_for_name_in_memos(checkfirst, kw): self.create(bind=bind, checkfirst=checkfirst) def _on_table_drop(self, target, bind, checkfirst, **kw): if not self.metadata and \ not kw.get('_is_metadata_operation', False) and \ not self._check_for_name_in_memos(checkfirst, kw): self.drop(bind=bind, checkfirst=checkfirst) def _on_metadata_create(self, target, bind, checkfirst, **kw): if not self._check_for_name_in_memos(checkfirst, kw): self.create(bind=bind, checkfirst=checkfirst) def _on_metadata_drop(self, target, bind, checkfirst, **kw): if not self._check_for_name_in_memos(checkfirst, kw): self.drop(bind=bind, checkfirst=checkfirst) def process_literal_param(self, value, dialect): return "{}({})".format(self.compile(dialect=dialect), ','.join("NULL" if x is None else ("'%s'" % x) if isinstance(x, six.string_types) else str(x) for x in value)) def literal_processor(self, dialect): def processor(value): return self.process_literal_param(value, dialect) return processor def bind_expression(self, bindvalue): return sa.literal_column(self.process_literal_param(bindvalue.value, None), self) def process_result_value(self, value, dialect): if self.as_tuple: value = tuple(value) return value def result_processor(self, dialect, coltype): def processor(value): return self.process_result_value(value, dialect) return processor def adapt(self, impltype, **kw): return sa.types.SchemaType.adapt(self, impltype, size_limit=self.size_limit, item_type=self.item_type, nullable=self.nullable, as_tuple=self.as_tuple, **kw) if __name__ == '__main__': uri = "oracle://user:password@host" import alchy import sqlalchemy.dialects.oracle as oc db = alchy.Manager(config={'SQLALCHEMY_DATABASE_URI': uri, 'SQLALCHEMY_ECHO': True}) class TestVarray(db.Model): __tablename__ = 'test_varray' __table_args__ = { 'schema': 'barra' } idx = sa.Column(sa.Integer, primary_key=True) label = sa.Column(sa.String(20), nullable=False) words = sa.Column(VARRAY("tp_test_varray_words", 3000, sa.String(8), nullable=True, schema='barra'), nullable=False) numbers = sa.Column(VARRAY("tp_test_varray_numbers", 3000, oc.NUMBER(), nullable=True, inherit_schema=True), nullable=False) db.drop_all() db.create_all() db.engine.execute(TestVarray.__table__.insert({'idx': 1, 'label': 'One', 'words': ['Once', 'upon', 'a', 'time'], 'numbers': [1.1, 1.2]}). compile(compile_kwargs={"literal_binds": True})) db.engine.execute(TestVarray.__table__.insert({'idx': 2, 'label': 'Two', 'words': ['To', 'be', 'or', 'not'], 'numbers': [2.1, 2.2]})) db.engine.execute(TestVarray.__table__.update(). where(TestVarray.__table__.c.idx == 1). values(numbers=[1.1111, 1.2222])) print TestVarray.query.all() print db.session().query(TestVarray.label, TestVarray.words, TestVarray.numbers).all() [<TestVarray(idx=1, label='One', words=['Once', 'upon', 'a', 'time'], numbers=[1.1111, 1.2222])>, <TestVarray(idx=2, label='Two', words=['To', 'be', 'or', 'not'], numbers=[2.1, 2.2])>] [('One', ['Once', 'upon', 'a', 'time'], [1.1111, 1.2222]), ('Two', ['To', 'be', 'or', 'not'], [2.1, 2.2])] -- You received this message because you are subscribed to the Google Groups "sqlalchemy" group. To unsubscribe from this group and stop receiving emails from it, send an email to sqlalchemy+unsubscr...@googlegroups.com. To post to this group, send email to sqlalchemy@googlegroups.com. Visit this group at https://groups.google.com/group/sqlalchemy. For more options, visit https://groups.google.com/d/optout.