Raise the per-RPC byte limit for Datastore entity writes to 9MB.

This is closer to the 10MB API request limit while still leaving room for
request overhead, and it brings the Java SDK into line with the Python SDK.

Switch the unit test to use the serialized size of each entity, which is what
the connector actually measures, rather than the property size. The property
size is slightly smaller and would cause the test to fail for some values.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/156f326a
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/156f326a
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/156f326a

Branch: refs/heads/gearpump-runner
Commit: 156f326a16e15b4e22a189a2a263d11d7b273656
Parents: fdfc70e
Author: Colin Phipps <fi...@google.com>
Authored: Mon Jun 5 12:12:49 2017 +0000
Committer: Ahmet Altay <al...@google.com>
Committed: Thu Jun 8 10:57:09 2017 -0700

----------------------------------------------------------------------
 .../beam/sdk/io/gcp/datastore/DatastoreV1.java      |  2 +-
 .../beam/sdk/io/gcp/datastore/DatastoreV1Test.java  | 16 +++++++++-------
 2 files changed, 10 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/156f326a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java
index b198a6f..06b9c8a 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java
@@ -213,7 +213,7 @@ public class DatastoreV1 {
    * the mutations themselves and not the CommitRequest wrapper around them.
    */
   @VisibleForTesting
-  static final int DATASTORE_BATCH_UPDATE_BYTES_LIMIT = 5_000_000;
+  static final int DATASTORE_BATCH_UPDATE_BYTES_LIMIT = 9_000_000;
 
   /**
    * Returns an empty {@link DatastoreV1.Read} builder. Configure the source 
{@code projectId},

http://git-wip-us.apache.org/repos/asf/beam/blob/156f326a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java
index 460049e..229b1fb 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java
@@ -651,12 +651,14 @@ public class DatastoreV1Test {
   @Test
   public void testDatatoreWriterFnWithLargeEntities() throws Exception {
     List<Mutation> mutations = new ArrayList<>();
-    int propertySize = 900_000;
+    int entitySize = 0;
     for (int i = 0; i < 12; ++i) {
-      Entity.Builder entity = Entity.newBuilder().setKey(makeKey("key" + i, i 
+ 1));
-      entity.putProperties("long", makeValue(new String(new char[propertySize])
-            ).setExcludeFromIndexes(true).build());
-      mutations.add(makeUpsert(entity.build()).build());
+      Entity entity = Entity.newBuilder().setKey(makeKey("key" + i, i + 1))
+        .putProperties("long", makeValue(new String(new char[900_000])
+              ).setExcludeFromIndexes(true).build())
+        .build();
+      entitySize = entity.getSerializedSize(); // Take the size of any one 
entity.
+      mutations.add(makeUpsert(entity).build());
     }
 
     DatastoreWriterFn datastoreWriter = new 
DatastoreWriterFn(StaticValueProvider.of(PROJECT_ID),
@@ -667,10 +669,10 @@ public class DatastoreV1Test {
 
     // This test is over-specific currently; it requires that we split the 12 
entity writes into 3
     // requests, but we only need each CommitRequest to be less than 10MB in 
size.
-    int propertiesPerRpc = DATASTORE_BATCH_UPDATE_BYTES_LIMIT / propertySize;
+    int entitiesPerRpc = DATASTORE_BATCH_UPDATE_BYTES_LIMIT / entitySize;
     int start = 0;
     while (start < mutations.size()) {
-      int end = Math.min(mutations.size(), start + propertiesPerRpc);
+      int end = Math.min(mutations.size(), start + entitiesPerRpc);
       CommitRequest.Builder commitRequest = CommitRequest.newBuilder();
       commitRequest.setMode(CommitRequest.Mode.NON_TRANSACTIONAL);
       commitRequest.addAllMutations(mutations.subList(start, end));

Reply via email to