On Sun, 27 Apr 2008 23:35:06 -0400
Kyle Schaffrick <[EMAIL PROTECTED]> wrote:

> Since as I mentioned, this is kind of an interesting learning project
> for me to learn some SA internals, I'd like my next step to be seeing
> if I can write something that will make a callable/closure which, when
> passed into iter_merging, will produce the least surprising ordering
> of results w/r/t what was requested in the original ShardedQuery.

I have taken a first stab at this to play with the idea, and it seems to
be doing the right thing; the patch also contains some very rudimentary
tests in ShardTest that make it voom.

The patch adds a .merge_ordering() method to ShardedQuery that accepts an
ordering function. The function takes a short list of items (at most one
per shard involved) and returns the one that comes first in the merge
order. The items are of whatever type you would get back from the query
ordinarily. If you combine it with an order_by that enforces the same
ordering within each shard, you get a complete ordering of results for
queries that pull results from multiple shards.

Example:
>>> q = s.query(Person).order_by(Person.age.asc())
>>> mq = q.merge_ordering(lambda l: min(l, key=(lambda p: p.age)))

If you don't call merge_ordering, it doesn't do any merging and uses the
standard behavior of concatenating the results.
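To make the distinction concrete, here is a standalone sketch (plain
Python, not part of the patch; `merging` is a hypothetical name) of the
same pick-the-least-head merge next to plain concatenation:

```python
# Standalone sketch contrasting the two behaviors: concatenation just
# chains the per-shard streams, while an ordered merge repeatedly picks
# whichever buffered head item comes first in the merge order.
def merging(iterables, ordering=min):
    """Merge sorted iterables; `ordering` picks the next item from a
    list of candidate heads, matching the merge_ordering protocol."""
    # Buffer one head item per live iterator as [head, iterator] pairs.
    live = []
    for it in map(iter, iterables):
        try:
            live.append([next(it), it])
        except StopIteration:
            pass
    while live:
        winner = ordering([head for head, _ in live])
        yield winner
        for entry in live:
            # Advance the iterator whose head was just yielded.
            if entry[0] is winner:
                try:
                    entry[0] = next(entry[1])
                except StopIteration:
                    live.remove(entry)
                break

shard_a = [20, 35, 62]                    # each shard's rows, pre-ordered
shard_b = [18, 41]
print(list(shard_a) + list(shard_b))      # concatenated: [20, 35, 62, 18, 41]
print(list(merging([shard_a, shard_b])))  # merged: [18, 20, 35, 41, 62]
```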

"merge_ordering" is probably a crappy name, so if this gets any interest
then we can figure out what else to call it. One aspect I'm not sure
about is whether or not merge_ordering's parameter should use the
protocol it's using now, or if it should accept something more like
"key" or "cmp" parameters of the builtin list.sort method.

Also, I attached a better version of the iterator patch that closes the
result object like the original code did, by hooking each iterator with
a call to result.close(). I couldn't figure out whether this is actually
necessary, because it didn't seem consistent between Query and
ShardedQuery, but that may be for reasons I'm not seeing.
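For reference, the hook pattern can be sketched in isolation (modern
Python 3 spelling using a for loop; the patch itself drives
iterator.next() and re-raises StopIteration). The same caveat applies
either way: the hook only fires if the iterator is actually run to
exhaustion, not if the caller abandons it early:

```python
def hook_stop_iteration(iterator, hook_fun):
    """Yield everything from `iterator`, then run `hook_fun` once the
    underlying iterator is exhausted."""
    for item in iterator:
        yield item
    hook_fun()  # runs exactly once, on exhaustion

closed = []
rows = hook_stop_iteration(iter([1, 2, 3]), lambda: closed.append(True))
print(list(rows))   # [1, 2, 3]
print(closed)       # [True] -- the result.close() stand-in ran
```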

Feedback is welcome of course!

Kyle


Use a generator to concatenate results from ShardedQuerys.

Adds a function `iter_concatenating` that accepts a list of iterators, and
yields every object produced by each of them. Alter ShardedQuery to use this
function instead of loading all of the potentially large sets of result objects
into a list before concatenating them.

Also adds a convenience function to hook StopIteration on an iterator.

diff --git a/lib/sqlalchemy/orm/shard.py b/lib/sqlalchemy/orm/shard.py
--- a/lib/sqlalchemy/orm/shard.py
+++ b/lib/sqlalchemy/orm/shard.py
@@ -88,20 +88,16 @@
     def _execute_and_instances(self, context):
         if self._shard_id is not None:
             result = self.session.connection(mapper=self.mapper, shard_id=self._shard_id).execute(context.statement, **self._params)
-            try:
-                return iter(self.instances(result, querycontext=context))
-            finally:
-                result.close()
+            return util.hook_stop_iteration(self.iterate_instances(result, querycontext=context), lambda: result.close())
         else:
-            partial = []
+            partials = []
             for shard_id in self.query_chooser(self):
                 result = self.session.connection(mapper=self.mapper, shard_id=shard_id).execute(context.statement, **self._params)
-                try:
-                    partial = partial + list(self.instances(result, querycontext=context))
-                finally:
-                    result.close()
+                partials.append(util.hook_stop_iteration(self.iterate_instances(result, querycontext=context),
+                            lambda result=result: result.close()))
+
             # if some kind of in memory 'sorting' were done, this is where it would happen
-            return iter(partial)
+            return util.iter_concatenating(partials)
 
     def get(self, ident, **kwargs):
         if self._shard_id is not None:
diff --git a/lib/sqlalchemy/util.py b/lib/sqlalchemy/util.py
--- a/lib/sqlalchemy/util.py
+++ b/lib/sqlalchemy/util.py
@@ -229,6 +229,30 @@
                 yield y
         else:
             yield elem
+
+def hook_stop_iteration(iterator, hook_fun):
+    """Add a hook to StopIteration on an iterator"""
+    try:
+        while True:
+            yield iterator.next()
+    except StopIteration:
+        hook_fun()
+        raise
+
+def iter_concatenating(iters):
+    """Concatenate iterables using a generator.
+
+    Yields every item produced by every iterable in the given list, in
+    concatenated order.
+    """
+    for iterable in iters:
+        try:
+            while True:
+                yield iterable.next()
+
+        # Begin yielding stuff off the next iterator.
+        except StopIteration:
+            pass
 
 class ArgSingleton(type):
     instances = weakref.WeakValueDictionary()
Add ability to do ordered merge of ShardedQuery results.

`iter_merging` creates an iterator that yields an ordered merge of the outputs
of the list of iterators passed in. The ordering is determined by a function
`ordered` which takes a list of objects and returns the member that is next in
the ordering.

`ShardedQuery.merge_ordering()` accepts such a function for sorting query
results and creates a ShardedQuery that will merge the results from each shard
using it.

diff --git a/lib/sqlalchemy/orm/shard.py b/lib/sqlalchemy/orm/shard.py
--- a/lib/sqlalchemy/orm/shard.py
+++ b/lib/sqlalchemy/orm/shard.py
@@ -68,6 +68,7 @@
         self.id_chooser = self.session.id_chooser
         self.query_chooser = self.session.query_chooser
         self._shard_id = None
+        self._merge_ordering_fun = None
         
     def _clone(self):
         q = ShardedQuery.__new__(ShardedQuery)
@@ -97,7 +98,10 @@
                             lambda result=result: result.close()))
 
             # if some kind of in memory 'sorting' were done, this is where it would happen
-            return util.iter_concatenating(partials)
+            if self._merge_ordering_fun:
+                return util.iter_merging(partials, self._merge_ordering_fun)
+            else:
+                return util.iter_concatenating(partials)
 
     def get(self, ident, **kwargs):
         if self._shard_id is not None:
@@ -121,3 +125,23 @@
                     return o
             else:
                 raise exceptions.InvalidRequestError("No instance found for identity %s" % repr(ident))
+    
+    def merge_ordering(self, ordering):
+        """Set an ordering function to merge the shard results.
+        
+        This function expects a callable that accepts a list of items being
+        returned from the query. The callable should return the member of the
+        list that comes first in merge order.
+        
+        Note that this will only enforce a complete ordering of the results if
+        the query itself contains an ``order_by`` that will order the results
+        from each shard the same way as the ordering function provided to
+        `merge_ordering`:
+
+        >>> q = query(Person).order_by([Person.age.asc()])
+        >>> mq = q.merge_ordering(lambda l: min(l, key=(lambda x: x.age)))
+        """
+
+        q = self._clone()
+        q._merge_ordering_fun = ordering
+        return q
diff --git a/lib/sqlalchemy/util.py b/lib/sqlalchemy/util.py
--- a/lib/sqlalchemy/util.py
+++ b/lib/sqlalchemy/util.py
@@ -253,6 +253,65 @@
         # Begin yielding stuff off the next iterator.
         except StopIteration:
             pass
+
+def iter_merging(iters, ordering=min):
+    """Merge iterables using a generator and an ordering function.
+
+    Yields every item produced by every iterable in the given list, ordered as
+    selected by the merge ordering function. The function always emits every
+    item from every iterable once, but the order of the output will only be
+    correct if the iterables are ordered by the same ordering used by the
+    merge.
+
+        ordering
+            A function accepting a list. The function should return the member
+            of this list that comes next in the merge ordering. Defaults to
+            built-in function min.
+    """
+
+    def next_with_stopped_flag(iter):
+        """Get the next thing coming from the iterator.
+        
+        Returns a tuple, containing either the item gotten with False (not
+        stopped), or None with True (iterator stopped).
+        """
+        try:
+            return iter.next(), False
+        except StopIteration:
+            return None, True
+
+    # I use iters_stopped as a list of flags indicating when each iterator
+    # ends, such that the domain of things that can be merge iter'ed does not
+    # exclude some sentinel value.
+
+    iter_heads = []    # Holds the last object generated by each iterator that
+                       # hasn't been merged yet; a staging area.
+    iters_stopped = [] # Flags indicating whether each iterator has stopped.
+
+    # Prime the head list
+    for iterable in iters:
+        head, stopped = next_with_stopped_flag(iterable)
+        iter_heads.append(head)
+        iters_stopped.append(stopped)
+
+    # Run until all iterators stop
+    while not min(iters_stopped):
+
+        # Gather a list of candidate values
+        candidate_values = [val for val, stopped
+                in zip(iter_heads, iters_stopped) if not stopped]
+
+        # Pick the next thing in the merge order and yield it
+        yielded_value = ordering(candidate_values)
+        yield yielded_value
+
+        for i in range(len(iters)):
+
+            # Whichever item in the head list got yielded, replace it with
+            # the next thing from its iterator (skip slots already stopped).
+            if not iters_stopped[i] and yielded_value is iter_heads[i]:
+                iter_heads[i], iters_stopped[i] = next_with_stopped_flag(iters[i])
+                break
 
 class ArgSingleton(type):
     instances = weakref.WeakValueDictionary()
diff --git a/test/orm/sharding/shard.py b/test/orm/sharding/shard.py
--- a/test/orm/sharding/shard.py
+++ b/test/orm/sharding/shard.py
@@ -155,6 +155,14 @@
         asia_and_europe = sess.query(WeatherLocation).filter(WeatherLocation.continent.in_(['Europe', 'Asia']))
         assert set([c.city for c in asia_and_europe]) == set(['Tokyo', 'London', 'Dublin'])
 
+        city_expected_order = sorted([o.city for o in [tokyo, newyork, toronto, london, dublin, brasilia, quito]])
+        base_query = sess.query(WeatherLocation).filter(WeatherLocation.continent != "Bogon")
+        merge_ordered = base_query.order_by([WeatherLocation.city]
+                ).merge_ordering(lambda objects: min(objects, key=(lambda o: o.city)))
+        assert [c.city for c in merge_ordered] == city_expected_order
+        merge_ordered_rev = base_query.order_by([WeatherLocation.city.desc()]
+                ).merge_ordering(lambda objects: max(objects, key=(lambda o: o.city)))
+        assert [c.city for c in merge_ordered_rev] == list(reversed(city_expected_order))
 
 
 if __name__ == '__main__':
