[ https://issues.apache.org/jira/browse/BEAM-4186?focusedWorklogId=98738&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-98738 ]
ASF GitHub Bot logged work on BEAM-4186: ---------------------------------------- Author: ASF GitHub Bot Created on: 05/May/18 17:26 Start Date: 05/May/18 17:26 Worklog Time Spent: 10m Work Description: fyellin closed pull request #5246: [BEAM-4186] Enable withQuerySplitter URL: https://github.com/apache/beam/pull/5246 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java index 4d287dacd52..fd7efe3c855 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java @@ -286,6 +286,7 @@ @Nullable public abstract ValueProvider<String> getNamespace(); public abstract int getNumQuerySplits(); @Nullable public abstract String getLocalhost(); + @Nullable public abstract QuerySplitter getQuerySplitter(); @Override public abstract String toString(); @@ -300,6 +301,7 @@ abstract Builder setNamespace(ValueProvider<String> namespace); abstract Builder setNumQuerySplits(int numQuerySplits); abstract Builder setLocalhost(String localhost); + abstract Builder setQuerySplitter(QuerySplitter querySplitter); abstract Read build(); } @@ -505,6 +507,21 @@ private static Query translateGqlQuery(String gql, Datastore datastore, String n return toBuilder().setQuery(query).build(); } + /** + * Returns a new {@link DatastoreV1.Read} that uses the specified {@link QuerySplitter} to + * split the query rather than the default one. + * The user-specified query splitter must be {@link Serializable}. 
+ * + * <p><b>Note:</b> When the {@link Query} is configured with a limit using + * {@link Query.Builder#setLimit}, the query splitter will not be called, as described in + * {@link #withQuery(Query) withQuery}. + */ + public DatastoreV1.Read withQuerySplitter(QuerySplitter querySplitter) { + checkArgument(querySplitter != null, "querySplitter can not be null"); + checkArgument(querySplitter instanceof Serializable, "querySplitter must be serializable"); + return toBuilder().setQuerySplitter(querySplitter).build(); + } + /** * Returns a new {@link DatastoreV1.Read} that reads the results of the specified GQL query. * See <a href="https://cloud.google.com/datastore/docs/reference/gql_reference">GQL Reference @@ -630,7 +647,8 @@ private static Query translateGqlQuery(String gql, Datastore datastore, String n } return inputQuery - .apply("Split", ParDo.of(new SplitQueryFn(v1Options, getNumQuerySplits()))) + .apply("Split", ParDo.of(new SplitQueryFn(v1Options, getNumQuerySplits(), + getQuerySplitter()))) .apply("Reshuffle", Reshuffle.viaRandomKey()) .apply("Read", ParDo.of(new ReadFn(v1Options))); } @@ -753,28 +771,34 @@ public void processElement(ProcessContext c) throws Exception { private final int numSplits; private final V1DatastoreFactory datastoreFactory; + + // If not null, a query splitter to use instead of the default one. 
+ private final QuerySplitter userQuerySplitter; + // Datastore client private transient Datastore datastore; // Query splitter private transient QuerySplitter querySplitter; - public SplitQueryFn(V1Options options, int numSplits) { - this(options, numSplits, new V1DatastoreFactory()); + public SplitQueryFn(V1Options options, int numSplits, QuerySplitter querySplitter) { + this(options, numSplits, querySplitter, new V1DatastoreFactory()); } @VisibleForTesting SplitQueryFn(V1Options options, int numSplits, - V1DatastoreFactory datastoreFactory) { + @Nullable QuerySplitter querySplitter, V1DatastoreFactory datastoreFactory) { this.options = options; this.numSplits = numSplits; this.datastoreFactory = datastoreFactory; + this.userQuerySplitter = querySplitter; } @StartBundle public void startBundle(StartBundleContext c) throws Exception { datastore = datastoreFactory.getDatastore(c.getPipelineOptions(), options.getProjectId(), options.getLocalhost()); - querySplitter = datastoreFactory.getQuerySplitter(); + querySplitter = userQuerySplitter != null + ? 
userQuerySplitter : datastoreFactory.getQuerySplitter(); } @ProcessElement diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/GcpApiSurfaceTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/GcpApiSurfaceTest.java index fc3d2661487..c0c313978f7 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/GcpApiSurfaceTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/GcpApiSurfaceTest.java @@ -79,6 +79,7 @@ public void testGcpApiSurface() throws Exception { classesInPackage("com.google.cloud.spanner"), classesInPackage("com.google.datastore.v1"), classesInPackage("com.google.protobuf"), + classesInPackage("com.google.rpc"), classesInPackage("com.google.type"), classesInPackage("com.fasterxml.jackson.annotation"), classesInPackage("com.fasterxml.jackson.core"), diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java index 3a8a6a652d6..1de866af320 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java @@ -68,12 +68,14 @@ import com.google.datastore.v1.client.QuerySplitter; import com.google.protobuf.Int32Value; import com.google.rpc.Code; +import java.io.Serializable; import java.util.ArrayList; import java.util.Date; import java.util.HashSet; import java.util.LinkedList; import java.util.List; import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.io.gcp.datastore.DatastoreV1.DatastoreWriterFn; import org.apache.beam.sdk.io.gcp.datastore.DatastoreV1.DeleteEntity; @@ -609,7 +611,8 @@ public void 
testSplitQueryFnWithNumSplits() throws Exception { eq(QUERY), any(PartitionId.class), eq(numSplits), any(Datastore.class))) .thenReturn(splitQuery(QUERY, numSplits)); - SplitQueryFn splitQueryFn = new SplitQueryFn(V_1_OPTIONS, numSplits, mockDatastoreFactory); + SplitQueryFn splitQueryFn = + new SplitQueryFn(V_1_OPTIONS, numSplits, null, mockDatastoreFactory); DoFnTester<Query, Query> doFnTester = DoFnTester.of(splitQueryFn); /** * Although Datastore client is marked transient in {@link SplitQueryFn}, when injected through @@ -660,7 +663,8 @@ public void testSplitQueryFnWithoutNumSplits() throws Exception { eq(QUERY), any(PartitionId.class), eq(expectedNumSplits), any(Datastore.class))) .thenReturn(splitQuery(QUERY, expectedNumSplits)); - SplitQueryFn splitQueryFn = new SplitQueryFn(V_1_OPTIONS, numSplits, mockDatastoreFactory); + SplitQueryFn splitQueryFn = + new SplitQueryFn(V_1_OPTIONS, numSplits, null, mockDatastoreFactory); DoFnTester<Query, Query> doFnTester = DoFnTester.of(splitQueryFn); doFnTester.setCloningBehavior(CloningBehavior.DO_NOT_CLONE); List<Query> queries = doFnTester.processBundle(QUERY); @@ -681,7 +685,7 @@ public void testSplitQueryFnWithQueryLimit() throws Exception { .setLimit(Int32Value.newBuilder().setValue(1)) .build(); - SplitQueryFn splitQueryFn = new SplitQueryFn(V_1_OPTIONS, 10, mockDatastoreFactory); + SplitQueryFn splitQueryFn = new SplitQueryFn(V_1_OPTIONS, 10, null, mockDatastoreFactory); DoFnTester<Query, Query> doFnTester = DoFnTester.of(splitQueryFn); doFnTester.setCloningBehavior(CloningBehavior.DO_NOT_CLONE); List<Query> queries = doFnTester.processBundle(queryWithLimit); @@ -691,6 +695,47 @@ public void testSplitQueryFnWithQueryLimit() throws Exception { verifyNoMoreInteractions(mockQuerySplitter); } + /** + * Tests {@link SplitQueryFn} when an alternative Query Splitter is specified. 
+ */ + @Test + public void testSplitQueryFnWithAlternateSplitter() throws Exception { + AtomicInteger callCount = new AtomicInteger(); + int numSplits = 100; + + QuerySplitter fakeQuerySplitter = (QuerySplitter & Serializable) + (query, partition, shards, datastore) -> { + assertEquals(QUERY, query); + assertEquals(shards, numSplits); + callCount.incrementAndGet(); + return splitQuery(query, shards); + }; + + SplitQueryFn splitQueryFn = + new SplitQueryFn(V_1_OPTIONS, numSplits, fakeQuerySplitter, mockDatastoreFactory); + DoFnTester<Query, Query> doFnTester = DoFnTester.of(splitQueryFn); + /** + * Although Datastore client is marked transient in {@link SplitQueryFn}, when injected through + * mock factory using a when clause for unit testing purposes, it is not serializable + * because it doesn't have a no-arg constructor. Thus disabling the cloning to prevent the + * doFn from being serialized. + */ + doFnTester.setCloningBehavior(CloningBehavior.DO_NOT_CLONE); + List<Query> queries = doFnTester.processBundle(QUERY); + + assertEquals(queries.size(), numSplits); + assertEquals(callCount.get(), 1); + + // Confirms that sub-queries are not equal to original when there is more than one split. + for (Query subQuery : queries) { + assertNotEquals(subQuery, QUERY); + } + + verifyZeroInteractions(mockQuerySplitter); + verifyZeroInteractions(mockDatastore); + } + + /** Tests {@link ReadFn} with a query limit less than one batch. 
*/ @Test public void testReadFnWithOneBatch() throws Exception { diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/SplitQueryFnIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/SplitQueryFnIT.java index fa391cc338d..633e8566f32 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/SplitQueryFnIT.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/SplitQueryFnIT.java @@ -85,7 +85,7 @@ private void testSplitQueryFn(String projectId, String kind, @Nullable String na query.addKindBuilder().setName(kind); SplitQueryFn splitQueryFn = new SplitQueryFn( - V1Options.from(projectId, namespace, null), 0); + V1Options.from(projectId, namespace, null), 0, null); DoFnTester<Query, Query> doFnTester = DoFnTester.of(splitQueryFn); List<Query> queries = doFnTester.processBundle(query.build()); ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking ------------------- Worklog Id: (was: 98738) Time Spent: 1h 10m (was: 1h) > Need to be able to set QuerySplitter in DatastoreIO.v1() > -------------------------------------------------------- > > Key: BEAM-4186 > URL: https://issues.apache.org/jira/browse/BEAM-4186 > Project: Beam > Issue Type: New Feature > Components: io-java-gcp > Affects Versions: 2.4.0 > Reporter: Frank Yellin > Assignee: Frank Yellin > Priority: Minor > Fix For: 2.4.0 > > Time Spent: 1h 10m > Remaining Estimate: 0h > > I want to add a method > withQuerySplitter(QuerySplitter querySplitter) > to DatastoreV1.Reader. 
The implementation is fairly straightforward, except > for enforcing the requirement that the query splitter must be Serializable > for this to work. > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)