On Sun, 27 Apr 2008 23:35:06 -0400 Kyle Schaffrick <[EMAIL PROTECTED]> wrote:
> Since as I mentioned, this is kind of an interesting learning project
> for me to learn some SA internals, I'd like my next step to be seeing
> if I can write something that will make a callable/closure which, when
> passed into iter_merging, will produce the least surprising ordering
> of results w/r/t what was requested in the original ShardedQuery.

I have taken a first stab at this to play with the idea, and it seems to be
doing the right thing. The patch also contains some very rudimentary tests in
ShardTest that make it voom.

The patch adds a .merge_ordering() method to ShardedQuery that allows
supplying an ordering function. The function takes a short list of items (the
upper limit on its length being the number of shards involved) and returns
the one that comes first; the type of the items is whatever you would get
back from the query ordinarily. If you use it with an order_by that enforces
the same ordering, you get a complete ordering of results for queries that
pull results from multiple shards. Example:

>>> q = s.query(Person).order_by(Person.age.asc())
>>> mq = q.merge_ordering(lambda l: min(l, key=(lambda p: p.age)))

If you don't call merge_ordering, no merging is done and the standard
behavior of concatenating the results is used.

"merge_ordering" is probably a crappy name, so if this gets any interest we
can figure out what else to call it. One aspect I'm not sure about is whether
merge_ordering's parameter should use the protocol it's using now, or whether
it should accept something more like the "key" or "cmp" parameters of the
built-in list.sort method.

Also, I attached a better version of the iterator patch that closes the
result object like the original code did, by hooking each iterator with a
call to result.close(). I couldn't figure out whether this is actually
necessary, because it didn't seem consistent between Query and ShardedQuery,
but that may be for reasons I'm not seeing.

Feedback is welcome, of course!
Kyle
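The ordering-function protocol described above can be sketched standalone,
outside SQLAlchemy: the callable receives a short list of candidate items
(one per shard) and returns the member that comes first in merge order. The
`Person` class below is a hypothetical stand-in for the mapped class, not
code from the patches:

```python
from dataclasses import dataclass

@dataclass
class Person:
    name: str
    age: int

# The same shape of callable passed to merge_ordering() in the example:
# given a list of candidates, return the one that comes first.
ordering = lambda l: min(l, key=lambda p: p.age)

candidates = [Person("alice", 40), Person("bob", 25), Person("carol", 31)]
first = ordering(candidates)
print(first.name)  # bob comes first under ascending age
```

A "key"-style parameter, as mused about above, would instead take
`lambda p: p.age` directly and let the merge code call min() itself.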
Use a generator to concatenate results from ShardedQuerys.

Adds a function `iter_concatenating` that accepts a list of iterators and
yields every object produced by each of them. Alters ShardedQuery to use this
function instead of loading all of the potentially large sets of result
objects into a list before concatenating them. Also adds a convenience
function to hook StopIteration on an iterator.

diff --git a/lib/sqlalchemy/orm/shard.py b/lib/sqlalchemy/orm/shard.py
--- a/lib/sqlalchemy/orm/shard.py
+++ b/lib/sqlalchemy/orm/shard.py
@@ -88,20 +88,16 @@
     def _execute_and_instances(self, context):
         if self._shard_id is not None:
             result = self.session.connection(mapper=self.mapper, shard_id=self._shard_id).execute(context.statement, **self._params)
-            try:
-                return iter(self.instances(result, querycontext=context))
-            finally:
-                result.close()
+            return util.hook_stop_iteration(self.iterate_instances(result, querycontext=context), lambda: result.close())
         else:
-            partial = []
+            partials = []
             for shard_id in self.query_chooser(self):
                 result = self.session.connection(mapper=self.mapper, shard_id=shard_id).execute(context.statement, **self._params)
-                try:
-                    partial = partial + list(self.instances(result, querycontext=context))
-                finally:
-                    result.close()
+                partials.append(util.hook_stop_iteration(self.iterate_instances(result, querycontext=context),
+                                                         lambda: result.close()))
+            # if some kind of in memory 'sorting' were done, this is where it would happen
-            return iter(partial)
+            return util.iter_concatenating(partials)

     def get(self, ident, **kwargs):
         if self._shard_id is not None:
diff --git a/lib/sqlalchemy/util.py b/lib/sqlalchemy/util.py
--- a/lib/sqlalchemy/util.py
+++ b/lib/sqlalchemy/util.py
@@ -229,6 +229,30 @@
                 yield y
         else:
             yield elem
+
+def hook_stop_iteration(iterator, hook_fun):
+    """Add a hook to StopIteration on an iterator"""
+    try:
+        while True:
+            yield iterator.next()
+    except StopIteration:
+        hook_fun()
+        raise
+
+def iter_concatenating(iters):
+    """Concatenate iterables using a generator.
+
+    Yields every item produced by every iterable in the given list, in
+    concatenated order.
+    """
+    for iterable in iters:
+        try:
+            while True:
+                yield iterable.next()
+
+        # Begin yielding stuff off the next iterator.
+        except StopIteration:
+            pass

 class ArgSingleton(type):
     instances = weakref.WeakValueDictionary()
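A minimal Python 3 rendering of the two helpers in the patch above (the
patch itself targets Python 2, hence `iterator.next()`). Names mirror the
diff, but this is a sketch on plain lists, not SQLAlchemy's code; the
cleanup hook here uses `finally`, which also fires if the generator is
closed early, whereas the patch hooks StopIteration specifically:

```python
def hook_stop_iteration(iterator, hook_fun):
    """Yield everything from `iterator`, calling hook_fun() when it ends."""
    try:
        for item in iterator:
            yield item
    finally:
        # Stands in for result.close() in the ShardedQuery case.
        hook_fun()

def iter_concatenating(iters):
    """Yield every item from each iterable, in concatenated order."""
    for iterable in iters:
        for item in iterable:
            yield item

closed = []
parts = [iter([1, 2]), iter([3, 4])]
hooked = [hook_stop_iteration(it, lambda i=i: closed.append(i))
          for i, it in enumerate(parts)]
result = list(iter_concatenating(hooked))
print(result)  # [1, 2, 3, 4]
print(closed)  # both cleanup hooks fired, in order: [0, 1]
```

Nothing is buffered: each hooked iterator is drained lazily, and its hook
runs as soon as that iterator is exhausted, before the next one starts.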
Add ability to do ordered merge of ShardedQuery results.

`iter_merging` creates an iterator that yields an ordered merge of the
outputs of the list of iterators passed in. The ordering is determined by a
function `ordering` which takes a list of objects and returns the member that
is next in the ordering. `ShardedQuery.merge_ordering()` accepts such a
function for sorting query results and creates a ShardedQuery that will merge
the results from each shard using it.

diff --git a/lib/sqlalchemy/orm/shard.py b/lib/sqlalchemy/orm/shard.py
--- a/lib/sqlalchemy/orm/shard.py
+++ b/lib/sqlalchemy/orm/shard.py
@@ -68,6 +68,7 @@
         self.id_chooser = self.session.id_chooser
         self.query_chooser = self.session.query_chooser
         self._shard_id = None
+        self._merge_ordering_fun = None

     def _clone(self):
         q = ShardedQuery.__new__(ShardedQuery)
@@ -97,7 +98,10 @@
                                                          lambda: result.close()))
             # if some kind of in memory 'sorting' were done, this is where it would happen
-            return util.iter_concatenating(partials)
+            if self._merge_ordering_fun:
+                return util.iter_merging(partials, self._merge_ordering_fun)
+            else:
+                return util.iter_concatenating(partials)

     def get(self, ident, **kwargs):
         if self._shard_id is not None:
@@ -121,3 +125,23 @@
             return o
         else:
             raise exceptions.InvalidRequestError("No instance found for identity %s" % repr(ident))
+
+    def merge_ordering(self, ordering):
+        """Set an ordering function to merge the shard results.
+
+        This function expects a callable that accepts a list of items being
+        returned from the query. The callable should return the member of the
+        list that comes first in merge order.
+
+        Note that this will only enforce a complete ordering of the results if
+        the query itself contains an ``order_by`` that will order the results
+        from each shard the same way as the ordering function provided to
+        `merge_ordering`:
+
+        >>> q = query(Person).order_by([Person.age.asc()])
+        >>> mq = q.merge_ordering(lambda l: min(l, key=(lambda x: x.age)))
+        """
+
+        q = self._clone()
+        q._merge_ordering_fun = ordering
+        return q
diff --git a/lib/sqlalchemy/util.py b/lib/sqlalchemy/util.py
--- a/lib/sqlalchemy/util.py
+++ b/lib/sqlalchemy/util.py
@@ -253,6 +253,65 @@

         # Begin yielding stuff off the next iterator.
         except StopIteration:
             pass
+
+def iter_merging(iters, ordering=min):
+    """Merge iterables using a generator and an ordering function.
+
+    Yields every item produced by every iterable in the given list, ordered
+    as selected by the merge ordering function. The function always emits
+    every item from every iterable once, but the order of the output will
+    only be correct if the iterables are ordered by the same ordering used
+    by the merge.
+
+    ordering
+      A function accepting a list. The function should return the member
+      of this list that comes next in the merge ordering. Defaults to
+      built-in function min.
+    """
+
+    def next_with_stopped_flag(iter):
+        """Get the next thing coming from the iterator.
+
+        Returns a tuple, containing either the item gotten with False (not
+        stopped), or None with True (iterator stopped).
+        """
+        try:
+            return iter.next(), False
+        except StopIteration:
+            return None, True
+
+    # I use iters_stopped as a list of flags indicating when each iterator
+    # ends, such that the domain of things that can be merge iter'ed does
+    # not exclude some sentinel value.
+
+    iter_heads = []     # Holds the last object generated by each iterator
+                        # that hasn't been merged yet; a staging area.
+    iters_stopped = []  # Flags indicating whether each iterator has stopped.
+
+    # Prime the head list
+    for iterable in iters:
+        head, stopped = next_with_stopped_flag(iterable)
+        iter_heads.append(head)
+        iters_stopped.append(stopped)
+
+    # Run until all iterators stop
+    while not min(iters_stopped):
+
+        # Gather a list of candidate values
+        candidate_values = [val for val, stopped
+                            in zip(iter_heads, iters_stopped) if not stopped]
+
+        # Pick the next thing in the merge order and yield it
+        yielded_value = ordering(candidate_values)
+        yield yielded_value
+
+        for i in range(len(iters)):
+
+            # Whichever item in the head list got yielded, replace it with
+            # the next thing from its iterator.
+            if yielded_value is iter_heads[i]:
+                iter_heads[i], iters_stopped[i] = next_with_stopped_flag(iters[i])
+                break

 class ArgSingleton(type):
     instances = weakref.WeakValueDictionary()
diff --git a/test/orm/sharding/shard.py b/test/orm/sharding/shard.py
--- a/test/orm/sharding/shard.py
+++ b/test/orm/sharding/shard.py
@@ -155,6 +155,14 @@
         asia_and_europe = sess.query(WeatherLocation).filter(WeatherLocation.continent.in_(['Europe', 'Asia']))
         assert set([c.city for c in asia_and_europe]) == set(['Tokyo', 'London', 'Dublin'])

+        city_expected_order = sorted([o.city for o in [tokyo, newyork, toronto, london, dublin, brasilia, quito]])
+        base_query = sess.query(WeatherLocation).filter(WeatherLocation.continent != "Bogon")
+        merge_ordered = base_query.order_by([WeatherLocation.city]
+                            ).merge_ordering(lambda objects: min(objects, key=(lambda o: o.city)))
+        assert [c.city for c in merge_ordered] == city_expected_order
+        merge_ordered_rev = base_query.order_by([WeatherLocation.city.desc()]
+                            ).merge_ordering(lambda objects: max(objects, key=(lambda o: o.city)))
+        assert [c.city for c in merge_ordered_rev] == list(reversed(city_expected_order))

 if __name__ == '__main__':
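The k-way merge that iter_merging implements can be cross-checked against the
standard library: `heapq.merge` yields a single sorted stream from
already-sorted inputs (the `key=` parameter needs Python 3.5+). The two lists
below are hypothetical per-shard result streams, each pre-ordered by city the
way an order_by would leave them:

```python
import heapq

# Each "shard" yields rows already ordered by city, matching the role of
# the per-shard order_by in the test above. The row contents are made up.
shard_a = iter([("dublin", 1), ("tokyo", 2)])
shard_b = iter([("london", 3), ("toronto", 4)])

merged = list(heapq.merge(shard_a, shard_b, key=lambda row: row[0]))
merged_cities = [city for city, _ in merged]
print(merged_cities)  # ['dublin', 'london', 'tokyo', 'toronto']
```

Like iter_merging, this pulls one head element per input at a time rather
than materializing the full result sets, so memory use stays proportional
to the number of shards, not the number of rows.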