You're right, of course. Adding has_type() to OracleDialect is more a matter of 
taste than a necessity.
Thanks again for all your help. I'm actually amazed at how well and how
transparently it works, given the cx_Oracle limitations.



                _____________________________
From: Mike Bayer <mike...@zzzcomputing.com>
Sent: Monday, October 3, 2016 5:54 PM
Subject: Re: [sqlalchemy] Feedback appreciated
To:  <sqlalchemy@googlegroups.com>




On 10/03/2016 05:21 PM, Seth P wrote:
>
>
> On Friday, September 30, 2016 at 7:09:09 PM UTC-4, Mike Bayer wrote:
>
>     the bind_expression() hook is here to allow you to re-render the
>     expression.  assuming value-bound bindparam() objects (e.g. not like
>     you'd get with an INSERT or UPDATE usually), the value should be
>     present and you can do this (had to work up a POC):
>
>     from sqlalchemy import *
>     from sqlalchemy.types import UserDefinedType
>
>
>     class T(UserDefinedType):
>
>          def bind_expression(self, colexpr):
>              return literal_column(colexpr.value)  # or whatever is needed here
>
>     t = table('t', column('x', T()))
>
>     print t.select().where(t.c.x == 'hi')
>
>
>
>
>     >
>     > Also, is there a way, inside VARRAY.__init__() or some other place
>     > that is called before table creation, to specify the
>     > sa.event.listen(<table>, "before_create",
>     > self.create_ddl().execute_if(dialect='oracle'))?
>
>
>     look into adding SchemaType as a mixin, it signals to the owning Column
>     that it should receive events.   You can then add the events to your
>     type itself like before_parent_attach which should fire for the Column.
>
>
>
> OK, I got it working pretty much as desired.
>
> Adding
>
> def bind_expression(self, bindvalue):
>     return sa.literal_column(
>         self.process_literal_param(bindvalue.value, None), self)
>
> makes insert and update statements work.
>
> I also got the drop/create business working automatically by copying
> code from the Postgresql ENUM implementation, though it seems like an
> excessive amount of boilerplate. To keep the code as-is I had to
> monkey-patch OracleDialect to add a has_type() method -- any chance
> you'd want to add that for 1.1?
>
> import six
> import sqlalchemy as sa
>
>
> # Monkey-patch OracleDialect to have a has_type() method
> from sqlalchemy.dialects.oracle.base import OracleDialect
>
> def has_type(self, connection, type_name, schema=None):
>     if not schema:
>         schema = self.default_schema_name
>     cursor = connection.execute(
>         sa.sql.text("SELECT type_name FROM all_types "
>                     "WHERE type_name = :name AND owner = :schema_name"),
>         name=self.denormalize_name(type_name),
>         schema_name=self.denormalize_name(schema))
>     return cursor.first() is not None
>
> OracleDialect.has_type = has_type
>


I can see that a full implementation in SQLA would benefit from an
OracleDialect.has_type() method, but in the case here of an external
implementation, I don't see why has_type() has to be on the OracleDialect.
You've got the bind right there and you're calling other SQL on it
inline within your type.
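
To make that concrete, here is a minimal sketch of the check as a plain
module-level function rather than a dialect method. It is not tested against
Oracle: sqlite3 with a hand-made all_types table stands in for the real
ALL_TYPES view, the name varray_type_exists() is made up for illustration, and
upper-casing only approximates what denormalize_name() does for plain
lower-case names. With a SQLAlchemy bind you would keep the sa.text() query
and cursor.first() from the quoted code.

```python
import sqlite3

# Same query as the monkey-patched has_type() in the quoted code.
HAS_TYPE_SQL = (
    "SELECT type_name FROM all_types "
    "WHERE type_name = :name AND owner = :schema_name"
)


def varray_type_exists(conn, type_name, schema):
    """Return True if all_types has a row for (schema, type_name).

    Hypothetical helper. `conn` is assumed to be a DB-API style
    connection whose execute() accepts named parameters. Upper-casing
    is a stand-in for dialect.denormalize_name().
    """
    row = conn.execute(
        HAS_TYPE_SQL,
        {"name": type_name.upper(), "schema_name": schema.upper()},
    ).fetchone()
    return row is not None


# Stand-in for Oracle: an in-memory SQLite table shaped like ALL_TYPES.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE all_types (owner TEXT, type_name TEXT)")
conn.execute(
    "INSERT INTO all_types VALUES ('BARRA', 'TP_TEST_VARRAY_WORDS')")

print(varray_type_exists(conn, "tp_test_varray_words", "barra"))    # True
print(varray_type_exists(conn, "tp_test_varray_numbers", "barra"))  # False
```

VARRAY.create() and VARRAY.drop() could then call a helper like this with the
bind they already receive, so nothing on OracleDialect needs patching.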


>
> class VARRAY(sa.types.UserDefinedType, sa.types.SchemaType):
>
>     def __init__(self, name, size_limit, item_type, nullable=True,
>                  as_tuple=False, inherit_schema=True, create_type=True,
>                  **kw):
>         sa.types.UserDefinedType.__init__(self)
>         sa.types.SchemaType.__init__(self, name=name,
>                                      inherit_schema=inherit_schema, **kw)
>         self.size_limit = size_limit
>         self.item_type = item_type
>         self.nullable = nullable
>         self.as_tuple = as_tuple
>         self.create_type = create_type
>
>     def get_col_spec(self):
>         return (self.schema + '.' + self.name) if self.schema else self.name
>
>     def compile(self, dialect=None):
>         return (self.schema + '.' + self.name) if self.schema else self.name
>
>     def create(self, bind=None, checkfirst=False):
>         if not checkfirst or \
>                 not bind.dialect.has_type(
>                     bind, self.name, schema=self.schema):
>             sql = "CREATE TYPE {} AS VARRAY({}) OF {}".format(
>                 self.compile(dialect=bind.dialect),
>                 self.size_limit,
>                 self.item_type.compile(dialect=bind.dialect))
>             if not self.nullable:
>                 sql += " NOT NULL"
>             bind.execute(sql)
>
>     def drop(self, bind=None, checkfirst=False):
>         if not checkfirst or \
>                 bind.dialect.has_type(bind, self.name, schema=self.schema):
>             bind.execute("DROP TYPE " + self.compile(dialect=bind.dialect))
>
>     def _check_for_name_in_memos(self, checkfirst, kw):
>         """Look in the 'ddl runner' for 'memos', then
>         note our name in that collection.
>
>         This is to ensure a particular named VARRAY type is operated
>         upon only once within any kind of create/drop
>         sequence without relying upon "checkfirst".
>         """
>         if not self.create_type:
>             return True
>         if '_ddl_runner' in kw:
>             ddl_runner = kw['_ddl_runner']
>             if '_oc_varrays' in ddl_runner.memo:
>                 pg_enums = ddl_runner.memo['_oc_varrays']
>             else:
>                 pg_enums = ddl_runner.memo['_oc_varrays'] = set()
>             present = self.name in pg_enums
>             pg_enums.add(self.name)
>             return present
>         else:
>             return False
>
>     def _on_table_create(self, target, bind, checkfirst, **kw):
>         if checkfirst or (
>                 not self.metadata and
>                 not kw.get('_is_metadata_operation', False)) and \
>                 not self._check_for_name_in_memos(checkfirst, kw):
>             self.create(bind=bind, checkfirst=checkfirst)
>
>     def _on_table_drop(self, target, bind, checkfirst, **kw):
>         if not self.metadata and \
>             not kw.get('_is_metadata_operation', False) and \
>                 not self._check_for_name_in_memos(checkfirst, kw):
>             self.drop(bind=bind, checkfirst=checkfirst)
>
>     def _on_metadata_create(self, target, bind, checkfirst, **kw):
>         if not self._check_for_name_in_memos(checkfirst, kw):
>             self.create(bind=bind, checkfirst=checkfirst)
>
>     def _on_metadata_drop(self, target, bind, checkfirst, **kw):
>         if not self._check_for_name_in_memos(checkfirst, kw):
>             self.drop(bind=bind, checkfirst=checkfirst)
>
>     def process_literal_param(self, value, dialect):
>         return "{}({})".format(self.compile(dialect=dialect),
>                                ','.join("NULL" if x is None else
>                                         ("'%s'" % x)
>                                         if isinstance(x, six.string_types)
>                                         else str(x)
>                                         for x in value))
>
>     def literal_processor(self, dialect):
>         def processor(value):
>             return self.process_literal_param(value, dialect)
>         return processor
>
>     def bind_expression(self, bindvalue):
>         return sa.literal_column(
>             self.process_literal_param(bindvalue.value, None), self)
>
>     def process_result_value(self, value, dialect):
>         if self.as_tuple:
>             value = tuple(value)
>         return value
>
>     def result_processor(self, dialect, coltype):
>         def processor(value):
>             return self.process_result_value(value, dialect)
>         return processor
>
>     def adapt(self, impltype, **kw):
>         return sa.types.SchemaType.adapt(self, impltype,
>                                          size_limit=self.size_limit,
>                                          item_type=self.item_type,
>                                          nullable=self.nullable,
>                                          as_tuple=self.as_tuple,
>                                          **kw)
>
>
> if __name__ == '__main__':
>
>     uri = "oracle://user:password@host"
>
>     import alchy
>     import sqlalchemy.dialects.oracle as oc
>
>     db = alchy.Manager(config={'SQLALCHEMY_DATABASE_URI': uri,
>                                'SQLALCHEMY_ECHO': True})
>
>     class TestVarray(db.Model):
>         __tablename__ = 'test_varray'
>         __table_args__ = {'schema': 'barra'}
>         idx = sa.Column(sa.Integer, primary_key=True)
>         label = sa.Column(sa.String(20), nullable=False)
>         words = sa.Column(
>             VARRAY("tp_test_varray_words", 3000, sa.String(8),
>                    nullable=True, schema='barra'),
>             nullable=False)
>         numbers = sa.Column(
>             VARRAY("tp_test_varray_numbers", 3000, oc.NUMBER(),
>                    nullable=True, inherit_schema=True),
>             nullable=False)
>
>     db.drop_all()
>     db.create_all()
>
>     db.engine.execute(TestVarray.__table__.insert({'idx': 1,
>                                                    'label': 'One',
>                                                    'words': ['Once', 'upon',
>                                                              'a', 'time'],
>                                                    'numbers': [1.1, 1.2]}).
>                       compile(compile_kwargs={"literal_binds": True}))
>     db.engine.execute(TestVarray.__table__.insert({'idx': 2,
>                                                    'label': 'Two',
>                                                    'words': ['To', 'be',
>                                                              'or', 'not'],
>                                                    'numbers': [2.1, 2.2]}))
>     db.engine.execute(TestVarray.__table__.update().
>                       where(TestVarray.__table__.c.idx == 1).
>                       values(numbers=[1.1111, 1.2222]))
>
>     print TestVarray.query.all()
>     print db.session().query(TestVarray.label, TestVarray.words,
>                              TestVarray.numbers).all()
>
> [<TestVarray(idx=1, label='One', words=['Once', 'upon', 'a', 'time'],
>              numbers=[1.1111, 1.2222])>,
>  <TestVarray(idx=2, label='Two', words=['To', 'be', 'or', 'not'],
>              numbers=[2.1, 2.2])>]
> [('One', ['Once', 'upon', 'a', 'time'], [1.1111, 1.2222]),
>  ('Two', ['To', 'be', 'or', 'not'], [2.1, 2.2])]
>
>
>
>
> --
> You received this message because you are subscribed to the Google
> Groups "sqlalchemy" group.
> To unsubscribe from this group and stop receiving emails from it, send
> an email to sqlalchemy+unsubscr...@googlegroups.com
> <mailto:sqlalchemy+unsubscr...@googlegroups.com>.
> To post to this group, send email to sqlalchemy@googlegroups.com
> <mailto:sqlalchemy@googlegroups.com>.
> Visit this group at https://groups.google.com/group/sqlalchemy.
> For more options, visit https://groups.google.com/d/optout.
