On Friday, September 30, 2016 at 7:09:09 PM UTC-4, Mike Bayer wrote:
The bind_expression() hook is there to allow you to re-render the
expression. Assuming value-bound bindparam() objects (e.g. not the kind
you'd usually get with an INSERT or UPDATE), the value should be
present,
and you can do this (I had to work up a POC):
from sqlalchemy import *
from sqlalchemy.types import UserDefinedType
class T(UserDefinedType):
    """Proof-of-concept type that renders its bound value inline in the SQL."""

    def bind_expression(self, colexpr):
        """Re-render the bound parameter as a literal SQL fragment.

        :param colexpr: the bound-parameter expression; its ``value``
            attribute is read and handed to ``literal_column()``.
        :return: the ``literal_column()`` construct that replaces the bind.
        """
        # or whatever is needed here
        return literal_column(colexpr.value)
# Ad-hoc table with a single column of type T, used to show the rendered SQL.
t = table('t', column('x', T()))
# Parenthesized so the line is valid on both Python 2 and Python 3.
print(t.select().where(t.c.x == 'hi'))
>
> Also, is there a way, inside VARRAY.__init__() or some other place
that
> is called before table creation to specify the
sa.event.listen(<table>,
> "before_create", self.create_ddl().execute_if(dialect='oracle'))?
Look into adding SchemaType as a mixin; it signals to the owning Column
that it should receive events. You can then add the events to your
type itself, like before_parent_attach, which should fire for the Column.
OK, I got it working pretty much as desired.
Adding
def bind_expression(self, bindvalue):
    """Swap the bind parameter for its literal rendering, typed as this type.

    :param bindvalue: bound-parameter expression whose ``value`` is rendered.
    :return: a ``literal_column()`` carrying the rendered text and ``self``
        as its type.
    """
    rendered = self.process_literal_param(bindvalue.value, None)
    return sa.literal_column(rendered, self)
makes insert and update statements work.
I also got the drop/create business working automatically by copying
code from the PostgreSQL ENUM implementation, though it seems like an
excessive amount of boilerplate. To keep the code as-is I had to
monkey-patch OracleDialect to add a has_type() method -- any chance
you'd want to add that for 1.1?
import six
import sqlalchemy as sa
# Monkey-patch OracleDialect to have a has_type() method
from sqlalchemy.dialects.oracle.base import OracleDialect
def has_type(self, connection, type_name, schema=None):
    """Report whether a named type is listed in Oracle's ``all_types`` view.

    :param connection: object whose ``execute()`` runs the lookup query.
    :param type_name: name of the type to look for; passed through
        ``self.denormalize_name()`` before comparison.
    :param schema: owner to search under; when falsy,
        ``self.default_schema_name`` is used instead.
    :return: ``True`` if at least one matching row exists, else ``False``.
    """
    owner = schema if schema else self.default_schema_name
    lookup = sa.sql.text(
        "SELECT type_name FROM all_types "
        "WHERE type_name = :name AND owner = :schema_name")
    result = connection.execute(
        lookup,
        name=self.denormalize_name(type_name),
        schema_name=self.denormalize_name(owner))
    first_row = result.first()
    return first_row is not None
# Attach the function above to the dialect class so that
# ``bind.dialect.has_type(...)`` resolves on Oracle dialect instances.
OracleDialect.has_type = has_type
class VARRAY(sa.types.UserDefinedType, sa.types.SchemaType):
    """Oracle ``VARRAY`` column type backed by a named, schema-level type.

    The type emits its own ``CREATE TYPE`` / ``DROP TYPE`` statements and
    renders values inline as ``<type name>(item, item, ...)`` constructor
    calls instead of passing them as driver-level bind parameters.

    :param name: name of the database type.
    :param size_limit: value rendered inside ``VARRAY(...)`` in the
        ``CREATE TYPE`` statement.
    :param item_type: type object for the elements; its ``compile()`` output
        is rendered after ``OF`` in the ``CREATE TYPE`` statement.
    :param nullable: when false, ``NOT NULL`` is appended to ``CREATE TYPE``.
    :param as_tuple: when true, non-NULL result values are converted to
        ``tuple``.
    :param inherit_schema: forwarded to ``SchemaType.__init__``.
    :param create_type: when false, the memo check reports the type as
        already handled, which makes the DDL hooks skip their work.
    :param kw: forwarded to ``SchemaType.__init__`` (e.g. ``schema``).
    """

    def __init__(self, name, size_limit, item_type, nullable=True,
                 as_tuple=False,
                 inherit_schema=True, create_type=True, **kw):
        sa.types.UserDefinedType.__init__(self)
        sa.types.SchemaType.__init__(self, name=name,
                                     inherit_schema=inherit_schema, **kw)
        self.size_limit = size_limit
        self.item_type = item_type
        self.nullable = nullable
        self.as_tuple = as_tuple
        self.create_type = create_type

    def _qualified_name(self):
        """Return ``schema.name`` when a schema is set, else just ``name``."""
        if self.schema:
            return self.schema + '.' + self.name
        return self.name

    def get_col_spec(self):
        """Return the type name to render in a column definition."""
        return self._qualified_name()

    def compile(self, dialect=None):
        """Return the (optionally schema-qualified) type name.

        ``dialect`` is accepted for interface compatibility but not used.
        """
        return self._qualified_name()

    def create(self, bind=None, checkfirst=False):
        """Emit ``CREATE TYPE ... AS VARRAY(n) OF <item type>``.

        :param bind: object providing ``dialect`` and ``execute()``; required
            in practice even though it defaults to ``None``.
        :param checkfirst: when true, skip creation if
            ``bind.dialect.has_type()`` reports the type already exists.
        """
        if checkfirst and bind.dialect.has_type(
                bind, self.name, schema=self.schema):
            return
        # Single-line template: the pieces must be separated by spaces only.
        sql = "CREATE TYPE {} AS VARRAY({}) OF {}".format(
            self.compile(dialect=bind.dialect),
            self.size_limit,
            self.item_type.compile(dialect=bind.dialect))
        if not self.nullable:
            sql += " NOT NULL"
        bind.execute(sql)

    def drop(self, bind=None, checkfirst=False):
        """Emit ``DROP TYPE`` for this type.

        :param bind: object providing ``dialect`` and ``execute()``.
        :param checkfirst: when true, only drop if
            ``bind.dialect.has_type()`` reports the type exists.
        """
        if not checkfirst or \
                bind.dialect.has_type(bind, self.name, schema=self.schema):
            bind.execute("DROP TYPE " + self.compile(dialect=bind.dialect))

    def _check_for_name_in_memos(self, checkfirst, kw):
        """Look in the 'ddl runner' for 'memos', then
        note our name in that collection.

        This is to ensure a particular named VARRAY type is operated
        upon only once within any kind of create/drop
        sequence without relying upon "checkfirst".

        :return: ``True`` if the name was already recorded (or if
            ``create_type`` is false), ``False`` otherwise.
        """
        if not self.create_type:
            return True
        if '_ddl_runner' not in kw:
            return False
        ddl_runner = kw['_ddl_runner']
        seen_names = ddl_runner.memo.setdefault('_oc_varrays', set())
        present = self.name in seen_names
        seen_names.add(self.name)
        return present

    # NOTE(review): the four _on_* hooks below are presumably invoked by
    # SchemaType's event wiring on the owning Table/MetaData -- confirm
    # against the SQLAlchemy version in use.

    def _on_table_create(self, target, bind, checkfirst, **kw):
        """Create the type ahead of a table CREATE when appropriate."""
        # Parentheses only spell out the operator precedence the condition
        # already had: ``checkfirst or (<standalone table> and <not seen>)``.
        if checkfirst or (
                (not self.metadata and
                 not kw.get('_is_metadata_operation', False)) and
                not self._check_for_name_in_memos(checkfirst, kw)):
            self.create(bind=bind, checkfirst=checkfirst)

    def _on_table_drop(self, target, bind, checkfirst, **kw):
        """Drop the type after a standalone table DROP when appropriate."""
        if not self.metadata and \
                not kw.get('_is_metadata_operation', False) and \
                not self._check_for_name_in_memos(checkfirst, kw):
            self.drop(bind=bind, checkfirst=checkfirst)

    def _on_metadata_create(self, target, bind, checkfirst, **kw):
        """Create the type once per metadata-level create sequence."""
        if not self._check_for_name_in_memos(checkfirst, kw):
            self.create(bind=bind, checkfirst=checkfirst)

    def _on_metadata_drop(self, target, bind, checkfirst, **kw):
        """Drop the type once per metadata-level drop sequence."""
        if not self._check_for_name_in_memos(checkfirst, kw):
            self.drop(bind=bind, checkfirst=checkfirst)

    @staticmethod
    def _render_item(item):
        """Render one element as SQL text: ``NULL``, quoted string, or str()."""
        if item is None:
            return "NULL"
        if isinstance(item, six.string_types):
            # Double embedded single quotes so the value cannot terminate
            # the SQL string literal early.
            return "'%s'" % item.replace("'", "''")
        return str(item)

    def process_literal_param(self, value, dialect):
        """Render ``value`` as a ``<type name>(item,item,...)`` constructor.

        :param value: iterable of elements; ``None`` elements become ``NULL``
            and strings are single-quoted with embedded quotes doubled.
            ``value`` itself must be iterable (``None`` is not handled here).
        :param dialect: forwarded to ``compile()``.
        """
        return "{}({})".format(
            self.compile(dialect=dialect),
            ','.join(self._render_item(x) for x in value))

    def literal_processor(self, dialect):
        """Return a callable that renders values via process_literal_param."""
        def processor(value):
            return self.process_literal_param(value, dialect)
        return processor

    def bind_expression(self, bindvalue):
        """Replace the bind parameter with its inline literal rendering.

        Reads ``bindvalue.value`` directly, so the value must already be
        present on the bind parameter when the statement is compiled.
        """
        return sa.literal_column(
            self.process_literal_param(bindvalue.value, None), self)

    def process_result_value(self, value, dialect):
        """Convert a fetched value to ``tuple`` when ``as_tuple`` is set.

        A ``None`` value (SQL NULL) is passed through unchanged rather than
        being handed to ``tuple()``, which would raise ``TypeError``.
        """
        if self.as_tuple and value is not None:
            value = tuple(value)
        return value

    def result_processor(self, dialect, coltype):
        """Return a callable that applies process_result_value to each value."""
        def processor(value):
            return self.process_result_value(value, dialect)
        return processor

    def adapt(self, impltype, **kw):
        """Build an ``impltype`` copy carrying this type's VARRAY settings."""
        # Carry create_type along with the other settings so an adapted copy
        # does not silently revert to the constructor default; setdefault
        # keeps any value the caller passed explicitly.
        kw.setdefault('create_type', self.create_type)
        return sa.types.SchemaType.adapt(self, impltype,
                                         size_limit=self.size_limit,
                                         item_type=self.item_type,
                                         nullable=self.nullable,
                                         as_tuple=self.as_tuple,
                                         **kw)
if __name__ == '__main__':
    # Manual smoke test; needs a reachable Oracle database at ``uri`` and
    # the third-party ``alchy`` package.
    uri = "oracle://user:password@host"
    import alchy
    import sqlalchemy.dialects.oracle as oc
    db = alchy.Manager(config={'SQLALCHEMY_DATABASE_URI': uri,
                               'SQLALCHEMY_ECHO': True})

    class TestVarray(db.Model):
        __tablename__ = 'test_varray'
        __table_args__ = {'schema': 'barra'}
        idx = sa.Column(sa.Integer, primary_key=True)
        label = sa.Column(sa.String(20), nullable=False)
        # One VARRAY names its schema explicitly, the other inherits it.
        words = sa.Column(
            VARRAY("tp_test_varray_words", 3000, sa.String(8),
                   nullable=True, schema='barra'),
            nullable=False)
        numbers = sa.Column(
            VARRAY("tp_test_varray_numbers", 3000, oc.NUMBER(),
                   nullable=True, inherit_schema=True),
            nullable=False)

    db.drop_all()
    db.create_all()

    # First insert is pre-compiled with literal_binds, so every value is
    # rendered inline in the SQL text.
    db.engine.execute(
        TestVarray.__table__.insert({'idx': 1,
                                     'label': 'One',
                                     'words': ['Once', 'upon',
                                               'a', 'time'],
                                     'numbers': [1.1, 1.2]}).
        compile(compile_kwargs={"literal_binds": True}))
    # Second insert and the update are executed without pre-compilation.
    db.engine.execute(
        TestVarray.__table__.insert({'idx': 2,
                                     'label': 'Two',
                                     'words': ['To', 'be', 'or',
                                               'not'],
                                     'numbers': [2.1, 2.2]}))
    db.engine.execute(
        TestVarray.__table__.update().
        where(TestVarray.__table__.c.idx == 1).
        values(numbers=[1.1111, 1.2222]))

    # Parenthesized single-argument prints work on Python 2 and Python 3.
    print(TestVarray.query.all())
    print(db.session().query(TestVarray.label, TestVarray.words,
                             TestVarray.numbers).all())
[<TestVarray(idx=1, label='One', words=['Once', 'upon', 'a', 'time'],
numbers=[1.1111, 1.2222])>,
<TestVarray(idx=2, label='Two', words=['To', 'be', 'or', 'not'], numbers=[2.1,
2.2])>]
[('One', ['Once', 'upon', 'a', 'time'], [1.1111, 1.2222]),
('Two', ['To', 'be', 'or', 'not'], [2.1, 2.2])]
--
You received this message because you are subscribed to the Google
Groups "sqlalchemy" group.
To unsubscribe from this group and stop receiving emails from it, send
an email to sqlalchemy+unsubscr...@googlegroups.com
<mailto:sqlalchemy+unsubscr...@googlegroups.com>.
To post to this group, send email to sqlalchemy@googlegroups.com
<mailto:sqlalchemy@googlegroups.com>.
Visit this group at https://groups.google.com/group/sqlalchemy.
For more options, visit https://groups.google.com/d/optout.