[ 
https://issues.apache.org/jira/browse/BEAM-4186?focusedWorklogId=98738&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-98738
 ]

ASF GitHub Bot logged work on BEAM-4186:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 05/May/18 17:26
            Start Date: 05/May/18 17:26
    Worklog Time Spent: 10m 
      Work Description: fyellin closed pull request #5246: [BEAM-4186] Enable withQuerySplitter
URL: https://github.com/apache/beam/pull/5246
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java
index 4d287dacd52..fd7efe3c855 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java
@@ -286,6 +286,7 @@
     @Nullable public abstract ValueProvider<String> getNamespace();
     public abstract int getNumQuerySplits();
     @Nullable public abstract String getLocalhost();
+    @Nullable public abstract QuerySplitter getQuerySplitter();
 
     @Override
     public abstract String toString();
@@ -300,6 +301,7 @@
       abstract Builder setNamespace(ValueProvider<String> namespace);
       abstract Builder setNumQuerySplits(int numQuerySplits);
       abstract Builder setLocalhost(String localhost);
+      abstract Builder setQuerySplitter(QuerySplitter querySplitter);
       abstract Read build();
     }
 
@@ -505,6 +507,21 @@ private static Query translateGqlQuery(String gql, 
Datastore datastore, String n
       return toBuilder().setQuery(query).build();
     }
 
+    /**
+     * Returns a new {@link DatastoreV1.Read} that uses the specified {@link 
QuerySplitter} to
+     * split the query rather than the default one.
+     * The user-specified query splitter must be {@link Serializable}.
+     *
+     * <p><b>Note:</b>When the {@link Query} is configured with a limit using
+     * {@link Query.Builder#setLimit}, the query splitter will not be called, 
as described in
+     * {@link #withQuery(Query) withQuery}.
+     */
+    public DatastoreV1.Read withQuerySplitter(QuerySplitter querySplitter) {
+      checkArgument(querySplitter != null, "querySplitter can not be null");
+      checkArgument(querySplitter instanceof Serializable, "querySplitter must 
be serializable");
+      return toBuilder().setQuerySplitter(querySplitter).build();
+    }
+
     /**
      * Returns a new {@link DatastoreV1.Read} that reads the results of the 
specified GQL query.
      * See <a 
href="https://cloud.google.com/datastore/docs/reference/gql_reference">GQL 
Reference
@@ -630,7 +647,8 @@ private static Query translateGqlQuery(String gql, 
Datastore datastore, String n
       }
 
       return inputQuery
-          .apply("Split", ParDo.of(new SplitQueryFn(v1Options, 
getNumQuerySplits())))
+          .apply("Split", ParDo.of(new SplitQueryFn(v1Options, 
getNumQuerySplits(),
+              getQuerySplitter())))
           .apply("Reshuffle", Reshuffle.viaRandomKey())
           .apply("Read", ParDo.of(new ReadFn(v1Options)));
     }
@@ -753,28 +771,34 @@ public void processElement(ProcessContext c) throws 
Exception {
       private final int numSplits;
 
       private final V1DatastoreFactory datastoreFactory;
+
+      // If not null, a query splitter to use instead of the default one.
+      private final QuerySplitter userQuerySplitter;
+
       // Datastore client
       private transient Datastore datastore;
       // Query splitter
       private transient QuerySplitter querySplitter;
 
-      public SplitQueryFn(V1Options options, int numSplits) {
-        this(options, numSplits, new V1DatastoreFactory());
+      public SplitQueryFn(V1Options options, int numSplits, QuerySplitter 
querySplitter) {
+        this(options, numSplits, querySplitter, new V1DatastoreFactory());
       }
 
       @VisibleForTesting
       SplitQueryFn(V1Options options, int numSplits,
-          V1DatastoreFactory datastoreFactory) {
+          @Nullable QuerySplitter querySplitter, V1DatastoreFactory 
datastoreFactory) {
         this.options = options;
         this.numSplits = numSplits;
         this.datastoreFactory = datastoreFactory;
+        this.userQuerySplitter = querySplitter;
       }
 
       @StartBundle
       public void startBundle(StartBundleContext c) throws Exception {
         datastore = datastoreFactory.getDatastore(c.getPipelineOptions(), 
options.getProjectId(),
             options.getLocalhost());
-        querySplitter = datastoreFactory.getQuerySplitter();
+        querySplitter = userQuerySplitter != null
+            ? userQuerySplitter : datastoreFactory.getQuerySplitter();
       }
 
       @ProcessElement
diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/GcpApiSurfaceTest.java
 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/GcpApiSurfaceTest.java
index fc3d2661487..c0c313978f7 100644
--- 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/GcpApiSurfaceTest.java
+++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/GcpApiSurfaceTest.java
@@ -79,6 +79,7 @@ public void testGcpApiSurface() throws Exception {
             classesInPackage("com.google.cloud.spanner"),
             classesInPackage("com.google.datastore.v1"),
             classesInPackage("com.google.protobuf"),
+            classesInPackage("com.google.rpc"),
             classesInPackage("com.google.type"),
             classesInPackage("com.fasterxml.jackson.annotation"),
             classesInPackage("com.fasterxml.jackson.core"),
diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java
 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java
index 3a8a6a652d6..1de866af320 100644
--- 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java
+++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java
@@ -68,12 +68,14 @@
 import com.google.datastore.v1.client.QuerySplitter;
 import com.google.protobuf.Int32Value;
 import com.google.rpc.Code;
+import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Date;
 import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.io.gcp.datastore.DatastoreV1.DatastoreWriterFn;
 import org.apache.beam.sdk.io.gcp.datastore.DatastoreV1.DeleteEntity;
@@ -609,7 +611,8 @@ public void testSplitQueryFnWithNumSplits() throws 
Exception {
         eq(QUERY), any(PartitionId.class), eq(numSplits), 
any(Datastore.class)))
         .thenReturn(splitQuery(QUERY, numSplits));
 
-    SplitQueryFn splitQueryFn = new SplitQueryFn(V_1_OPTIONS, numSplits, 
mockDatastoreFactory);
+    SplitQueryFn splitQueryFn =
+        new SplitQueryFn(V_1_OPTIONS, numSplits, null, mockDatastoreFactory);
     DoFnTester<Query, Query> doFnTester = DoFnTester.of(splitQueryFn);
     /**
      * Although Datastore client is marked transient in {@link SplitQueryFn}, 
when injected through
@@ -660,7 +663,8 @@ public void testSplitQueryFnWithoutNumSplits() throws 
Exception {
         eq(QUERY), any(PartitionId.class), eq(expectedNumSplits), 
any(Datastore.class)))
         .thenReturn(splitQuery(QUERY, expectedNumSplits));
 
-    SplitQueryFn splitQueryFn = new SplitQueryFn(V_1_OPTIONS, numSplits, 
mockDatastoreFactory);
+    SplitQueryFn splitQueryFn =
+        new SplitQueryFn(V_1_OPTIONS, numSplits, null, mockDatastoreFactory);
     DoFnTester<Query, Query> doFnTester = DoFnTester.of(splitQueryFn);
     doFnTester.setCloningBehavior(CloningBehavior.DO_NOT_CLONE);
     List<Query> queries = doFnTester.processBundle(QUERY);
@@ -681,7 +685,7 @@ public void testSplitQueryFnWithQueryLimit() throws 
Exception {
         .setLimit(Int32Value.newBuilder().setValue(1))
         .build();
 
-    SplitQueryFn splitQueryFn = new SplitQueryFn(V_1_OPTIONS, 10, 
mockDatastoreFactory);
+    SplitQueryFn splitQueryFn = new SplitQueryFn(V_1_OPTIONS, 10, null, 
mockDatastoreFactory);
     DoFnTester<Query, Query> doFnTester = DoFnTester.of(splitQueryFn);
     doFnTester.setCloningBehavior(CloningBehavior.DO_NOT_CLONE);
     List<Query> queries = doFnTester.processBundle(queryWithLimit);
@@ -691,6 +695,47 @@ public void testSplitQueryFnWithQueryLimit() throws 
Exception {
     verifyNoMoreInteractions(mockQuerySplitter);
   }
 
+  /**
+   * Tests {@link SplitQueryFn} when an alternative Query Splitter is 
specified.
+   */
+  @Test
+  public void testSplitQueryFnWithAlternateSplitter() throws Exception {
+    AtomicInteger callCount = new AtomicInteger();
+    int numSplits = 100;
+
+    QuerySplitter fakeQuerySplitter = (QuerySplitter & Serializable)
+        (query, partition, shards, datastore) -> {
+           assertEquals(QUERY, query);
+           assertEquals(shards, numSplits);
+           callCount.incrementAndGet();
+           return splitQuery(query, shards);
+         };
+
+    SplitQueryFn splitQueryFn =
+        new SplitQueryFn(V_1_OPTIONS, numSplits, fakeQuerySplitter, 
mockDatastoreFactory);
+    DoFnTester<Query, Query> doFnTester = DoFnTester.of(splitQueryFn);
+    /**
+     * Although Datastore client is marked transient in {@link SplitQueryFn}, 
when injected through
+     * mock factory using a when clause for unit testing purposes, it is not 
serializable
+     * because it doesn't have a no-arg constructor. Thus disabling the 
cloning to prevent the
+     * doFn from being serialized.
+     */
+    doFnTester.setCloningBehavior(CloningBehavior.DO_NOT_CLONE);
+    List<Query> queries = doFnTester.processBundle(QUERY);
+
+    assertEquals(queries.size(), numSplits);
+    assertEquals(callCount.get(),  1);
+
+    // Confirms that sub-queries are not equal to original when there is more 
than one split.
+    for (Query subQuery : queries) {
+      assertNotEquals(subQuery, QUERY);
+    }
+
+    verifyZeroInteractions(mockQuerySplitter);
+    verifyZeroInteractions(mockDatastore);
+  }
+
+
   /** Tests {@link ReadFn} with a query limit less than one batch. */
   @Test
   public void testReadFnWithOneBatch() throws Exception {
diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/SplitQueryFnIT.java
 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/SplitQueryFnIT.java
index fa391cc338d..633e8566f32 100644
--- 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/SplitQueryFnIT.java
+++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/SplitQueryFnIT.java
@@ -85,7 +85,7 @@ private void testSplitQueryFn(String projectId, String kind, 
@Nullable String na
     query.addKindBuilder().setName(kind);
 
     SplitQueryFn splitQueryFn = new SplitQueryFn(
-        V1Options.from(projectId, namespace, null), 0);
+        V1Options.from(projectId, namespace, null), 0, null);
     DoFnTester<Query, Query> doFnTester = DoFnTester.of(splitQueryFn);
 
     List<Query> queries = doFnTester.processBundle(query.build());


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 98738)
    Time Spent: 1h 10m  (was: 1h)

> Need to be able to set QuerySplitter in DatastoreIO.v1()
> --------------------------------------------------------
>
>                 Key: BEAM-4186
>                 URL: https://issues.apache.org/jira/browse/BEAM-4186
>             Project: Beam
>          Issue Type: New Feature
>          Components: io-java-gcp
>    Affects Versions: 2.4.0
>            Reporter: Frank Yellin
>            Assignee: Frank Yellin
>            Priority: Minor
>             Fix For: 2.4.0
>
>          Time Spent: 1h 10m
>  Remaining Estimate: 0h
>
> I want to add a method
>       withQuerySplitter(QuerySplitter querySplitter)
> to DatastoreV1.Read.  The implementation is fairly straightforward, except 
> for enforcing the requirement that the query splitter must be Serializable 
> for this to work.
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to