On 02/09/2016 12:30 AM, Rudolf Cardinal wrote:
Dear all,


I've been trying to implement a datetime-style field in SQL Alchemy
that, in the database backend, uses a specific ISO-8601 format (e.g.
"2013-04-30T00:26:03.000+05:00"), and is a datetime at the Python end.
The primary database engine is MySQL.

I don't want to implement this as a hybrid property, because I want a
lot of these fields and to be able to use them automatically. I've
implemented a TypeDecorator with a custom comparator (code below). Let's
call the field type DateTimeAsIsoText. The field works fine for the
following operations:

  * save from Python to database
  * load from database to Python
  * compare database column to literal, either as (DateTimeAsIsoText op
    LITERAL), or (LITERAL op DateTimeAsIsoText)
  * compare DateTimeAsIsoText column to another column of the same type
  * compare DateTimeAsIsoText to DATETIME
  * ... but *not* DATETIME to DateTimeAsIsoText.

I'm stuck because the comparator's operate() function isn't called in
this situation, it seems, and neither is its reverse_operate() function.

well Python only allows one side's operator override to be called, so when DateTime is on the left, its own operate() method is called and not yours.

Because DateTime doesn't know anything about your type, in order to make it aware, you have to change it.

I couldn't get your coercion logic to do exactly the expected thing, but the general idea is like this:

class ISOComparableDateTime(DateTime):
    class comparator_factory(DateTime.comparator_factory):
        def operate(self, op, other):
            if isinstance(other.type, DateTimeAsIsoText):
                return op(mysql_isotzdatetime_to_utcdatetime(other),
                          mysql_unknown_field_to_utcdatetime(self.expr))
            else:
                return DateTime.comparator_factory.operate(self, op, other)



class TestIso(Base):
    __tablename__ = 'test_iso_datetime'
    id = Column(Integer, primary_key=True)
    name = Column(String(20))
    plain_datetime = Column(ISOComparableDateTime)
    when_created = Column(DateTimeAsIsoText)
    when_deleted = Column(DateTimeAsIsoText)


I'm not familiar with any other hook right now that can otherwise change what DateTime does with an unknown datatype on the right side. Not that there couldn't be such a hook, but at the moment I don't think there's a "custom coerce the right-hand-side" hook so using a modified DateTime on the left is the most direct solution.






I'm using SQLAlchemy 1.0.11 (and MySQL 5.6.28, though I suspect that
isn't relevant).


Sample code is below.


*Results: *All comparisons work, except the last set of comparisons, of
a plain DATETIME field to a DateTimeAsIsoText field /(in that order/;
comparison section "E" in the code below).


Am I missing something obvious, or is it not possible to override SQL
comparisons when the SQLAlchemy field is on the right-hand side of a
comparison?


Thank you!

all the best,

Rudolf.



*Code:*


#!/usr/bin/env python3


import datetime

import dateutil.parser

import getpass

import logging

import pytz

import sqlalchemy

from sqlalchemy import (

     Column,

     DateTime,

     Integer,

     String,

     TypeDecorator

)

from sqlalchemy.ext.declarative import declarative_base

from sqlalchemy.orm import sessionmaker

from sqlalchemy.sql.expression import func


logger = logging.getLogger(__name__)

logger.addHandler(logging.NullHandler())


Base = declarative_base()



#
=============================================================================

# Ancillary functions

#
=============================================================================


def heading(x):

     print("=" * 79)

     print(x)

     print("=" * 79)



def ask_user(prompt, default=None, returntype=None, mask=False):

     if default is not None:

         fullprompt = "{} [{}]: ".format(prompt, default)

     else:

         fullprompt = "{}: ".format(prompt)

     success = False

     while not success:

         if mask:

             value = getpass.getpass(fullprompt) or default

         else:

             value = input(fullprompt) or default

         if returntype is not None:

             try:

                 value = returntype(value)

                 success = True

             except:

                 print("Bad value, try again")

         else:

             success = True

     return value



def engine_mysql(user, password, host, port, database, echo=True,

                  interface="pymysql"):

     CONNECTSTRING = (

         "mysql+{interface}://{user}:{password}@{host}:{port}/"

         "{database}".format(

             interface=interface,

             user=user,

             password=password,

             host=host,

             port=port,

             database=database

         ))

     # Removed "?charset=utf8&use_unicode=0"; with PyMySQL==0.7.1 it causes

     # TypeError: 'str' does not support the buffer interface

     # because dates come back as e.g. b'2013-05-30 06:00:00' and then the

     # convert_datetime function in pymysql/converters.py chokes.

     return sqlalchemy.create_engine(CONNECTSTRING, echo=echo)



def engine_mysql_commandline(echo=True):

     host = ask_user("Host", "localhost")

     port = 3306

     database = ask_user("Database", "testdb")

     user = ask_user("User", "root")

     password = ask_user("Password", mask=True)

     return engine_mysql(user, password, host, port, database, echo=echo)



#
=============================================================================

# Custom date/time field as ISO-8601 text including timezone

#
=============================================================================


def python_datetime_to_iso(x):

     """From a Python datetime to an ISO-formatted string in our particular

     format."""

     #
https://docs.python.org/3.4/library/datetime.html#strftime-strptime-behavior
  # noqa

     try:

         mainpart = x.strftime("%Y-%m-%dT%H:%M:%S.%f")  # microsecond
accuracy

         timezone = x.strftime("%z")  # won't have the colon in

         return mainpart + timezone[:-2] + ":" + timezone[-2:]

     except AttributeError:

         return None



def iso_to_python_datetime(x):

     """From an ISO-formatted string to a Python datetime, with timezone."""

     try:

         return dateutil.parser.parse(x)

     except (AttributeError, ValueError):

         return None



def python_datetime_to_utc(x):

     """From a Python datetime, with timezone, to a UTC Python version."""

     try:

         return x.astimezone(pytz.utc)

     except AttributeError:

         return None



def mysql_isotzdatetime_to_utcdatetime(x):

     """Creates an SQL expression wrapping a field containing our
ISO-8601 text,

     making a DATETIME out of it, in the UTC timezone."""

     # For format, see

     #
https://dev.mysql.com/doc/refman/5.5/en/date-and-time-functions.html#function_date-format
  # noqa

     # Note the use of "%i" for minutes.

     # Things after "func." get passed to the database engine as literal SQL

     # functions; http://docs.sqlalchemy.org/en/latest/core/tutorial.html

     return func.CONVERT_TZ(

         func.STR_TO_DATE(

             func.LEFT(x, func.LENGTH(x) - 6),

             '%Y-%m-%dT%H:%i:%S.%f'

         ),

         func.RIGHT(x, 6),

         "+00:00"

     )



def mysql_unknown_field_to_utcdatetime(x):

     """The field might be a DATETIME, or an ISO-formatted field."""

     return func.IF(

         func.LENGTH(x) == 19,

         # ... length of a plain DATETIME e.g. 2013-05-30 00:00:00

         x,

         mysql_isotzdatetime_to_utcdatetime(x)

     )



class DateTimeAsIsoText(TypeDecorator):

     '''Stores date/time values as ISO-8601.'''

     impl = sqlalchemy.types.String(32)  # underlying SQL type


     def process_bind_param(self, value, dialect):

         """Convert things on the way from Python to the database."""

         logger.debug(

             "process_bind_param(self={}, value={}, dialect={})".format(

                 repr(self), repr(value), repr(dialect)))

         return python_datetime_to_iso(value)


     def process_literal_param(self, value, dialect):

         """Convert things on the way from Python to the database."""

         logger.debug(

             "process_literal_param(self={}, value={}, dialect={})".format(

                 repr(self), repr(value), repr(dialect)))

         return python_datetime_to_iso(value)


     def process_result_value(self, value, dialect):

         """Convert things on the way from the database to Python."""

         logger.debug(

             "process_result_value(self={}, value={}, dialect={})".format(

                 repr(self), repr(value), repr(dialect)))

         return iso_to_python_datetime(value)


     class comparator_factory(TypeDecorator.Comparator):

         """Process SQL for when we are comparing our column, in the
database,

         to something else."""

         def operate(self, op, other):

             if isinstance(other, datetime.datetime):

                 processed_other = python_datetime_to_utc(other)

             else:

                 # OK. At this point, "other" could be a plain DATETIME
field,

                 # or a DateTimeAsIsoText field (or potentially something

                 # else that we don't really care about). If it's a
DATETIME,

                 # then we assume it is already in UTC.

                 processed_other = mysql_unknown_field_to_utcdatetime(other)

             logger.debug("operate(self={}, op={}, other={})".format(

                 repr(self), repr(op), repr(other)))

             logger.debug("self.expr = {}".format(repr(self.expr)))

             # traceback.print_stack()

             return op(mysql_isotzdatetime_to_utcdatetime(self.expr),

                       processed_other)

             # NOT YET IMPLEMENTED: dialects other than MySQL, and how to

             # detect the dialect at this point.


         def reverse_operate(self, op, other):

             assert False, "I don't think this is ever being called"



class TestIso(Base):

     __tablename__ = 'test_iso_datetime'

     id = Column(Integer, primary_key=True)

     name = Column(String(20))

     plain_datetime = Column(DateTime)

     when_created = Column(DateTimeAsIsoText)

     when_deleted = Column(DateTimeAsIsoText)


     def __repr__(self):

         return (

             "<TestIso(id={}, name={}, "

             "plain_datetime={}, when_created={}, when_deleted={})>".format(

                 self.id, self.name,

                 repr(self.plain_datetime),

                 repr(self.when_created), repr(self.when_deleted),

                 self.q1)

         )



#
=============================================================================

# Main, with unit testing

#
=============================================================================


def test():

     logging.basicConfig(level=logging.DEBUG)


     engine = engine_mysql_commandline(echo=True)

     engine.connect()

     Session = sessionmaker()

     Session.configure(bind=engine)  # once engine is available

     session = Session()

     # Create tables

     Base.metadata.create_all(engine)


     #
-------------------------------------------------------------------------

     # Unit testing for DateTimeAsIsoText

     #
-------------------------------------------------------------------------


     # Insert things

     t0_str = "2013-04-30T00:26:03.000+05:00"  # previous month

     t1_str = "2013-05-30T03:26:00.000+01:00"  # Sweden

     t2_str = "2013-05-30T03:00:00.000+00:00"  # London

     t3_str = "2013-05-30T01:00:00.000-05:00"  # New York

     t2b_str = "2013-05-30T04:00:00.000+01:00"  # equals t2


     t0 = dateutil.parser.parse(t0_str)

     t1 = dateutil.parser.parse(t1_str)

     t2 = dateutil.parser.parse(t2_str)

     t3 = dateutil.parser.parse(t3_str)

     t2b = dateutil.parser.parse(t2b_str)


     # t1 -> t3 decrease lexically, but increase temporally

     assert t1_str > t2_str > t3_str

     assert t0 < t1 < t2 == t2b < t3


     session.query(TestIso).delete()

     alice = TestIso(id=1, name="alice",

                     when_created=t1, when_deleted=t2,

                     plain_datetime=python_datetime_to_utc(t3))

     session.add(alice)

     bob = TestIso(id=2, name="bob",

                   when_created=t2, when_deleted=t3,

                   plain_datetime=python_datetime_to_utc(t0))

     session.add(bob)

     celia = TestIso(id=3, name="celia",

                     when_created=t3, when_deleted=t2,

                     plain_datetime=python_datetime_to_utc(t1))

     session.add(celia)

     david = TestIso(id=4, name="david",

                     when_created=t3, when_deleted=t3,

                     plain_datetime=python_datetime_to_utc(t1))

     session.add(david)

     edgar = TestIso(id=5, name="edgar",

                     when_created=t2b, when_deleted=t2,

                     plain_datetime=python_datetime_to_utc(t2b))

     session.add(edgar)

     session.commit()


     heading("A. DateTimeAsIsoText test: DateTimeAsIsoText field VERSUS
literal")

     q = session.query(TestIso).filter(TestIso.when_created < t2)

     assert q.all() == [alice]

     q = session.query(TestIso).filter(TestIso.when_created == t2)

     assert q.all() == [bob, edgar]

     q = session.query(TestIso).filter(TestIso.when_created > t2)

     assert q.all() == [celia, david]


     heading("B. DateTimeAsIsoText test: literal VERSUS DateTimeAsIsoText "

             "field")

     q = session.query(TestIso).filter(t2 > TestIso.when_created)

     assert q.all() == [alice]

     q = session.query(TestIso).filter(t2 == TestIso.when_created)

     assert q.all() == [bob, edgar]

     q = session.query(TestIso).filter(t2 < TestIso.when_created)

     assert q.all() == [celia, david]


     heading("C. DateTimeAsIsoText test: DateTimeAsIsoText field VERSUS "

             "DateTimeAsIsoText field")

     q = session.query(TestIso).filter(TestIso.when_created <

                                       TestIso.when_deleted)

     assert q.all() == [alice, bob]

     q = session.query(TestIso).filter(TestIso.when_created ==

                                       TestIso.when_deleted)

     assert q.all() == [david, edgar]

     q = session.query(TestIso).filter(TestIso.when_created >

                                       TestIso.when_deleted)

     assert q.all() == [celia]


     heading("D. DateTimeAsIsoText test: DateTimeAsIsoText field VERSUS "

             "plain DATETIME field")

     q = session.query(TestIso).filter(TestIso.when_created <

                                       TestIso.plain_datetime)

     assert q.all() == [alice]

     q = session.query(TestIso).filter(TestIso.when_created ==

                                       TestIso.plain_datetime)

     # CAUTION: don't have any non-zero millisecond components; they'll get

     # stripped from the plain DATETIME and exact comparisons will then
fail.

     assert q.all() == [edgar]

     q = session.query(TestIso).filter(TestIso.when_created >

                                       TestIso.plain_datetime)

     assert q.all() == [bob, celia, david]


     heading("E. DateTimeAsIsoText testplain DATETIME field VERSUS "

             "DateTimeAsIsoText field")

     q = session.query(TestIso).filter(TestIso.plain_datetime >

                                       TestIso.when_created)

     assert q.all() == [alice]

     q = session.query(TestIso).filter(TestIso.plain_datetime ==

                                       TestIso.when_created)

     assert q.all() == [edgar]

     q = session.query(TestIso).filter(TestIso.plain_datetime <

                                       TestIso.when_created)

     assert q.all() == [bob, celia, david]


     heading("F. DateTimeAsIsoText test: SELECT everything")

     q = session.query(TestIso)

     assert q.all() == [alice, bob, celia, david, edgar]


if __name__ == '__main__':

     test()


--
You received this message because you are subscribed to the Google
Groups "sqlalchemy" group.
To unsubscribe from this group and stop receiving emails from it, send
an email to sqlalchemy+unsubscr...@googlegroups.com
<mailto:sqlalchemy+unsubscr...@googlegroups.com>.
To post to this group, send email to sqlalchemy@googlegroups.com
<mailto:sqlalchemy@googlegroups.com>.
Visit this group at https://groups.google.com/group/sqlalchemy.
For more options, visit https://groups.google.com/d/optout.

--
You received this message because you are subscribed to the Google Groups 
"sqlalchemy" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to sqlalchemy+unsubscr...@googlegroups.com.
To post to this group, send email to sqlalchemy@googlegroups.com.
Visit this group at https://groups.google.com/group/sqlalchemy.
For more options, visit https://groups.google.com/d/optout.

Reply via email to