You're right, of course. Adding has_type() to OracleDialect is more a matter of 
taste than a necessity.
Thanks again for all your help. I'm actually amazed at how well/transparent it 
works, given the cx_Oracle limitations.

From: Mike Bayer <>
Sent: Monday, October 3, 2016 5:54 PM
Subject: Re: [sqlalchemy] Feedback appreciated
To:  <>

On 10/03/2016 05:21 PM, Seth P wrote:
> On Friday, September 30, 2016 at 7:09:09 PM UTC-4, Mike Bayer wrote:
>     the bind_expression() hook is here to allow you to re-render the
>     expression.  assuming value-bound bindparam() objects (e.g. not like
>     you'd get with an INSERT or UPDATE usually), the value should be
>     present
>     and you can do this (had to work up a POC):
>     from sqlalchemy import *
>     from sqlalchemy.types import UserDefinedType
>     class T(UserDefinedType):
>          def bind_expression(self, colexpr):
>              return literal_column(colexpr.value)  # or whatever is
>     needed here
>     t = table('t', column('x', T()))
>     print == 'hi')
>     >
>     > Also, is there a way, inside VARRAY.__init__() or some other place
>     that
>     > is called before table creation to specify the
>     sa.event.listen(<table>,
>     > "before_create", self.create_ddl().execute_if(dialect='oracle'))?
>     look into adding SchemaType as a mixin, it signals to the owning Column
>     that it should receive events.   You can then add the events to your
>     type itself like before_parent_attach which should fire for the Column.
> OK, I got it working pretty much as desired.
> Adding
> def bind_expression(self, bindvalue):
>     return sa.literal_column(self.process_literal_param(bindvalue.value, 
> None), self)
> makes insert and update statements work.
> I also got the drop/create business working automatically by copying
> code from the Postgresql ENUM implementation, though it seems like an
> excessive amount of boilerplate. To keep the code as-is I had to
> monkey-patch OracleDialect to add a has_type() method -- any chance
> you'd want to add that for 1.1?
> import six
> import sqlalchemy as sa
> # Moneky-patch OracleDialect to have has_type() mehtod
> from import OracleDialect
> def has_type(self, connection, type_name, schema=None):
>     if not schema:
>         schema = self.default_schema_name
>     cursor = connection.execute(
>         sa.sql.text("SELECT type_name FROM all_types "
> "WHERE type_name = :name AND owner = :schema_name"),
>         name=self.denormalize_name(type_name),
>         schema_name=self.denormalize_name(schema))
>     return cursor.first() is not None
> OracleDialect.has_type = has_type

I can see that a full implementation in SQLA would benefit from an 
OracleDialect.has_type() method but I don't see why, in the case here of 
external implementation, why has_type() has to be on the OracleDialect? 
You've got the bind right there and you're calling other SQL on it 
inline within your type.

> class VARRAY(sa.types.UserDefinedType, sa.types.SchemaType):
>     def __init__(self, name, size_limit, item_type, nullable=True, 
> as_tuple=False,
>                  inherit_schema=True, create_type=True, **kw):
>         sa.types.UserDefinedType.__init__(self)
>         sa.types.SchemaType.__init__(self, name=name, 
> inherit_schema=inherit_schema, **kw)
>         self.size_limit = size_limit
>         self.item_type = item_type
>         self.nullable = nullable
>         self.as_tuple = as_tuple
>         self.create_type = create_type
>     def get_col_spec(self):
>         return (self.schema + '.' + if self.schema else
>     def compile(self, dialect=None):
>         return (self.schema + '.' + if self.schema else
>     def create(self, bind=None, checkfirst=False):
>         if not checkfirst or \
>                 not bind.dialect.has_type(
>                     bind,, schema=self.schema):
>             sql = "CREATE TYPE {} AS VARRAY({}) OF 
> {}".format(self.compile(dialect=bind.dialect),
>                                                               self.size_limit,
> self.item_type.compile(dialect=bind.dialect))
>             if not self.nullable:
>                 sql += " NOT NULL"
> bind.execute(sql)
>     def drop(self, bind=None, checkfirst=False):
>         if not checkfirst or \
>                 bind.dialect.has_type(bind,, schema=self.schema):
>             bind.execute("DROP TYPE " + self.compile(dialect=bind.dialect))
>     def _check_for_name_in_memos(self, checkfirst, kw):
>         """Look in the 'ddl runner' for 'memos', then
> note our name in that collection.
> This to ensure a particular named enum is operated
> upon only once within any kind of create/drop
> sequence without relying upon "checkfirst".
> """
> if not self.create_type:
>             return True
> if '_ddl_runner' in kw:
>             ddl_runner = kw['_ddl_runner']
>             if '_oc_varrays' in ddl_runner.memo:
>                 pg_enums = ddl_runner.memo['_oc_varrays']
>             else:
>                 pg_enums = ddl_runner.memo['_oc_varrays'] = set()
>             present = in pg_enums
>             pg_enums.add(
>             return present
>         else:
>             return False
> def _on_table_create(self, target, bind, checkfirst, **kw):
>         if checkfirst or (
>                 not self.metadata and
> not kw.get('_is_metadata_operation', False)) and \
>                 not self._check_for_name_in_memos(checkfirst, kw):
>             self.create(bind=bind, checkfirst=checkfirst)
>     def _on_table_drop(self, target, bind, checkfirst, **kw):
>         if not self.metadata and \
>             not kw.get('_is_metadata_operation', False) and \
>                 not self._check_for_name_in_memos(checkfirst, kw):
>             self.drop(bind=bind, checkfirst=checkfirst)
>     def _on_metadata_create(self, target, bind, checkfirst, **kw):
>         if not self._check_for_name_in_memos(checkfirst, kw):
>             self.create(bind=bind, checkfirst=checkfirst)
>     def _on_metadata_drop(self, target, bind, checkfirst, **kw):
>         if not self._check_for_name_in_memos(checkfirst, kw):
>             self.drop(bind=bind, checkfirst=checkfirst)
>     def process_literal_param(self, value, dialect):
>         return "{}({})".format(self.compile(dialect=dialect),
>                                ','.join("NULL" if x is None else
> ("'%s'" % x) if isinstance(x, six.string_types) else str(x)
>                                         for x in value))
>     def literal_processor(self, dialect):
>         def processor(value):
>             return self.process_literal_param(value, dialect)
>         return processor
>     def bind_expression(self, bindvalue):
>         return sa.literal_column(self.process_literal_param(bindvalue.value, 
> None), self)
>     def process_result_value(self, value, dialect):
>         if self.as_tuple:
>             value = tuple(value)
>         return value
>     def result_processor(self, dialect, coltype):
>         def processor(value):
>             return self.process_result_value(value, dialect)
>         return processor
>     def adapt(self, impltype, **kw):
>         return sa.types.SchemaType.adapt(self, impltype,
>                                          size_limit=self.size_limit,
>                                          item_type=self.item_type,
>                                          nullable=self.nullable,
>                                          as_tuple=self.as_tuple,
>                                          **kw)
> if __name__ == '__main__':
>     uri = "oracle://user:password@host"
> import alchy
>     import as oc
>     db = alchy.Manager(config={'SQLALCHEMY_DATABASE_URI': uri, 
>     class TestVarray(db.Model):
>         __tablename__ = 'test_varray'
> __table_args__ = { 'schema': 'barra' }
>         idx = sa.Column(sa.Integer, primary_key=True)
>         label = sa.Column(sa.String(20), nullable=False)
>         words = sa.Column(VARRAY("tp_test_varray_words", 3000, sa.String(8), 
> nullable=True, schema='barra'), nullable=False)
>         numbers = sa.Column(VARRAY("tp_test_varray_numbers", 3000, 
> oc.NUMBER(), nullable=True, inherit_schema=True), nullable=False)
>     db.drop_all()
>     db.create_all()
>     db.engine.execute(TestVarray.__table__.insert({'idx': 1,
>                                                    'label': 'One',
>                                                    'words': ['Once', 'upon', 
> 'a', 'time'],
>                                                    'numbers': [1.1, 1.2]}).
>                       compile(compile_kwargs={"literal_binds": True}))
>     db.engine.execute(TestVarray.__table__.insert({'idx': 2,
>                                                    'label': 'Two',
>                                                    'words': ['To', 'be', 
> 'or', 'not'],
>                                                    'numbers': [2.1, 2.2]}))
>     db.engine.execute(TestVarray.__table__.update().
>                       where(TestVarray.__table__.c.idx == 1).
>                       values(numbers=[1.1111, 1.2222]))
>     print TestVarray.query.all()
>     print db.session().query(TestVarray.label, TestVarray.words, 
> TestVarray.numbers).all()
> [<TestVarray(idx=1, label='One', words=['Once', 'upon', 'a', 'time'], 
> numbers=[1.1111, 1.2222])>,
>  <TestVarray(idx=2, label='Two', words=['To', 'be', 'or', 'not'], 
> numbers=[2.1, 2.2])>]
> [('One', ['Once', 'upon', 'a', 'time'], [1.1111, 1.2222]),
>  ('Two', ['To', 'be', 'or', 'not'], [2.1, 2.2])]
> --
> You received this message because you are subscribed to the Google
> Groups "sqlalchemy" group.
> To unsubscribe from this group and stop receiving emails from it, send
> an email to
> <>.
> To post to this group, send email to
> <>.
> Visit this group at
> For more options, visit

You received this message because you are subscribed to a topic in the Google 
Groups "sqlalchemy" group.
To unsubscribe from this topic, visit
To unsubscribe from this group and all its topics, send an email to
To post to this group, send email to
Visit this group at
For more options, visit


You received this message because you are subscribed to the Google Groups 
"sqlalchemy" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
To post to this group, send email to
Visit this group at
For more options, visit

Reply via email to