Dear all,
I've been trying to implement a datetime-style field in SQLAlchemy that, in the database backend, uses a specific ISO-8601 format (e.g. "2013-04-30T00:26:03.000+05:00"), and is a datetime at the Python end. The primary database engine is MySQL.

I don't want to implement this as a hybrid property, because I want a lot of these fields and to be able to use them automatically. I've implemented a TypeDecorator with a custom comparator (code below). Let's call the field type DateTimeAsIsoText.

The field works fine for the following operations:
- save from Python to database
- load from database to Python
- compare database column to literal, either as (DateTimeAsIsoText op LITERAL), or (LITERAL op DateTimeAsIsoText)
- compare DateTimeAsIsoText column to another column of the same type
- compare DateTimeAsIsoText to DATETIME
- ... but *not* DATETIME to DateTimeAsIsoText.

I'm stuck because, in this situation, neither the comparator's operate() function nor its reverse_operate() function seems to be called. I'm using SQLAlchemy 1.0.11 (and MySQL 5.6.28, though I suspect that isn't relevant). Sample code is below.

*Results:* All comparisons work, except the last set of comparisons, of a plain DATETIME field to a DateTimeAsIsoText field (*in that order*; comparison section "E" in the code below).

Am I missing something obvious, or is it not possible to override SQL comparisons when the SQLAlchemy field is on the right-hand side of a comparison?

Thank you!

all the best,
Rudolf.
#!/usr/bin/env python3
"""Code: sample script for the DateTimeAsIsoText column type.

Defines a SQLAlchemy ``TypeDecorator`` that stores date/time values as
ISO-8601 text (with a "+HH:MM" timezone suffix) in the database and exposes
them as ``datetime`` objects in Python, then exercises comparisons of such
columns against literals, each other, and a plain DATETIME column on MySQL.
"""

import datetime
import dateutil.parser
import getpass
import logging
import pytz
import sqlalchemy
from sqlalchemy import (
    Column,
    DateTime,
    Integer,
    String,
    TypeDecorator
)
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from sqlalchemy.sql.expression import func
from urllib.parse import quote_plus

logger = logging.getLogger(__name__)
logger.addHandler(logging.NullHandler())

Base = declarative_base()


# =============================================================================
# Ancillary functions
# =============================================================================

def heading(x):
    """Print ``x`` framed above and below by a 79-character line of "="."""
    print("=" * 79)
    print(x)
    print("=" * 79)


def ask_user(prompt, default=None, returntype=None, mask=False):
    """Prompt on the console until an acceptable value is entered.

    Args:
        prompt: text shown to the user.
        default: value used when the user enters nothing; also shown in
            square brackets after the prompt when it is not None.
        returntype: optional callable used to convert the entered value; if
            the conversion fails the user is asked again.
        mask: if true, read the value with ``getpass`` so it is not echoed.

    Returns:
        The entered (or default) value, converted by ``returntype`` if given.
    """
    if default is not None:
        fullprompt = "{} [{}]: ".format(prompt, default)
    else:
        fullprompt = "{}: ".format(prompt)
    while True:
        if mask:
            value = getpass.getpass(fullprompt) or default
        else:
            value = input(fullprompt) or default
        if returntype is None:
            return value
        try:
            return returntype(value)
        except (TypeError, ValueError):
            # Only conversion failures mean "bad input"; anything else
            # (e.g. KeyboardInterrupt) must propagate rather than loop.
            print("Bad value, try again")


def engine_mysql(user, password, host, port, database, echo=True,
                 interface="pymysql"):
    """Create a SQLAlchemy engine for a MySQL database.

    Args:
        user, password, host, port, database: connection details.
        echo: passed to ``create_engine`` (SQL logging).
        interface: DBAPI driver name used in the ``mysql+<driver>`` scheme.

    Returns:
        The engine returned by ``sqlalchemy.create_engine``.
    """
    # User and password are URL-escaped so that characters such as "@", ":"
    # or "/" in them cannot corrupt the connection URL. A missing password
    # becomes an empty one rather than the literal text "None".
    connectstring = (
        "mysql+{interface}://{user}:{password}@{host}:{port}/"
        "{database}".format(
            interface=interface,
            user=quote_plus(str(user)),
            password=quote_plus(str(password)) if password is not None else "",
            host=host,
            port=port,
            database=database
        ))
    # Removed "?charset=utf8&use_unicode=0"; with PyMySQL==0.7.1 it causes
    # TypeError: 'str' does not support the buffer interface
    # because dates come back as e.g. b'2013-05-30 06:00:00' and then the
    # convert_datetime function in pymysql/converters.py chokes.
    return sqlalchemy.create_engine(connectstring, echo=echo)


def engine_mysql_commandline(echo=True):
    """Ask for MySQL connection details on the console; return an engine.

    The port is fixed at 3306; host, database, user and password are
    prompted for (the password without echo).
    """
    host = ask_user("Host", "localhost")
    port = 3306
    database = ask_user("Database", "testdb")
    user = ask_user("User", "root")
    password = ask_user("Password", mask=True)
    return engine_mysql(user, password, host, port, database, echo=echo)


# =============================================================================
# Custom date/time field as ISO-8601 text including timezone
# =============================================================================

def python_datetime_to_iso(x):
    """From a Python datetime to an ISO-formatted string in our particular
    format.

    Returns None if ``x`` has no ``strftime`` method (e.g. it is None).
    A naive datetime (no timezone) is written with a "+00:00" suffix, in
    line with this script's assumption that timezone-less values are UTC;
    without that, the result would end in a bare ":" and could not be
    parsed by the SQL conversion below.
    """
    # https://docs.python.org/3.4/library/datetime.html#strftime-strptime-behavior  # noqa
    try:
        mainpart = x.strftime("%Y-%m-%dT%H:%M:%S.%f")  # microsecond accuracy
        timezone = x.strftime("%z")  # won't have the colon in
    except AttributeError:
        return None
    if not timezone:
        timezone = "+0000"  # naive datetime: treat as UTC
    # "+HHMM" -> "+HH:MM"
    return mainpart + timezone[:3] + ":" + timezone[3:5]


def iso_to_python_datetime(x):
    """From an ISO-formatted string to a Python datetime, with timezone.

    Returns None for None (a NULL column) or for text that cannot be parsed.
    """
    if x is None:
        return None
    try:
        return dateutil.parser.parse(x)
    except (AttributeError, TypeError, ValueError):
        return None


def python_datetime_to_utc(x):
    """From a Python datetime, with timezone, to a UTC Python version.

    Returns None if ``x`` has no ``astimezone`` method (e.g. it is None).
    """
    try:
        return x.astimezone(pytz.utc)
    except AttributeError:
        return None


def mysql_isotzdatetime_to_utcdatetime(x):
    """Creates an SQL expression wrapping a field containing our ISO-8601
    text, making a DATETIME out of it, in the UTC timezone."""
    # For format, see
    #   https://dev.mysql.com/doc/refman/5.5/en/date-and-time-functions.html#function_date-format  # noqa
    # Note the use of "%i" for minutes.
    # Things after "func." get passed to the database engine as literal SQL
    # functions; http://docs.sqlalchemy.org/en/latest/core/tutorial.html
    return func.CONVERT_TZ(
        func.STR_TO_DATE(
            # Everything except the trailing 6-character "+HH:MM" offset.
            func.LEFT(x, func.LENGTH(x) - 6),
            '%Y-%m-%dT%H:%i:%S.%f'
        ),
        func.RIGHT(x, 6),  # the "+HH:MM" offset itself
        "+00:00"
    )


def mysql_unknown_field_to_utcdatetime(x):
    """The field might be a DATETIME, or an ISO-formatted field."""
    return func.IF(
        func.LENGTH(x) == 19,
        # ... length of a plain DATETIME e.g. 2013-05-30 00:00:00
        x,
        mysql_isotzdatetime_to_utcdatetime(x)
    )


class DateTimeAsIsoText(TypeDecorator):
    """Stores date/time values as ISO-8601."""

    impl = sqlalchemy.types.String(32)  # underlying SQL type

    def process_bind_param(self, value, dialect):
        """Convert things on the way from Python to the database."""
        logger.debug(
            "process_bind_param(self=%r, value=%r, dialect=%r)",
            self, value, dialect)
        return python_datetime_to_iso(value)

    def process_literal_param(self, value, dialect):
        """Convert things on the way from Python to the database."""
        logger.debug(
            "process_literal_param(self=%r, value=%r, dialect=%r)",
            self, value, dialect)
        return python_datetime_to_iso(value)

    def process_result_value(self, value, dialect):
        """Convert things on the way from the database to Python."""
        logger.debug(
            "process_result_value(self=%r, value=%r, dialect=%r)",
            self, value, dialect)
        return iso_to_python_datetime(value)

    class comparator_factory(TypeDecorator.Comparator):
        """Process SQL for when we are comparing our column, in the database,
        to something else."""

        def operate(self, op, other):
            """Apply ``op`` to this column (converted to a UTC DATETIME in
            SQL) and ``other`` (converted to UTC as well)."""
            if other is None:
                # Keep None as-is so that "column == None" still renders as
                # an IS NULL test instead of a comparison against
                # IF(LENGTH(NULL) = 19, ...), which can never be true.
                processed_other = None
            elif isinstance(other, datetime.datetime):
                processed_other = python_datetime_to_utc(other)
            else:
                # OK. At this point, "other" could be a plain DATETIME field,
                # or a DateTimeAsIsoText field (or potentially something
                # else that we don't really care about). If it's a DATETIME,
                # then we assume it is already in UTC.
                processed_other = mysql_unknown_field_to_utcdatetime(other)
            logger.debug("operate(self=%r, op=%r, other=%r)", self, op, other)
            logger.debug("self.expr = %r", self.expr)
            # traceback.print_stack()
            return op(mysql_isotzdatetime_to_utcdatetime(self.expr),
                      processed_other)
            # NOT YET IMPLEMENTED: dialects other than MySQL, and how to
            # detect the dialect at this point.

        def reverse_operate(self, op, other):
            """Not implemented; raises if SQLAlchemy ever calls it."""
            # An explicit raise rather than "assert False": asserts are
            # stripped under "python -O", which would make this silently
            # return None.
            raise NotImplementedError(
                "I don't think this is ever being called")


class TestIso(Base):
    """Test table: one plain DATETIME column, two DateTimeAsIsoText columns."""

    __tablename__ = 'test_iso_datetime'
    id = Column(Integer, primary_key=True)
    name = Column(String(20))
    plain_datetime = Column(DateTime)
    when_created = Column(DateTimeAsIsoText)
    when_deleted = Column(DateTimeAsIsoText)

    def __repr__(self):
        # The original also passed a nonexistent "self.q1" to format(),
        # which raised AttributeError on every repr() call.
        return (
            "<TestIso(id={}, name={}, "
            "plain_datetime={}, when_created={}, when_deleted={})>".format(
                self.id, self.name,
                repr(self.plain_datetime),
                repr(self.when_created),
                repr(self.when_deleted))
        )


# =============================================================================
# Main, with unit testing
# =============================================================================

def test():
    """Populate the test table on a MySQL server and check comparisons."""
    logging.basicConfig(level=logging.DEBUG)

    engine = engine_mysql_commandline(echo=True)
    # Fail early if the server is unreachable, but give the connection
    # straight back instead of leaking it.
    engine.connect().close()
    Session = sessionmaker()
    Session.configure(bind=engine)  # once engine is available
    session = Session()
    # Create tables
    Base.metadata.create_all(engine)

    # -------------------------------------------------------------------------
    # Unit testing for DateTimeAsIsoText
    # -------------------------------------------------------------------------

    # Insert things
    t0_str = "2013-04-30T00:26:03.000+05:00"  # previous month
    t1_str = "2013-05-30T03:26:00.000+01:00"  # Sweden
    t2_str = "2013-05-30T03:00:00.000+00:00"  # London
    t3_str = "2013-05-30T01:00:00.000-05:00"  # New York
    t2b_str = "2013-05-30T04:00:00.000+01:00"  # equals t2

    t0 = dateutil.parser.parse(t0_str)
    t1 = dateutil.parser.parse(t1_str)
    t2 = dateutil.parser.parse(t2_str)
    t3 = dateutil.parser.parse(t3_str)
    t2b = dateutil.parser.parse(t2b_str)

    # t1 -> t3 decrease lexically, but increase temporally
    assert t1_str > t2_str > t3_str
    assert t0 < t1 < t2 == t2b < t3

    session.query(TestIso).delete()
    alice = TestIso(id=1, name="alice",
                    when_created=t1, when_deleted=t2,
                    plain_datetime=python_datetime_to_utc(t3))
    session.add(alice)
    bob = TestIso(id=2, name="bob",
                  when_created=t2, when_deleted=t3,
                  plain_datetime=python_datetime_to_utc(t0))
    session.add(bob)
    celia = TestIso(id=3, name="celia",
                    when_created=t3, when_deleted=t2,
                    plain_datetime=python_datetime_to_utc(t1))
    session.add(celia)
    david = TestIso(id=4, name="david",
                    when_created=t3, when_deleted=t3,
                    plain_datetime=python_datetime_to_utc(t1))
    session.add(david)
    edgar = TestIso(id=5, name="edgar",
                    when_created=t2b, when_deleted=t2,
                    plain_datetime=python_datetime_to_utc(t2b))
    session.add(edgar)
    session.commit()

    heading("A. DateTimeAsIsoText test: DateTimeAsIsoText field VERSUS literal")
    q = session.query(TestIso).filter(TestIso.when_created < t2)
    assert q.all() == [alice]
    q = session.query(TestIso).filter(TestIso.when_created == t2)
    assert q.all() == [bob, edgar]
    q = session.query(TestIso).filter(TestIso.when_created > t2)
    assert q.all() == [celia, david]

    heading("B. DateTimeAsIsoText test: literal VERSUS DateTimeAsIsoText "
            "field")
    q = session.query(TestIso).filter(t2 > TestIso.when_created)
    assert q.all() == [alice]
    q = session.query(TestIso).filter(t2 == TestIso.when_created)
    assert q.all() == [bob, edgar]
    q = session.query(TestIso).filter(t2 < TestIso.when_created)
    assert q.all() == [celia, david]

    heading("C. DateTimeAsIsoText test: DateTimeAsIsoText field VERSUS "
            "DateTimeAsIsoText field")
    q = session.query(TestIso).filter(
        TestIso.when_created < TestIso.when_deleted)
    assert q.all() == [alice, bob]
    q = session.query(TestIso).filter(
        TestIso.when_created == TestIso.when_deleted)
    assert q.all() == [david, edgar]
    q = session.query(TestIso).filter(
        TestIso.when_created > TestIso.when_deleted)
    assert q.all() == [celia]

    heading("D. DateTimeAsIsoText test: DateTimeAsIsoText field VERSUS "
            "plain DATETIME field")
    q = session.query(TestIso).filter(
        TestIso.when_created < TestIso.plain_datetime)
    assert q.all() == [alice]
    q = session.query(TestIso).filter(
        TestIso.when_created == TestIso.plain_datetime)
    # CAUTION: don't have any non-zero millisecond components; they'll get
    # stripped from the plain DATETIME and exact comparisons will then fail.
    assert q.all() == [edgar]
    q = session.query(TestIso).filter(
        TestIso.when_created > TestIso.plain_datetime)
    assert q.all() == [bob, celia, david]

    # Section E is the case reported as failing: with the plain DATETIME
    # column on the left, the DateTimeAsIsoText comparator is not invoked.
    # NOTE(review): presumably the left-hand column's own comparator builds
    # the expression; writing the DateTimeAsIsoText column on the left (as in
    # section D) is the form known to work -- confirm against SQLAlchemy docs.
    heading("E. DateTimeAsIsoText test: plain DATETIME field VERSUS "
            "DateTimeAsIsoText field")
    q = session.query(TestIso).filter(
        TestIso.plain_datetime > TestIso.when_created)
    assert q.all() == [alice]
    q = session.query(TestIso).filter(
        TestIso.plain_datetime == TestIso.when_created)
    assert q.all() == [edgar]
    q = session.query(TestIso).filter(
        TestIso.plain_datetime < TestIso.when_created)
    assert q.all() == [bob, celia, david]

    heading("F. DateTimeAsIsoText test: SELECT everything")
    q = session.query(TestIso)
    assert q.all() == [alice, bob, celia, david, edgar]


if __name__ == '__main__':
    test()

# --
# You received this message because you are subscribed to the Google Groups
# "sqlalchemy" group.
# To unsubscribe from this group and stop receiving emails from it, send an
# email to sqlalchemy+unsubscr...@googlegroups.com.
# To post to this group, send email to sqlalchemy@googlegroups.com.
# Visit this group at https://groups.google.com/group/sqlalchemy.
# For more options, visit https://groups.google.com/d/optout.