Repository: beam Updated Branches: refs/heads/master 296f7fa96 -> 75bab74eb
Fixes a bug in query splitting. We were returning the original query instead of the sub-queries, resulting in data duplication when reading. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/999e957d Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/999e957d Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/999e957d Branch: refs/heads/master Commit: 999e957dddd9bf826e2df912719d2f0e3ad8f5c5 Parents: 296f7fa Author: chamik...@google.com <chamik...@google.com> Authored: Sun Oct 8 17:02:43 2017 -0700 Committer: Chamikara Jayalath <chamik...@google.com> Committed: Mon Oct 9 14:01:16 2017 -0700 ---------------------------------------------------------------------- .../beam/sdk/io/gcp/datastore/DatastoreV1.java | 2 +- .../beam/sdk/io/gcp/datastore/DatastoreV1Test.java | 15 ++++++++++++++- 2 files changed, 15 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/999e957d/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java index eb0c26f..9b20c0d 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java @@ -807,7 +807,7 @@ public class DatastoreV1 { // assign unique keys to query splits. 
for (Query subquery : querySplits) { - c.output(query); + c.output(subquery); } } http://git-wip-us.apache.org/repos/asf/beam/blob/999e957d/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java index 58bab21..550b6b9 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java @@ -40,6 +40,7 @@ import static org.hamcrest.Matchers.hasItem; import static org.hamcrest.Matchers.lessThanOrEqualTo; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; @@ -622,6 +623,12 @@ public class DatastoreV1Test { List<Query> queries = doFnTester.processBundle(QUERY); assertEquals(queries.size(), numSplits); + + // Confirms that sub-queries are not equal to original when there is more than one split. + for (Query subQuery : queries) { + assertNotEquals(subQuery, QUERY); + } + verify(mockQuerySplitter, times(1)).getSplits( eq(QUERY), any(PartitionId.class), eq(numSplits), any(Datastore.class)); verifyZeroInteractions(mockDatastore); @@ -991,8 +998,14 @@ public class DatastoreV1Test { /** Generate dummy query splits. 
*/ private List<Query> splitQuery(Query query, int numSplits) { List<Query> queries = new LinkedList<>(); + int offsetOfOriginal = query.getOffset(); for (int i = 0; i < numSplits; i++) { - queries.add(query.toBuilder().build()); + Query.Builder q = Query.newBuilder(); + q.addKindBuilder().setName(KIND); + // Making sub-queries unique (and not equal to the original query) by setting different + // offsets. + q.setOffset(++offsetOfOriginal); + queries.add(q.build()); } return queries; }