Hello.

This is the current version of our windowed query:


from sqlalchemy.sql.expression import distinct


def windowed_query(query, column, options_or_callback, window_size=100):
    """Perform (a correct) yield_per() operation. See WindowedQuery.yield_per()
    for more.

    EXAMPLE:
        q = session.query(Foo).filter(Foo.name.like(u'%foo%'))
        wq = windowed_query(q, Foo.id, [subqueryload(Foo.bars)])
        for each_foo in wq:
            print each_foo.name
            for each_bar in each_foo.bars:
                print each_bar
    """
    return WindowedQuery(
        query,
        column,
        options_or_callback
    ).yield_per(window_size)


class WindowedQuery(object):
    """Perform (a correct) yield_per() operation."""
    def __init__(self, query, column, options_or_callback):
        """
        The query MUST have NO options(...) and NO order_by(...). It MUST
        contain all necessary join() and filter() to limit the result set as
        desired.

        The column is the id column of the main result ORM class. It is used to
        divide the results into windows of equal size.

        The options_or_callback can be a list of Query.options(...) such as
        subqueryload(). If so, the following query is created to fetch data of
        each window:

            q = session.query(self._column.class_).options(*self._options)
            q = q.filter(self._column.in_(each_window)

        The options_or_callback can be a one-argument function responsible for
        complete processing of one window. Its only argument is the list of ids
        of the window. It MUST return an iterable over results. It is called
        once for each window.
        """
        self._query = query
        self._column = column
        self._session = query.session
        if isinstance(options_or_callback, list):
            self._options = options_or_callback
            self._callback = None
        else:
            self._options = None
            self._callback = options_or_callback

    def yield_per(self, window_size):
        """Process results in chunks (windows).
        Steps:
        * Obtain ids of ALL result rows via slightly modified self._query.
        * Divide ids to chunks of equal size and perform ONE query for EACH
        chunk to fetch the data.

        A chunk is determined by the test q.filter(self._column.in_(chunk)).
        This is the only way that works in presence of the read-committed
        isolation level.
        """
        if self._options is not None:
            return self._yield_per_options(window_size)
        if self._callback is not None:
            return self._yield_per_callback(window_size)

    def _yield_per_options(self, window_size):
        """Deduce data query from self._column and self._options."""
        q = self._session.query(self._column.class_).options(*self._options)
        for each_window in self._windows(window_size):
            for each_result in q.filter(self._column.in_(each_window)):
                yield each_result

    def _yield_per_callback(self, window_size):
        """Use a callback function responsible for obtaining the results:
            def callback(win):
                # Equivalent to the use of self._options.
                q = session.query(Foo).options(...)
                q = q.filter(Foo.id.in_(win))
                return q
        """
        for each_window in self._windows(window_size):
            for each_result in self._callback(each_window):
                yield each_result

    def _windows(self, window_size):
        win = []
        win_size = 0
        for each in self._q_column():
            if win_size < window_size:
                win.append(each)
                win_size += 1
            if win_size == window_size:
                yield win
                win = []
                win_size = 0
        if win_size > 0:
            yield win

    def _q_column(self):
        """distinct() ensures that each id is returned at most once despite
        a possible multiplying effect of a join().
        """
        return self._query.with_entities(distinct(self._column))


We use it in the production for a few days now. Perhaps someone will find it
useful too.


Ladislav Lenart


On 7.6.2013 20:03, Ladislav Lenart wrote:
> Hello.
> 
> Resending because the original e-mail does not seem to make it to the mailing
> list. Apologise for any duplicates. Here we go...
> 
> 
> Hello.
> 
> I modified the recipe at
> 
>     http://www.sqlalchemy.org/trac/wiki/UsageRecipes/WindowedRangeQuery
> 
> to better suit my needs. Perhaps someone else will find this useful:
> 
> 
> ####################
> #### CODE BEGIN ####
> ####################
> 
> from sqlalchemy.orm import subqueryload
> from sqlalchemy.sql.expression import distinct
> 
> def windowed_query(filter_query, data_query, column, window_size):
>     """Perform (a correct) yield_per() operation. See 
> WindowedQuery.yield_per()
>     for more.
> 
>     EXAMPLE:
>         gen = windowed_query(
>             filter_query=session.query(Foo).filter(Foo.name.like(u'%foo%')),
>             data_query=session.query(Foo).options(Foo.bars),
>             column=Foo.id,
>             window_size=50,
>         )
>         for each_foo in gen:
>             print each_foo.name
>             for each_bar in each_foo.bars:
>                 print each_bar
>     """
>     return WindowedQuery(filter_query, data_query, 
> column).yield_per(window_size)
> 
> class WindowedQuery(object):
>     """Perform (a correct) yield_per() operation."""
>     def __init__(self, filter_query, data_query, column):
>         # A query with NO options(...) and NO order_by(...). MUST contain all
>         # necessary join() and filter() to limit the result set as desired.
>         self._filter_query = filter_query
>         # A simple query with options() to fetch the actual data.
>         self._data_query = data_query
>         # id column of the (main) result ORM class.
>         self._column = column
> 
>     def yield_per(self, window_size):
>         """Process results in chunks.
>         Steps:
>         * Obtain ids of ALL result rows via self._filter_query.
>         * Divide ids to chunks of equal size and perform ONE query for EACH
>         chunk to fetch the data via self._data_query.
> 
>         A chunk is determined by the test q.filter(self._column.in_(chunk)).
>         This is the only way that works in presence of the read-committed
>         isolation level.
>         """
>         q = self._data_query
>         for each_window in self._windows(window_size):
>             for each_result in q.filter(each_window):
>                 yield each_result
> 
>     def _windows(self, window_size):
>         chunk = []
>         chunk_size = 0
>         for each in self._q_column():
>             if chunk_size < window_size:
>                 chunk.append(each)
>                 chunk_size += 1
>             if chunk_size == window_size:
>                 yield self._window_for_chunk(chunk)
>                 chunk = []
>                 chunk_size = 0
>         if chunk_size > 0:
>             yield self._window_for_chunk(chunk)
> 
>     def _q_column(self):
>         # distinct() ensures that each id is returned at most once despite
>         # a possible multiplying effect of a join().
>         return self._filter_query.with_entities(distinct(self._column))
> 
>     def _window_for_chunk(self, chunk):
>         return self._column.in_(chunk)
> 
> ##################
> #### CODE END ####
> ##################
> 
> 
> MOTIVATION: I have learned recently that Query.yield_per() does not work 
> nicely
> in combination with subqueryload(). The above recipe fixes that. Unfortunately
> its usage is not as elegant and simple as q.yield_per(...).
> 
> If you have any idea how to accomplish the same with ONE query only (in SA 
> 0.7.9):
> 
> def windowed_query(query, column, window_size):
>     query --magic-> filter_query
>     query --magic-> data_query
>     ...
> 
> I would very much like to hear about it.
> 
> 
> PERFORMANCE: My first tests suggest that it might be one order of magnitude
> better than the Query.yield_per() we use now.
> 
> Note also that yield_per() with subqueryload() was still about twice as fast 
> as
> the same query without yield_per(). But this will be highly dependent on the
> query I guess.
> 
> 
> WARNING: We do not use this in the production yet. Use at your own risk.
> 
> 
> Happy SA hacking,
> 
> Ladislav Lenart
> 


-- 
You received this message because you are subscribed to the Google Groups 
"sqlalchemy" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to sqlalchemy+unsubscr...@googlegroups.com.
To post to this group, send email to sqlalchemy@googlegroups.com.
Visit this group at http://groups.google.com/group/sqlalchemy.
For more options, visit https://groups.google.com/groups/opt_out.


Reply via email to