Repository: incubator-beam Updated Branches: refs/heads/python-sdk d1fccbf5e -> 21b7844bb
Query Splitter for Datastore v1 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/c1126b70 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/c1126b70 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/c1126b70 Branch: refs/heads/python-sdk Commit: c1126b708469fc63bd8ab8e54026700408ec34da Parents: d1fccbf Author: Vikas Kedigehalli <vika...@google.com> Authored: Mon Oct 24 18:29:29 2016 -0700 Committer: Robert Bradshaw <rober...@gmail.com> Committed: Tue Nov 15 14:24:02 2016 -0800 ---------------------------------------------------------------------- .../python/apache_beam/io/datastore/__init__.py | 16 ++ .../apache_beam/io/datastore/v1/__init__.py | 16 ++ .../apache_beam/io/datastore/v1/helper.py | 84 ++++++ .../apache_beam/io/datastore/v1/helper_test.py | 124 +++++++++ .../io/datastore/v1/query_splitter.py | 270 +++++++++++++++++++ .../io/datastore/v1/query_splitter_test.py | 257 ++++++++++++++++++ sdks/python/setup.py | 1 + 7 files changed, 768 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c1126b70/sdks/python/apache_beam/io/datastore/__init__.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/datastore/__init__.py b/sdks/python/apache_beam/io/datastore/__init__.py new file mode 100644 index 0000000..cce3aca --- /dev/null +++ b/sdks/python/apache_beam/io/datastore/__init__.py @@ -0,0 +1,16 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c1126b70/sdks/python/apache_beam/io/datastore/v1/__init__.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/datastore/v1/__init__.py b/sdks/python/apache_beam/io/datastore/v1/__init__.py new file mode 100644 index 0000000..cce3aca --- /dev/null +++ b/sdks/python/apache_beam/io/datastore/v1/__init__.py @@ -0,0 +1,16 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. +# http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c1126b70/sdks/python/apache_beam/io/datastore/v1/helper.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/datastore/v1/helper.py b/sdks/python/apache_beam/io/datastore/v1/helper.py new file mode 100644 index 0000000..626ab35 --- /dev/null +++ b/sdks/python/apache_beam/io/datastore/v1/helper.py @@ -0,0 +1,84 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""Cloud Datastore helper functions.""" + + +def key_comparator(k1, k2): + """A comparator for Datastore keys. + + Comparison is only valid for keys in the same partition. The comparison here + is between the path elements of each key. + """ + + if k1.partition_id != k2.partition_id: + raise ValueError('Cannot compare keys with different partition ids.') + + k2_iter = iter(k2.path) + + for k1_path in k1.path: + k2_path = next(k2_iter, None) + if not k2_path: + return 1 + + result = compare_path(k1_path, k2_path) + + if result != 0: + return result + + k2_path = next(k2_iter, None) + if k2_path: + return -1 + else: + return 0 + + +def compare_path(p1, p2): + """A comparator for key path elements. + + A path element has either an `id` or a `name` field defined. The + comparison works with the following rules: + + 1. If one path element has `id` defined while the other doesn't, then the + one with `id` defined is considered smaller. + 2. If both have `id` defined, then their ids are compared. + 3. If neither has `id` defined, then their `name` fields are compared. + """ + + result = str_compare(p1.kind, p2.kind) + if result != 0: + return result + + if p1.HasField('id'): + if not p2.HasField('id'): + return -1 + + return p1.id - p2.id + + if p2.HasField('id'): + return 1 + + return str_compare(p1.name, p2.name) + + +def str_compare(s1, s2): + if s1 == s2: + return 0 + elif s1 < s2: + return -1 + else: + return 1 http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c1126b70/sdks/python/apache_beam/io/datastore/v1/helper_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/datastore/v1/helper_test.py b/sdks/python/apache_beam/io/datastore/v1/helper_test.py new file mode 100644 index 0000000..50f8e4c --- /dev/null +++ b/sdks/python/apache_beam/io/datastore/v1/helper_test.py @@ -0,0 +1,124 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""Tests for datastore helper.""" + +import unittest +from apache_beam.io.datastore.v1 import helper +from google.datastore.v1.entity_pb2 import Key + + +class HelperTest(unittest.TestCase): + + def test_compare_path_with_different_kind(self): + p1 = Key.PathElement() + p1.kind = 'dummy1' + + p2 = Key.PathElement() + p2.kind = 'dummy2' + + self.assertLess(helper.compare_path(p1, p2), 0) + + def test_compare_path_with_different_id(self): + p1 = Key.PathElement() + p1.kind = 'dummy' + p1.id = 10 + + p2 = Key.PathElement() + p2.kind = 'dummy' + p2.id = 15 + + self.assertLess(helper.compare_path(p1, p2), 0) + + def test_compare_path_with_different_name(self): + p1 = Key.PathElement() + p1.kind = 'dummy' + p1.name = 'dummy1' + + p2 = Key.PathElement() + p2.kind = 'dummy' + p2.name = 'dummy2' + + self.assertLess(helper.compare_path(p1, p2), 0) + + def test_compare_path_of_different_type(self): + p1 = Key.PathElement() + p1.kind = 'dummy' + p1.id = 10 + + p2 = Key.PathElement() + p2.kind = 'dummy' + p2.name = 'dummy' + + self.assertLess(helper.compare_path(p1, p2), 0) + + def test_key_comparator_with_different_partition(self): + k1 = Key() + k1.partition_id.namespace_id = 'dummy1' + k2 = Key() + k2.partition_id.namespace_id = 'dummy2' + self.assertRaises(ValueError, helper.key_comparator, k1, k2) + + def test_key_comparator_with_single_path(self): + k1 = Key() + k2 = Key() + p1 = k1.path.add() + p2 = k2.path.add() + p1.kind = p2.kind = 'dummy' + self.assertEqual(helper.key_comparator(k1, k2), 0) + + def test_key_comparator_with_multiple_paths_1(self): + k1 = Key() + k2 = Key() + p11 = k1.path.add() + p12 = k1.path.add() + p21 = k2.path.add() + p11.kind = p12.kind = p21.kind = 'dummy' + self.assertGreater(helper.key_comparator(k1, k2), 0) + + def test_key_comparator_with_multiple_paths_2(self): + k1 = Key() + k2 = Key() + p11 = k1.path.add() + p21 = k2.path.add() + p22 = k2.path.add() + p11.kind = p21.kind = p22.kind = 'dummy' + self.assertLess(helper.key_comparator(k1, k2), 0) + + def test_key_comparator_with_multiple_paths_3(self): + k1 = Key() + k2 = Key() + p11 = k1.path.add() + p12 = k1.path.add() + p21 = k2.path.add() + p22 = k2.path.add() + p11.kind = p12.kind = p21.kind = p22.kind = 'dummy' + self.assertEqual(helper.key_comparator(k1, k2), 0) + + def test_key_comparator_with_multiple_paths_4(self): + k1 = Key() + k2 = Key() + p11 = k1.path.add() + p12 = k2.path.add() + p21 = k2.path.add() + p11.kind = p12.kind = 'dummy' + # Make path2 greater than path1. + p21.kind = 'dummy1' + self.assertLess(helper.key_comparator(k1, k2), 0) + +if __name__ == '__main__': + unittest.main() http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c1126b70/sdks/python/apache_beam/io/datastore/v1/query_splitter.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/datastore/v1/query_splitter.py b/sdks/python/apache_beam/io/datastore/v1/query_splitter.py new file mode 100644
index 0000000..82aa972 --- /dev/null +++ b/sdks/python/apache_beam/io/datastore/v1/query_splitter.py @@ -0,0 +1,270 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""Implements a Cloud Datastore query splitter.""" + +from apache_beam.io.datastore.v1 import helper +from google.datastore.v1 import datastore_pb2 +from google.datastore.v1 import query_pb2 +from google.datastore.v1.query_pb2 import PropertyFilter +from google.datastore.v1.query_pb2 import CompositeFilter +import googledatastore + + +__all__ = [ + 'get_splits', +] + +SCATTER_PROPERTY_NAME = '__scatter__' +KEY_PROPERTY_NAME = '__key__' +# The number of keys to sample for each split. +KEYS_PER_SPLIT = 32 + +UNSUPPORTED_OPERATORS = [PropertyFilter.LESS_THAN, + PropertyFilter.LESS_THAN_OR_EQUAL, + PropertyFilter.GREATER_THAN, + PropertyFilter.GREATER_THAN_OR_EQUAL] + + +def get_splits(datastore, query, num_splits, partition=None): + """Returns a list of sharded queries for the given Cloud Datastore query. + + This will create up to the desired number of splits; however, it may return + fewer splits if the desired number is unavailable. This happens when the + underlying Datastore provides fewer split points than requested, which + occurs when the query has too few results. + + This implementation of the QuerySplitter uses the __scatter__ property to + gather random split points for a query. + + Note: This implementation is derived from the Java query splitter in + https://github.com/GoogleCloudPlatform/google-cloud-datastore/blob/master/java/datastore/src/main/java/com/google/datastore/v1/client/QuerySplitterImpl.java + + Args: + datastore: the datastore client. + query: the query to split. + num_splits: the desired number of splits. + partition: the partition the query is running in. + + Returns: + A list of split queries, with a maximum length of `num_splits`. + """ + + # Validate that the number of splits is not out of bounds.
+ if num_splits < 1: + raise ValueError('The number of splits must be greater than 0.') + + if num_splits == 1: + return [query] + + _validate_query(query) + + splits = [] + scatter_keys = _get_scatter_keys(datastore, query, num_splits, partition) + last_key = None + for next_key in _get_split_key(scatter_keys, num_splits): + splits.append(_create_split(last_key, next_key, query)) + last_key = next_key + + splits.append(_create_split(last_key, None, query)) + return splits + + +def _validate_query(query): + """Verifies that the given query can be properly scattered.""" + + if len(query.kind) != 1: + raise ValueError('Query must have exactly one kind.') + + if len(query.order) != 0: + raise ValueError('Query cannot have any sort orders.') + + if query.HasField('limit'): + raise ValueError('Query cannot have a limit set.') + + if query.offset > 0: + raise ValueError('Query cannot have an offset set.') + + _validate_filter(query.filter) + + +def _validate_filter(filter): + """Validates that we only have allowable filters. + + Note that equality and ancestor filters are allowed; however, they may + result in inefficient sharding. + """ + + if filter.HasField('composite_filter'): + for sub_filter in filter.composite_filter.filters: + _validate_filter(sub_filter) + elif filter.HasField('property_filter'): + if filter.property_filter.op in UNSUPPORTED_OPERATORS: + raise ValueError('Query cannot have any inequality filters.') + else: + pass + + +def _create_scatter_query(query, num_splits): + """Creates a scatter query from the given user query.""" + + scatter_query = query_pb2.Query() + for kind in query.kind: + scatter_kind = scatter_query.kind.add() + scatter_kind.CopyFrom(kind) + + # Order by the scatter property, ascending. + googledatastore.helper.add_property_orders(scatter_query, + SCATTER_PROPERTY_NAME) + + # There is a split containing entities before and after each scatter entity: + # ||---*------*------*------*------*------*------*---|| * = scatter entity + # If we represent each split as a region before a scatter entity, there is an + # extra region following the last scatter point. Thus, we do not need the + # scatter entity for the last region. + scatter_query.limit.value = (num_splits - 1) * KEYS_PER_SPLIT + googledatastore.helper.add_projection(scatter_query, KEY_PROPERTY_NAME) + + return scatter_query + + +def _get_scatter_keys(datastore, query, num_splits, partition): + """Gets a list of split keys given a desired number of splits. + + This list will contain multiple split keys for each split; only one key will + be chosen as the split point, but providing multiple keys allows for more + uniform sharding. + + Args: + datastore: the client to the Datastore containing the data. + query: the user query. + num_splits: the number of desired splits. + partition: the partition to run the query in. + + Returns: + A list of scatter keys returned by Datastore.
+ """ + scatter_point_query = _create_scatter_query(query, num_splits) + + key_splits = [] + while True: + req = datastore_pb2.RunQueryRequest() + if partition: + req.partition_id.CopyFrom(partition) + + req.query.CopyFrom(scatter_point_query) + + resp = datastore.run_query(req) + for entity_result in resp.batch.entity_results: + key_splits.append(entity_result.entity.key) + + if resp.batch.more_results != query_pb2.QueryResultBatch.NOT_FINISHED: + break + + scatter_point_query.start_cursor = resp.batch.end_cursor + scatter_point_query.limit.value -= len(resp.batch.entity_results) + + key_splits.sort(helper.key_comparator) + return key_splits + + +def _get_split_key(keys, num_splits): + """Given a list of keys and a number of splits, finds the keys to split on. + + Args: + keys: the list of keys. + num_splits: the number of splits. + + Returns: + A list of keys to split on. + + """ + + # If the number of keys is less than the number of splits, we are limited + # in the number of splits we can make. + if not keys or (len(keys) < (num_splits - 1)): + return keys + + # Calculate the number of keys per split. This should be KEYS_PER_SPLIT, + # but may be less if there are not KEYS_PER_SPLIT * (num_splits - 1) scatter + # entities. + # + # Consider the following dataset, where - represents an entity and + # * represents an entity that is returned as a scatter entity: + # ||---*-----*----*-----*-----*------*----*----|| + # If we want 4 splits in this data, the optimal split would look like: + # ||---*-----*----*-----*-----*------*----*----|| + # | | | + # The scatter keys in the last region are not useful to us, so we never + # request them: + # ||---*-----*----*-----*-----*------*---------|| + # | | | + # With 6 scatter keys we want to set scatter points at indexes: 1, 3, 5. + # + # We keep this as a float so that any "fractional" keys per split get + # distributed throughout the splits and don't make the last split + # significantly larger than the rest. + + num_keys_per_split = max(1.0, float(len(keys)) / (num_splits - 1)) + + split_keys = [] + + # Grab the last sample for each split, otherwise the first split will be too + # small. + for i in range(1, num_splits): + split_index = int(round(i * num_keys_per_split) - 1) + split_keys.append(keys[split_index]) + + return split_keys + + +def _create_split(last_key, next_key, query): + """Creates a new query that fetches entities in the given key range. + + Args: + last_key: the previous key. If None, the range starts at the beginning. + next_key: the next key. If None, the range extends to the end. + query: the desired query.
+ + Returns: + A split query that fetches entities in the range [last_key, next_key). + """ + if not (last_key or next_key): + return query + + split_query = query_pb2.Query() + split_query.CopyFrom(query) + composite_filter = split_query.filter.composite_filter + composite_filter.op = CompositeFilter.AND + + if query.HasField('filter'): + composite_filter.filters.add().CopyFrom(query.filter) + + if last_key: + lower_bound = composite_filter.filters.add() + lower_bound.property_filter.property.name = KEY_PROPERTY_NAME + lower_bound.property_filter.op = PropertyFilter.GREATER_THAN_OR_EQUAL + lower_bound.property_filter.value.key_value.CopyFrom(last_key) + + if next_key: + upper_bound = composite_filter.filters.add() + upper_bound.property_filter.property.name = KEY_PROPERTY_NAME + upper_bound.property_filter.op = PropertyFilter.LESS_THAN + upper_bound.property_filter.value.key_value.CopyFrom(next_key) + + return split_query http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c1126b70/sdks/python/apache_beam/io/datastore/v1/query_splitter_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/datastore/v1/query_splitter_test.py b/sdks/python/apache_beam/io/datastore/v1/query_splitter_test.py new file mode 100644 index 0000000..979a69f --- /dev/null +++ b/sdks/python/apache_beam/io/datastore/v1/query_splitter_test.py @@ -0,0 +1,257 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License.
+# + +"""Cloud Datastore query splitter test.""" + +import unittest +import uuid + +from mock import MagicMock +from mock import call + +from apache_beam.io.datastore.v1 import query_splitter + +from google.datastore.v1 import datastore_pb2 +from google.datastore.v1 import query_pb2 +from google.datastore.v1.query_pb2 import PropertyFilter + + +class QuerySplitterTest(unittest.TestCase): + + def test_get_splits_query_with_multiple_kinds(self): + query = query_pb2.Query() + query.kind.add() + query.kind.add() + self.assertRaises(ValueError, query_splitter.get_splits, None, query, 4) + + def test_get_splits_query_with_order(self): + query = query_pb2.Query() + query.kind.add() + query.order.add() + + self.assertRaises(ValueError, query_splitter.get_splits, None, query, 3) + + def test_get_splits_query_with_unsupported_filter(self): + query = query_pb2.Query() + query.kind.add() + test_filter = query.filter.composite_filter.filters.add() + test_filter.property_filter.op = PropertyFilter.GREATER_THAN + self.assertRaises(ValueError, query_splitter.get_splits, None, query, 2) + + def test_get_splits_query_with_limit(self): + query = query_pb2.Query() + query.kind.add() + query.limit.value = 10 + self.assertRaises(ValueError, query_splitter.get_splits, None, query, 2) + + def test_get_splits_query_with_offset(self): + query = query_pb2.Query() + query.kind.add() + query.offset = 10 + self.assertRaises(ValueError, query_splitter.get_splits, None, query, 2) + + def test_create_scatter_query(self): + query = query_pb2.Query() + kind = query.kind.add() + kind.name = 'shakespeare-demo' + num_splits = 10 + scatter_query = query_splitter._create_scatter_query(query, num_splits) + self.assertEqual(scatter_query.kind[0], kind) + self.assertEqual(scatter_query.limit.value, + (num_splits - 1) * query_splitter.KEYS_PER_SPLIT) + self.assertEqual(scatter_query.order[0].direction, + query_pb2.PropertyOrder.ASCENDING) + self.assertEqual(scatter_query.projection[0].property.name, + query_splitter.KEY_PROPERTY_NAME) + + def test_get_splits_with_two_splits(self): + query = query_pb2.Query() + kind = query.kind.add() + kind.name = 'shakespeare-demo' + num_splits = 2 + num_entities = 97 + batch_size = 9 + + self.check_get_splits(query, num_splits, num_entities, batch_size) + + def test_get_splits_with_multiple_splits(self): + query = query_pb2.Query() + kind = query.kind.add() + kind.name = 'shakespeare-demo' + num_splits = 4 + num_entities = 369 + batch_size = 12 + + self.check_get_splits(query, num_splits, num_entities, batch_size) + + def test_get_splits_with_large_num_splits(self): + query = query_pb2.Query() + kind = query.kind.add() + kind.name = 'shakespeare-demo' + num_splits = 10 + num_entities = 4 + batch_size = 10 + + self.check_get_splits(query, num_splits, num_entities, batch_size) + + def test_get_splits_with_small_num_entities(self): + query = query_pb2.Query() + kind = query.kind.add() + kind.name = 'shakespeare-demo' + num_splits = 4 + num_entities = 50 + batch_size = 10 + + self.check_get_splits(query, num_splits, num_entities, batch_size) + + def test_get_splits_with_batch_size_exact_multiple(self): + """Test get_splits when num scatter keys is a multiple of batch size.""" + query = query_pb2.Query() + kind = query.kind.add() + kind.name = 'shakespeare-demo' + num_splits = 4 + num_entities = 400 + batch_size = 32 + + self.check_get_splits(query, num_splits, num_entities, batch_size) + + def test_get_splits_with_large_batch_size(self): + """Test get_splits when all scatter keys are returned in a 
single request.""" + query = query_pb2.Query() + kind = query.kind.add() + kind.name = 'shakespeare-demo' + num_splits = 4 + num_entities = 400 + batch_size = 500 + + self.check_get_splits(query, num_splits, num_entities, batch_size) + + def check_get_splits(self, query, num_splits, num_entities, batch_size): + """A helper method to test the query_splitter get_splits method. + + Args: + query: the query to be split. + num_splits: the number of splits. + num_entities: number of scatter entities contained in the fake datastore. + batch_size: the number of entities returned by the fake datastore in one + request. + """ + + entities = QuerySplitterTest.create_entities(num_entities) + mock_datastore = MagicMock() + # Assign a fake run_query method as a side_effect to the mock. + mock_datastore.run_query.side_effect = \ + QuerySplitterTest.create_run_query(entities, batch_size) + + split_queries = query_splitter.get_splits(mock_datastore, query, num_splits) + + # If the requested num_splits is greater than num_entities, the best it can + # do is one entity per split. + expected_num_splits = min(num_splits, num_entities + 1) + self.assertEqual(len(split_queries), expected_num_splits) + + expected_requests = QuerySplitterTest.create_scatter_requests( + query, num_splits, batch_size, num_entities) + + expected_calls = [] + for req in expected_requests: + expected_calls.append(call(req)) + + self.assertEqual(expected_calls, mock_datastore.run_query.call_args_list) + + @staticmethod + def create_run_query(entities, batch_size): + """A fake datastore run_query method that returns entities in batches. + + Note: the outer method is needed to make `entities` and `batch_size` + available in the scope of the fake_run_query method. + + Args: + entities: list of entities supposed to be contained in the datastore. + batch_size: the number of entities that the run_query method returns in + one request. + """ + def fake_run_query(req): + start = int(req.query.start_cursor) if req.query.start_cursor else 0 + # If the query limit is less than batch_size, only return that much. + count = min(batch_size, req.query.limit.value) + # We cannot return more than the number of entities in the datastore. + end = min(len(entities), start + count) + finish = False + # Finish reading when there are no more entities to return, + # or the request's query limit has been satisfied. + if end == len(entities) or count == req.query.limit.value: + finish = True + return QuerySplitterTest.create_scatter_response(entities[start:end], + str(end), finish) + return fake_run_query + + @staticmethod + def create_scatter_requests(query, num_splits, batch_size, num_entities): + """Creates the list of scatter requests expected from the query splitter. + + The returned list is used to verify that the query splitter made the same + requests, in the same order, to Datastore.
+ """ + + requests = [] + count = (num_splits - 1) * query_splitter.KEYS_PER_SPLIT + start_cursor = '' + i = 0 + scatter_query = query_splitter._create_scatter_query(query, count) + while i < count and i < num_entities: + request = datastore_pb2.RunQueryRequest() + request.query.CopyFrom(scatter_query) + request.query.start_cursor = start_cursor + request.query.limit.value = count - i + requests.append(request) + i += batch_size + start_cursor = str(i) + + return requests + + @staticmethod + def create_scatter_response(entities, end_cursor, finish): + """Creates a query response for a given batch of scatter entities.""" + + resp = datastore_pb2.RunQueryResponse() + if finish: + resp.batch.more_results = query_pb2.QueryResultBatch.NO_MORE_RESULTS + else: + resp.batch.more_results = query_pb2.QueryResultBatch.NOT_FINISHED + + resp.batch.end_cursor = end_cursor + for entity_result in entities: + resp.batch.entity_results.add().CopyFrom(entity_result) + + return resp + + @staticmethod + def create_entities(count): + """Creates a list of entities with random keys.""" + + entities = [] + + for _ in range(0, count): + entity_result = query_pb2.EntityResult() + entity_result.entity.key.path.add().name = str(uuid.uuid4()) + entities.append(entity_result) + + return entities + + +if __name__ == '__main__': + unittest.main() http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c1126b70/sdks/python/setup.py ---------------------------------------------------------------------- diff --git a/sdks/python/setup.py b/sdks/python/setup.py index c7b940d..af59069 100644 --- a/sdks/python/setup.py +++ b/sdks/python/setup.py @@ -87,6 +87,7 @@ REQUIRED_PACKAGES = [ 'avro>=1.7.7,<2.0.0', 'dill>=0.2.5,<0.3', 'google-apitools>=0.5.2,<1.0.0', + 'googledatastore==6.3.0', 'httplib2>=0.8,<0.10', 'mock>=1.0.1,<3.0.0', 'oauth2client>=2.0.1,<4.0.0',
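----------------------------------------------------------------------
The key ordering that helper.key_comparator and helper.compare_path implement can be seen with a couple of hand-built keys. This is a minimal sketch, not part of the commit; it reuses only the proto APIs already exercised in helper_test.py, and the 'Author'/'Book' kinds are illustrative placeholders.

from apache_beam.io.datastore.v1 import helper
from google.datastore.v1.entity_pb2 import Key

parent = Key()
parent.path.add().kind = 'Author'

child = Key()
child.path.add().kind = 'Author'
child.path.add().kind = 'Book'

# A key's path is a prefix of its descendants' paths, so the parent
# sorts first.
assert helper.key_comparator(parent, child) < 0

# Rule 1 of compare_path: an element with a numeric `id` sorts before
# one with a `name`, regardless of the values.
by_id = Key.PathElement(kind='Book', id=7)
by_name = Key.PathElement(kind='Book', name='hamlet')
assert helper.compare_path(by_id, by_name) < 0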
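The validation rules in get_splits are easiest to see end to end. The sketch below, assuming the googledatastore protos are installed, builds one query that satisfies _validate_query and one that is rejected; `client` is a hypothetical stand-in for any object exposing run_query and is never called here, since validation fails before the first request.

from apache_beam.io.datastore.v1 import query_splitter
from google.datastore.v1 import query_pb2

query = query_pb2.Query()
query.kind.add().name = 'Book'  # Exactly one kind, no order/limit/offset.
# splits = query_splitter.get_splits(client, query, 8)

bad_query = query_pb2.Query()
bad_query.kind.add().name = 'Book'
bad_query.limit.value = 100  # Limits are rejected by the splitter.
try:
  query_splitter.get_splits(None, bad_query, 8)
except ValueError as e:
  print(e)  # Query cannot have a limit set.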
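The index arithmetic in _get_split_key is worth tracing once by hand. A small sketch, using plain strings as stand-ins for sorted scatter keys: with 6 keys and 4 desired splits, num_keys_per_split is 2.0 and the chosen indexes are 1, 3 and 5, matching the diagram in the code comment.

num_splits = 4
keys = ['a', 'b', 'c', 'd', 'e', 'f']  # Stand-ins for sorted scatter keys.
num_keys_per_split = max(1.0, float(len(keys)) / (num_splits - 1))
split_indexes = [int(round(i * num_keys_per_split) - 1)
                 for i in range(1, num_splits)]
print(split_indexes)  # [1, 3, 5] -> split on keys 'b', 'd' and 'f'.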
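Similarly, the range filter that _create_split builds can be inspected directly. A sketch under the same assumptions, calling the module-private helper purely for illustration: for a middle split, the result is a composite AND filter with __key__ >= last_key and __key__ < next_key, plus the original filter if one was set.

from apache_beam.io.datastore.v1 import query_splitter
from google.datastore.v1 import query_pb2
from google.datastore.v1.entity_pb2 import Key
from google.datastore.v1.query_pb2 import PropertyFilter

query = query_pb2.Query()
query.kind.add().name = 'Book'

last_key = Key()
last_key.path.add().name = 'aaa'
next_key = Key()
next_key.path.add().name = 'mmm'

split = query_splitter._create_split(last_key, next_key, query)
ops = [f.property_filter.op
       for f in split.filter.composite_filter.filters]
# Lower bound is inclusive, upper bound exclusive.
assert ops == [PropertyFilter.GREATER_THAN_OR_EQUAL, PropertyFilter.LESS_THAN]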