Hello.

Here is the fully self-contained regression of the issue, including the
workaround for SA 0.7.9. Thank you again, because I wouldn't figure it out
without your help (the select_from part). I haven't tried it on SA 0.9.

If you have any questions, please ask.

HTH,

Ladislav Lenart


On 14.6.2013 11:18, Ladislav Lenart wrote:
> Hello.
> 
> 
> On 13.6.2013 18:44, Michael Bayer wrote:
>>
>> On Jun 13, 2013, at 11:03 AM, Ladislav Lenart <lenart...@volny.cz> wrote:
>>
>>> Unfortunately migrating to SA 0.9 is not an option for me at the moment due 
>>> to
>>> severe time constraints.
>>
>> I was pretty sure you'd say that, though I'm really looking to verify that 
>> my fixes are going to hold up under real world usage.   The issues you're 
>> having are real issues, and they've been fixed.
> 
> I will make a 1:1 self-containted test case for this issue during the next 
> week,
> I promise!
> 
> 
>>> Could you please help me write SA query for 0.7.9 that uses index scan and 
>>> also
>>> loads all the necessary relations? It must be possible with a proper use of
>>> from_statement(), contains_eager() and/or other SA features. It is just 
>>> that toy
>>> examples in the documentation don't help me much with this complex beast.
>>
>> you use the SQL expression language in conjunction with .join()/outerjoin(), 
>> pass to query.select_from(), then use contains_eager():
>>
>> j = 
>> Foo.__table__.outerjoin(Bar.__table__.outerjoin(BarA.__table__).outerjoin(Data.__table__))
>>
>> q = s.query(Foo).\
>>             select_from(j).\
>>             filter(Foo.id.in_([1, 2, 3])).\
>>             options(
>>                 contains_eager(Foo.bar),
>>                 contains_eager(Foo.bar.of_type(BarA), BarA.data)
>>             )
> 
> Thank you very much! I am going to try it :-)
> 
> 
> Ladislav Lenart

-- 
You received this message because you are subscribed to the Google Groups 
"sqlalchemy" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to sqlalchemy+unsubscr...@googlegroups.com.
To post to this group, send email to sqlalchemy@googlegroups.com.
Visit this group at http://groups.google.com/group/sqlalchemy.
For more options, visit https://groups.google.com/groups/opt_out.


# coding=utf-8
"""
Standalone regression of the full table-scan problem in combination with joined 
table inheritance. Applies to SA 0.7.9 at least.

See: Issue, Workaround, main_issue, main_workaround and main_fill.

The rest is support code and/or ORM definitions.
"""
from sqlalchemy import Column, ForeignKey, UniqueConstraint, CheckConstraint,\
    Integer, Unicode, String, Date
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.engine import create_engine
from sqlalchemy.sql.expression import distinct
from sqlalchemy.orm.session import sessionmaker
from sqlalchemy.orm import relationship, subqueryload_all, subqueryload,\
    joinedload_all, contains_eager
from sqlalchemy.orm.util import aliased


Base = declarative_base()
session = None


class Contact(Base):
    __tablename__ = 'Contact'
    id = Column(Integer(), primary_key=True)
    type = Column(Integer(), nullable=False) # 1 - personal, 2 - corporate
    parent_id = Column(Integer(), ForeignKey('Partner.id', ondelete='CASCADE'), nullable=False)
    parent = relationship('Partner', back_populates='contacts', primaryjoin='Contact.parent_id == Partner.id', remote_side='Partner.id', uselist=False)
    contact_tags = relationship('ContactTag', back_populates='contact', primaryjoin='Contact.id == ContactTag.contact_id', cascade='all, delete-orphan', passive_updates=True, passive_deletes=True)
    phones = relationship('Phone', back_populates='contact', primaryjoin='Contact.id == Phone.contact_id', cascade='all, delete-orphan', passive_updates=True, passive_deletes=True)
    emails = relationship('Email', back_populates='contact', primaryjoin='Contact.id == Email.contact_id', cascade='all, delete-orphan', passive_updates=True, passive_deletes=True)
    __mapper_args__ = {'polymorphic_on': type}
    __table_args__ = (CheckConstraint(u'type in (1, 2)'),)

class ContactTag(Base):
    __tablename__ = 'ContactTag'
    id = Column(Integer(), primary_key=True)
    tag_id = Column(Integer(), ForeignKey('Tag.id', ondelete='CASCADE'), nullable=False)
    tag = relationship('Tag', primaryjoin='ContactTag.tag_id == Tag.id', remote_side='Tag.id', uselist=False)
    contact_id = Column(Integer(), ForeignKey('Contact.id', ondelete='CASCADE'), nullable=False)
    contact = relationship('Contact', back_populates='contact_tags', primaryjoin='ContactTag.contact_id == Contact.id', remote_side='Contact.id', uselist=False)
    __table_args__ = (UniqueConstraint('contact_id', 'tag_id'),)

class Tag(Base):
    __tablename__ = 'Tag'
    id = Column(Integer(), primary_key=True)
    partner_id = Column(Integer(), ForeignKey('Partner.id', ondelete='CASCADE'), nullable=False, index=True)
    partner = relationship('Partner', back_populates='tags', primaryjoin='Tag.partner_id == Partner.id', remote_side='Partner.id', uselist=False)
    label = Column(Unicode(), nullable=False, default=u'')

class Phone(Base):
    __tablename__ = 'Phone'
    id = Column(Integer(), primary_key=True)
    contact_id = Column(Integer(), ForeignKey('Contact.id', ondelete='CASCADE'), nullable=False, index=True)
    contact = relationship('Contact', back_populates='phones', primaryjoin='Phone.contact_id == Contact.id', remote_side='Contact.id', uselist=False)
    number = Column(String(), nullable=False)

class Email(Base):
    __tablename__ = 'Email'
    id = Column(Integer(), primary_key=True)
    contact_id = Column(Integer(), ForeignKey('Contact.id', ondelete='CASCADE'), nullable=False, index=True)
    contact = relationship('Contact', back_populates='emails', primaryjoin='Email.contact_id == Contact.id', remote_side='Contact.id', uselist=False)
    address = Column(Unicode(), nullable=False)

class PersonalContact(Contact):
    __tablename__ = 'PersonalContact'
    id = Column(Integer(), ForeignKey('Contact.id', ondelete='CASCADE'), nullable=False, primary_key=True)
    client_id = Column(Integer(), ForeignKey('PersonalClient.id', ondelete='SET NULL'))
    client = relationship('PersonalClient', back_populates='contacts', primaryjoin='PersonalContact.client_id == PersonalClient.id', remote_side='PersonalClient.id', uselist=False)
    partner_id = Column(Integer(), ForeignKey('Partner.id', ondelete='SET NULL'), unique=True)
    partner = relationship('Partner', back_populates='contact', primaryjoin='PersonalContact.partner_id == Partner.id', remote_side='Partner.id', uselist=False)
    first_name = Column(Unicode(), nullable=False, server_default=u'')
    last_name = Column(Unicode(), nullable=False, server_default=u'')
    birth_date = Column(Date())
    __mapper_args__ = {'polymorphic_identity': 1}
    __table_args__ = None

    @classmethod
    def client_cls(cls):
        return PersonalClient

class CorporateContact(Contact):
    __tablename__ = 'CorporateContact'
    id = Column(Integer(), ForeignKey('Contact.id', ondelete='CASCADE'), nullable=False, primary_key=True)
    client_id = Column(Integer(), ForeignKey('CorporateClient.id', ondelete='SET NULL'))
    client = relationship('CorporateClient', back_populates='contacts', primaryjoin='CorporateContact.client_id == CorporateClient.id', remote_side='CorporateClient.id', uselist=False)
    name = Column(Unicode(), nullable=False, server_default=u'')
    identification_number = Column(Unicode(), nullable=False, server_default=u'')
    __mapper_args__ = {'polymorphic_identity': 2}
    __table_args__ = None

    @classmethod
    def client_cls(cls):
        return CorporateClient

class Client(Base):
    __tablename__ = 'Client'
    id = Column(Integer(), primary_key=True)
    type = Column(Integer(), nullable=False) # 1 - personal, 2 - corporate
    __mapper_args__ = {'polymorphic_on': type}
    __table_args__ = (CheckConstraint(u'type in (1, 2)'),)

class PersonalClient(Client):
    __tablename__ = 'PersonalClient'
    id = Column(Integer(), ForeignKey('Client.id', ondelete='CASCADE'), nullable=False, primary_key=True)
    data_id = Column(Integer(), ForeignKey('PersonalData.id', ondelete='CASCADE'), nullable=False, index=True)
    data = relationship('PersonalData', primaryjoin='PersonalClient.data_id == PersonalData.id', remote_side='PersonalData.id', uselist=False)
    contacts = relationship('PersonalContact', back_populates='client', primaryjoin='PersonalClient.id == PersonalContact.client_id')
    __mapper_args__ = {'polymorphic_identity': 1}
    __table_args__ = None

    @classmethod
    def data_cls(cls):
        return PersonalData

class CorporateClient(Client):
    __tablename__ = 'CorporateClient'
    id = Column(Integer(), ForeignKey('Client.id', ondelete='CASCADE'), nullable=False, primary_key=True)
    data_id = Column(Integer(), ForeignKey('CorporateData.id', ondelete='CASCADE'), nullable=False, index=True)
    data = relationship('CorporateData', primaryjoin='CorporateClient.data_id == CorporateData.id', remote_side='CorporateData.id', uselist=False)
    contacts = relationship('CorporateContact', back_populates='client', primaryjoin='CorporateClient.id == CorporateContact.client_id')
    __mapper_args__ = {'polymorphic_identity': 2}
    __table_args__ = None

    @classmethod
    def data_cls(cls):
        return CorporateData

class Partner(Base):
    __tablename__ = 'Partner'
    id = Column(Integer(), primary_key=True)
    personal_data_id = Column(Integer(), ForeignKey('PersonalData.id', ondelete='CASCADE'), nullable=False)
    personal_data = relationship('PersonalData', primaryjoin='Partner.personal_data_id == PersonalData.id', remote_side='PersonalData.id', uselist=False)
    contacts = relationship('Contact', back_populates='parent', primaryjoin='Partner.id == Contact.parent_id', cascade='all, delete-orphan', passive_updates=True, passive_deletes=True)
    contact = relationship('PersonalContact', back_populates='partner', primaryjoin='Partner.id == PersonalContact.partner_id', uselist=False)
    tags = relationship('Tag', back_populates='partner', primaryjoin='Partner.id == Tag.partner_id', cascade='all, delete-orphan', passive_updates=True, passive_deletes=True)

class PersonalData(Base):
    __tablename__ = 'PersonalData'
    id = Column(Integer(), primary_key=True)
    first_name = Column(Unicode(), nullable=False, server_default=u'')
    last_name = Column(Unicode(), nullable=False, server_default=u'')
    birth_date = Column(Date())
    address_id = Column(Integer(), ForeignKey('Address.id', ondelete='CASCADE'), nullable=False, index=True)
    address = relationship('Address', primaryjoin='PersonalData.address_id == Address.id', remote_side='Address.id', uselist=False)

class CorporateData(Base):
    __tablename__ = 'CorporateData'
    id = Column(Integer(), primary_key=True)
    name = Column(Unicode(), nullable=False, server_default=u'')
    identification_number = Column(Unicode(), nullable=False, server_default=u'')
    address_id = Column(Integer(), ForeignKey('Address.id', ondelete='CASCADE'), nullable=False, index=True)
    address = relationship('Address', primaryjoin='CorporateData.address_id == Address.id', remote_side='Address.id', uselist=False)

class Address(Base):
    __tablename__ = 'Address'
    id = Column(Integer(), primary_key=True)
    street = Column(Unicode(), nullable=False, server_default=u'')
    postal_code = Column(Unicode(), nullable=False, server_default=u'')
    city = Column(Unicode(), nullable=False, server_default=u'')
    country = Column(Unicode(), nullable=False, server_default=u'')


def windowed_query(query, column, options_or_callback, window_size=100):
    """Perform (a correct) yield_per() operation. See WindowedQuery.yield_per() 
    for more.

    EXAMPLE:
        q = session.query(Foo).filter(Foo.name.like(u'%foo%'))
        wq = windowed_query(q, Foo.id, [subqueryload(Foo.bars)])
        for each_foo in wq:
            print each_foo.name
            for each_bar in each_foo.bars:
                print each_bar
    """
    return WindowedQuery(query, column, options_or_callback).yield_per(window_size)

class WindowedQuery(object):
    """Perform (a correct) yield_per() operation."""
    def __init__(self, query, column, options_or_callback):
        """The query MUST have NO options(...) and NO order_by(...). It MUST 
        contain all necessary join() and filter() to limit the result set as 
        desired.

        The column is the id column of the main result ORM class. It is used to 
        divide the results into windows of equal size.

        The options_or_callback can be a list of Query.options(...) such as 
        subqueryload(). If so, the following query is created to fetch data of 
        each window:

            q = session.query(self._column.class_).options(*self._options)
            q = q.filter(self._column.in_(each_window)

        The options_or_callback can be a one-argument function responsible for 
        complete processing of one window. Its only argument is the list of ids 
        of the window. It MUST return an iterable over results. It is called 
        once for each window.
        """
        self._query = query
        self._column = column
        if isinstance(options_or_callback, list):
            self._options = options_or_callback
            self._callback = None
        else:
            self._options = None
            self._callback = options_or_callback


    def yield_per(self, window_size):
        """Process results in chunks (windows).
        Steps:
        * Obtain ids of ALL result rows via slightly modified self._query.
        * Divide ids to chunks of equal size and perform ONE query for EACH 
        chunk to fetch the data.

        A chunk is determined by the test q.filter(self._column.in_(chunk)).
        This is the only way that works in presence of the read-committed 
        isolation level.
        """
        if self._options is not None:
            return self._yield_per_options(window_size)
        if self._callback is not None:
            return self._yield_per_callback(window_size)

    def _yield_per_options(self, window_size):
        """Deduce data query from self._column and self._options."""
        q = session.query(self._column.class_).options(*self._options)
        for each_window in self._windows(window_size):
            for each_result in q.filter(self._column.in_(each_window)):
                yield each_result

    def _yield_per_callback(self, window_size):
        """Use a callback function responsible for obtaining the results:
            def callback(win):
                # Equivalent to the use of self._options.
                q = session.query(Foo).options(...)
                q = q.filter(Foo.id.in_(win))
                return q
        """
        for each_window in self._windows(window_size):
            for each_result in self._callback(each_window):
                yield each_result

    def _windows(self, window_size):
        win = []
        win_size = 0
        for each in self._q_column():
            if win_size < window_size:
                win.append(each)
                win_size += 1
            if win_size == window_size:
                yield win
                win = []
                win_size = 0
        if win_size > 0:
            yield win

    def _q_column(self):
        """distinct() ensures that each id is returned at most once despite 
        a possible multiplying effect of a join().
        """
        return self._query.with_entities(distinct(self._column))


class Issue(object):
    @classmethod
    def find_contacts(cls, partner_id, clients_only):
        for contact_cls in [PersonalContact, CorporateContact]:
            q = cls._q_find_contacts(partner_id, contact_cls, clients_only)
            options = cls._q_find_contacts_options(contact_cls)
            for each_contact in windowed_query(q, contact_cls.id, options):
                tags = [each_tag.tag for each_tag in each_contact.contact_tags if each_tag.tag.partner_id == partner_id]
                yield (tags, each_contact)

    @classmethod
    def _q_find_contacts(cls, partner_id, contact_cls, clients_only):
        q = session.query(contact_cls)
        q = q.filter(contact_cls.parent_id == partner_id)
        if clients_only:
            q = q.filter(contact_cls.client != None)
        else:
            q = q.filter(contact_cls.client == None)
        return q

    @classmethod
    def _q_find_contacts_options(cls, contact_cls):
        client_cls = contact_cls.client_cls()
        data_cls = client_cls.data_cls()
        options = [
            subqueryload_all(contact_cls.contact_tags, ContactTag.tag),
            subqueryload(contact_cls.phones),
            subqueryload(contact_cls.emails),
            # NOTE: The joinedload of client causes a full scan of tables client 
            # and personal_client/corporate_client. 
            joinedload_all(contact_cls.client, client_cls.data, data_cls.address),
        ]
        if contact_cls is PersonalContact:
            options.extend([
                joinedload_all(contact_cls.partner, Partner.personal_data, data_cls.address),
            ])
        return options

class Workaround(object):
    @classmethod
    def find_contacts(cls, partner_id, clients_only):
        for contact_cls in [PersonalContact, CorporateContact]:
            q = cls._find_contacts_filter(partner_id, contact_cls, clients_only)
            wq = windowed_query(q, contact_cls.id, lambda win: cls._find_contacts_fetch_window(contact_cls, win))
            for each_contact in wq:
                tags = [each_contact_tag.tag for each_contact_tag in each_contact.contact_tags if each_contact_tag.tag.partner_id == partner_id]
                yield (tags, each_contact)

    @classmethod
    def _find_contacts_filter(cls, partner_id, contact_cls, clients_only):
        q = session.query(contact_cls)
        q = q.filter(contact_cls.parent_id == partner_id)
        if clients_only:
            q = q.filter(contact_cls.client != None)
        else:
            q = q.filter(contact_cls.client == None)
        return q

    @classmethod
    def _find_contacts_fetch_window(cls, contact_cls, win):
        """Special data-fetching query for contacts and all their related info 
        including tags, partner, client,...

        NOTE: We build the FROM part entirely by hand, because SA generates bad 
        SQL for postgres. It does a FULL SCAN of client and personal_client / 
        corporate_client even though it reads at most window_size rows from 
        them. All this because SA inheritance creates a subselect which leads 
        to the full scan.
        """
        client_cls = contact_cls.client_cls()
        data_cls = client_cls.data_cls()

        # We need TABLEs to build the FROM part by hand.
        # We reference PersonalData/CorporateData and Address twice, hence we 
        # need to alias them.
        # We also need their aliased ORM classes for contains_eager() to work.
        contact_table = Contact.__table__
        contact_subtable = contact_cls.__table__
        client_table = Client.__table__
        personal_client_table = client_cls.__table__
        partner_table = Partner.__table__
        partner_data_table = PersonalData.__table__.alias(name='partner_data')
        partner_address_table = Address.__table__.alias(name='partner_address')
        client_data_table = data_cls.__table__.alias(name='client_data')
        client_address_table = Address.__table__.alias(name='client_address')
        partner_data = aliased(PersonalData, partner_data_table)
        partner_address = aliased(Address, partner_address_table)
        client_data = aliased(data_cls, client_data_table)
        client_address = aliased(Address, client_address_table)
        select_from = contact_table.join(
            contact_subtable,
            contact_table.c.id == contact_subtable.c.id
        ).outerjoin(
            client_table,
            contact_subtable.c.client_id == client_table.c.id
        ).outerjoin(
            personal_client_table,
            client_table.c.id == personal_client_table.c.id
        ).outerjoin(
            client_data_table,
            personal_client_table.c.data_id == client_data_table.c.id
        ).outerjoin(
            client_address_table,
            client_data_table.c.address_id == client_address_table.c.id
        )
        options = [
            subqueryload_all(contact_cls.contact_tags, ContactTag.tag),
            subqueryload(contact_cls.phones),
            subqueryload(contact_cls.emails),
            contains_eager(contact_cls.client),
            contains_eager(contact_cls.client, client_cls.data, alias=client_data),
            contains_eager(contact_cls.client, client_cls.data, client_data.address, alias=client_address),
        ]
        if contact_cls is PersonalContact:
            select_from = select_from.outerjoin(
                partner_table,
                contact_subtable.c.partner_id == partner_table.c.id
            ).outerjoin(
                partner_data_table,
                partner_table.c.personal_data_id == partner_data_table.c.id
            ).outerjoin(
                partner_address_table,
                partner_data_table.c.address_id == partner_address_table.c.id
            )
            options.extend([
                contains_eager(contact_cls.partner),
                contains_eager(contact_cls.partner, Partner.personal_data, alias=partner_data),
                contains_eager(contact_cls.partner, Partner.personal_data, partner_data.address, alias=partner_address),
            ])
        q = session.query(contact_cls).select_from(select_from)
        q = q.filter(contact_cls.id.in_(win))
        q = q.options(*options)
        return q


def create_partner():
    return Partner(personal_data=PersonalData(address=Address()))

def create_personal_contact(parent, index):
    return PersonalContact(
        parent=parent,
        last_name=u''.join([u'Person ', unicode(index)])
    )

def create_corporate_contact(parent, index):
    return CorporateContact(
        parent=parent,
        name=u''.join([u'Corporation ', unicode(index)])
    )

def create_personal_client(index):
    return PersonalClient(
        data=PersonalData(
            last_name=u''.join([u'Person ', unicode(index)]),
            address=Address()
        )
    )

def create_corporate_client(index):
    return CorporateClient(
        data=CorporateData(
            name=u''.join([u'Corporation ', unicode(index)]),
            address=Address()
        )
    )

def ensure_partner_with_few_contacts():
    partner = session.query(Partner).first()
    if partner is None:
        partner = create_partner()
        session.add(partner)
        session.add_all(gen_create_partner_contacts(partner, 5))
    return partner

def gen_create_partner_contacts(partner, num_contacts):
    for each in xrange(1, num_contacts + 1):
        yield create_personal_contact(partner, each)
        yield create_corporate_contact(partner, each)

def create_clients(start_index=1, count=100):
    session.add_all(gen_create_clients(start_index, count))

def gen_create_clients(start_index, count):
    for each in xrange(start_index, count + 1):
        yield create_personal_client(each)
        yield create_corporate_client(each)



def configure(echo=True):
    global session
    conn_string = 'postgresql+psycopg2://lada:heslo@localhost:5433/zfp_xxx'
    engine = create_engine(conn_string, echo=echo)
    session = sessionmaker(bind=engine, autoflush=False)()
    Base.metadata.bind = engine
    Base.metadata.create_all()

def main_fill(start_index=1, count=1000):
    """Populate client-related tables to make the index scan cheaper than the 
    sequential scan, i.e. the count must be big enough.
    """
    configure(False)
    create_clients(start_index, count)
    session.commit()

def main_issue():
    _main(Issue)

def main_workaround():
    _main(Workaround)

def _main(cls):
    """Demonstrate the workaround."""
    configure()

    print u"#### BEGIN INSERTING DATA ####"
    partner = ensure_partner_with_few_contacts()
    session.flush()
    print u"#### END INSERTING DATA ####"

    # NOTE: No SQL is emitted here.
    for (each_tags, each_contact) in cls.find_contacts(partner.id, False):
        for each in each_tags:
            print each.tag.label
        for each in each_contact.phones:
            print each.number
        for each in each_contact.emails:
            print each.address


if __name__ == '__main__':
#    main_fill()
    main_issue()
#    main_workaround()

Reply via email to