On 10/03/2016 05:21 PM, Seth P wrote:


On Friday, September 30, 2016 at 7:09:09 PM UTC-4, Mike Bayer wrote:

    the bind_expression() hook is here to allow you to re-render the
    expression.  assuming value-bound bindparam() objects (e.g. not like
    you'd get with an INSERT or UPDATE usually), the value should be
    present
    and you can do this (had to work up a POC):

    from sqlalchemy import *
    from sqlalchemy.types import UserDefinedType


    class T(UserDefinedType):
        """Proof-of-concept type that re-renders its bound value inline.

        ``bind_expression()`` receives the bound parameter and returns the
        SQL expression rendered in its place. Here the parameter's
        ``value`` is emitted verbatim as a literal column expression.
        """

        def bind_expression(self, colexpr):
            # or whatever is needed here
            return literal_column(colexpr.value)

    # Lightweight table 't' with a single column 'x' of the custom type T.
    t = table('t', column('x', T()))

    # Print the SELECT filtered on x == 'hi' (Python 2 print statement).
    print t.select().where(t.c.x == 'hi')




    >
    > Also, is there a way, inside VARRAY.__init__() or some other place
    that
    > is called before table creation to specify the
    sa.event.listen(<table>,
    > "before_create", self.create_ddl().execute_if(dialect='oracle'))?


    look into adding SchemaType as a mixin, it signals to the owning Column
    that it should receive events.   You can then add the events to your
    type itself like before_parent_attach which should fire for the Column.



OK, I got it working pretty much as desired.

Adding

def bind_expression(self, bindvalue):
    """Replace a bound parameter with its inline literal rendering.

    The parameter's ``value`` is rendered through
    ``process_literal_param()`` (with no dialect) and wrapped in a
    literal column expression typed as ``self``.
    """
    rendered = self.process_literal_param(bindvalue.value, None)
    return sa.literal_column(rendered, self)

makes insert and update statements work.

I also got the drop/create business working automatically by copying
code from the Postgresql ENUM implementation, though it seems like an
excessive amount of boilerplate. To keep the code as-is I had to
monkey-patch OracleDialect to add a has_type() method -- any chance
you'd want to add that for 1.1?

import six
import sqlalchemy as sa


# Monkey-patch OracleDialect to have a has_type() method
from sqlalchemy.dialects.oracle.base import OracleDialect

def has_type(self, connection, type_name, schema=None):
    """Return True if ``all_types`` lists ``type_name`` for the schema.

    :param connection: object whose ``execute()`` runs the lookup query.
    :param type_name: name of the type to look for.
    :param schema: owning schema; when falsy, the dialect's
        ``default_schema_name`` is used instead.
    :return: ``True`` when the query yields at least one row.
    """
    owner = schema or self.default_schema_name
    query = sa.sql.text(
        "SELECT type_name FROM all_types "
        "WHERE type_name = :name AND owner = :schema_name")
    result = connection.execute(
        query,
        name=self.denormalize_name(type_name),
        schema_name=self.denormalize_name(owner))
    return result.first() is not None

# Attach the function as a method so that calls of the form
# ``bind.dialect.has_type(bind, name, schema=...)`` resolve on OracleDialect.
OracleDialect.has_type = has_type



I can see that a full implementation in SQLA would benefit from an OracleDialect.has_type() method, but in the case here of an external implementation, I don't see why has_type() has to be on the OracleDialect. You've got the bind right there and you're calling other SQL on it inline within your type.



class VARRAY(sa.types.UserDefinedType, sa.types.SchemaType):
    """User-defined column type backed by a named Oracle ``VARRAY`` type.

    The type emits ``CREATE TYPE ... AS VARRAY(n) OF <item type>`` and
    ``DROP TYPE`` statements through :meth:`create` and :meth:`drop`, and
    renders Python sequences as inline constructor calls such as
    ``name('a','b')`` when used as bound values.

    :param name: name of the database type.
    :param size_limit: maximum number of elements, used in ``VARRAY(n)``.
    :param item_type: type object of the elements; must offer ``compile()``.
    :param nullable: when false, ``NOT NULL`` is appended to the element
        type in the ``CREATE TYPE`` statement.
    :param as_tuple: when true, result values are converted to tuples.
    :param inherit_schema: forwarded to ``SchemaType.__init__``.
    :param create_type: when false, the table/metadata create and drop
        hooks skip emitting DDL for this type.
    :param kw: further keyword arguments forwarded to
        ``SchemaType.__init__`` (e.g. ``schema``).
    """

    def __init__(self, name, size_limit, item_type, nullable=True,
                 as_tuple=False, inherit_schema=True, create_type=True,
                 **kw):
        sa.types.UserDefinedType.__init__(self)
        sa.types.SchemaType.__init__(
            self, name=name, inherit_schema=inherit_schema, **kw)
        self.size_limit = size_limit
        self.item_type = item_type
        self.nullable = nullable
        self.as_tuple = as_tuple
        self.create_type = create_type

    def get_col_spec(self):
        """Return the (optionally schema-qualified) type name for DDL."""
        return (self.schema + '.' + self.name) if self.schema else self.name

    def compile(self, dialect=None):
        """Return the (optionally schema-qualified) type name.

        ``dialect`` is accepted for interface compatibility and is unused.
        """
        return (self.schema + '.' + self.name) if self.schema else self.name

    def create(self, bind=None, checkfirst=False):
        """Emit ``CREATE TYPE`` for this VARRAY on ``bind``.

        With ``checkfirst`` set, the statement is only emitted when
        ``bind.dialect.has_type()`` reports that the type does not exist.
        """
        if not checkfirst or \
                not bind.dialect.has_type(
                    bind, self.name, schema=self.schema):
            sql = "CREATE TYPE {} AS VARRAY({}) OF {}".format(
                self.compile(dialect=bind.dialect),
                self.size_limit,
                self.item_type.compile(dialect=bind.dialect))
            if not self.nullable:
                sql += " NOT NULL"
            bind.execute(sql)

    def drop(self, bind=None, checkfirst=False):
        """Emit ``DROP TYPE`` for this VARRAY on ``bind``.

        With ``checkfirst`` set, the statement is only emitted when
        ``bind.dialect.has_type()`` reports that the type exists.
        """
        if not checkfirst or \
                bind.dialect.has_type(bind, self.name, schema=self.schema):
            bind.execute("DROP TYPE " + self.compile(dialect=bind.dialect))

    def _check_for_name_in_memos(self, checkfirst, kw):
        """Look in the 'ddl runner' for 'memos', then
        note our name in that collection.

        This is to ensure a particular named VARRAY type is operated
        upon only once within any kind of create/drop
        sequence without relying upon "checkfirst".

        Returns True when the caller should skip its DDL: either
        ``create_type`` is false, or the name was already recorded.
        """
        if not self.create_type:
            return True
        if '_ddl_runner' in kw:
            ddl_runner = kw['_ddl_runner']
            if '_oc_varrays' in ddl_runner.memo:
                seen_names = ddl_runner.memo['_oc_varrays']
            else:
                seen_names = ddl_runner.memo['_oc_varrays'] = set()
            present = self.name in seen_names
            seen_names.add(self.name)
            return present
        else:
            return False

    def _on_table_create(self, target, bind, checkfirst, **kw):
        """Create the type ahead of a table CREATE when appropriate."""
        # ``and`` binds tighter than ``or``: a true ``checkfirst`` always
        # leads to create(), which then consults has_type() itself.
        if checkfirst or (
                (not self.metadata and
                 not kw.get('_is_metadata_operation', False)) and
                not self._check_for_name_in_memos(checkfirst, kw)):
            self.create(bind=bind, checkfirst=checkfirst)

    def _on_table_drop(self, target, bind, checkfirst, **kw):
        """Drop the type after a table DROP when appropriate."""
        if not self.metadata and \
                not kw.get('_is_metadata_operation', False) and \
                not self._check_for_name_in_memos(checkfirst, kw):
            self.drop(bind=bind, checkfirst=checkfirst)

    def _on_metadata_create(self, target, bind, checkfirst, **kw):
        """Create the type once per metadata-level create sequence."""
        if not self._check_for_name_in_memos(checkfirst, kw):
            self.create(bind=bind, checkfirst=checkfirst)

    def _on_metadata_drop(self, target, bind, checkfirst, **kw):
        """Drop the type once per metadata-level drop sequence."""
        if not self._check_for_name_in_memos(checkfirst, kw):
            self.drop(bind=bind, checkfirst=checkfirst)

    def process_literal_param(self, value, dialect):
        """Render a sequence as an inline constructor call ``name(a,b,...)``.

        ``None`` items become ``NULL``, strings are single-quoted with any
        embedded single quote doubled so the literal stays well-formed,
        and every other item is rendered with ``str()``.
        """
        def render(item):
            if item is None:
                return "NULL"
            if isinstance(item, six.string_types):
                # Double embedded quotes; an unescaped quote would end
                # the literal early and corrupt the statement.
                return "'%s'" % item.replace("'", "''")
            return str(item)

        return "{}({})".format(self.compile(dialect=dialect),
                               ','.join(render(x) for x in value))

    def literal_processor(self, dialect):
        """Return a callable rendering values via process_literal_param."""
        def processor(value):
            return self.process_literal_param(value, dialect)
        return processor

    def bind_expression(self, bindvalue):
        """Replace a bound parameter with its inline literal rendering.

        Relies on ``bindvalue.value`` holding the sequence to render.
        """
        return sa.literal_column(
            self.process_literal_param(bindvalue.value, None), self)

    def process_result_value(self, value, dialect):
        """Convert a fetched value to a tuple when ``as_tuple`` is set.

        ``None`` is passed through unchanged so that a NULL column does
        not raise ``TypeError`` from ``tuple(None)``.
        """
        if self.as_tuple and value is not None:
            value = tuple(value)
        return value

    def result_processor(self, dialect, coltype):
        """Return a callable applying process_result_value to each value."""
        def processor(value):
            return self.process_result_value(value, dialect)
        return processor

    def adapt(self, impltype, **kw):
        """Delegate to ``SchemaType.adapt``, forwarding VARRAY settings."""
        return sa.types.SchemaType.adapt(self, impltype,
                                         size_limit=self.size_limit,
                                         item_type=self.item_type,
                                         nullable=self.nullable,
                                         as_tuple=self.as_tuple,
                                         **kw)


if __name__ == '__main__':
    # Demo script: declares a model with two VARRAY columns, recreates the
    # schema, then inserts, updates and queries rows. The URI below is a
    # placeholder and must point at a real Oracle database to run.

    uri = "oracle://user:password@host"

    import alchy
    import sqlalchemy.dialects.oracle as oc

    db = alchy.Manager(config={'SQLALCHEMY_DATABASE_URI': uri,
                               'SQLALCHEMY_ECHO': True})

    class TestVarray(db.Model):
        __tablename__ = 'test_varray'
        __table_args__ = {'schema': 'barra'}
        idx = sa.Column(sa.Integer, primary_key=True)
        label = sa.Column(sa.String(20), nullable=False)
        # Schema given explicitly on the type.
        words = sa.Column(
            VARRAY("tp_test_varray_words", 3000, sa.String(8),
                   nullable=True, schema='barra'),
            nullable=False)
        # Schema left to inherit_schema=True.
        numbers = sa.Column(
            VARRAY("tp_test_varray_numbers", 3000, oc.NUMBER(),
                   nullable=True, inherit_schema=True),
            nullable=False)

    db.drop_all()
    db.create_all()

    # Insert compiled up front with literal_binds.
    db.engine.execute(TestVarray.__table__.insert({'idx': 1,
                                                   'label': 'One',
                                                   'words': ['Once', 'upon',
                                                             'a', 'time'],
                                                   'numbers': [1.1, 1.2]}).
                      compile(compile_kwargs={"literal_binds": True}))
    # Insert and update executed directly, without pre-compiling.
    db.engine.execute(TestVarray.__table__.insert({'idx': 2,
                                                   'label': 'Two',
                                                   'words': ['To', 'be', 'or',
                                                             'not'],
                                                   'numbers': [2.1, 2.2]}))
    db.engine.execute(TestVarray.__table__.update().
                      where(TestVarray.__table__.c.idx == 1).
                      values(numbers=[1.1111, 1.2222]))

    # Single-argument print calls behave the same on Python 2 and 3.
    print(TestVarray.query.all())
    print(db.session().query(TestVarray.label, TestVarray.words,
                             TestVarray.numbers).all())

[<TestVarray(idx=1, label='One', words=['Once', 'upon', 'a', 'time'], 
numbers=[1.1111, 1.2222])>,
 <TestVarray(idx=2, label='Two', words=['To', 'be', 'or', 'not'], numbers=[2.1, 
2.2])>]
[('One', ['Once', 'upon', 'a', 'time'], [1.1111, 1.2222]),
 ('Two', ['To', 'be', 'or', 'not'], [2.1, 2.2])]




--
You received this message because you are subscribed to the Google
Groups "sqlalchemy" group.
To unsubscribe from this group and stop receiving emails from it, send
an email to sqlalchemy+unsubscr...@googlegroups.com
<mailto:sqlalchemy+unsubscr...@googlegroups.com>.
To post to this group, send email to sqlalchemy@googlegroups.com
<mailto:sqlalchemy@googlegroups.com>.
Visit this group at https://groups.google.com/group/sqlalchemy.
For more options, visit https://groups.google.com/d/optout.

--
You received this message because you are subscribed to the Google Groups 
"sqlalchemy" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to sqlalchemy+unsubscr...@googlegroups.com.
To post to this group, send email to sqlalchemy@googlegroups.com.
Visit this group at https://groups.google.com/group/sqlalchemy.
For more options, visit https://groups.google.com/d/optout.

Reply via email to