Raise the entity limit per RPC to 9MB. This is closer to the API limit, while still leaving room for overhead. It brings the Java SDK into line with the Python SDK.
Switch the unit test to use the size of each entity, which is what the connector is actually using, rather than the property size (which is slightly smaller and would cause the test to fail for some values). Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/156f326a Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/156f326a Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/156f326a Branch: refs/heads/gearpump-runner Commit: 156f326a16e15b4e22a189a2a263d11d7b273656 Parents: fdfc70e Author: Colin Phipps <fi...@google.com> Authored: Mon Jun 5 12:12:49 2017 +0000 Committer: Ahmet Altay <al...@google.com> Committed: Thu Jun 8 10:57:09 2017 -0700 ---------------------------------------------------------------------- .../beam/sdk/io/gcp/datastore/DatastoreV1.java | 2 +- .../beam/sdk/io/gcp/datastore/DatastoreV1Test.java | 16 +++++++++------- 2 files changed, 10 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/156f326a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java index b198a6f..06b9c8a 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java @@ -213,7 +213,7 @@ public class DatastoreV1 { * the mutations themselves and not the CommitRequest wrapper around them. 
*/ @VisibleForTesting - static final int DATASTORE_BATCH_UPDATE_BYTES_LIMIT = 5_000_000; + static final int DATASTORE_BATCH_UPDATE_BYTES_LIMIT = 9_000_000; /** * Returns an empty {@link DatastoreV1.Read} builder. Configure the source {@code projectId}, http://git-wip-us.apache.org/repos/asf/beam/blob/156f326a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java index 460049e..229b1fb 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java @@ -651,12 +651,14 @@ public class DatastoreV1Test { @Test public void testDatatoreWriterFnWithLargeEntities() throws Exception { List<Mutation> mutations = new ArrayList<>(); - int propertySize = 900_000; + int entitySize = 0; for (int i = 0; i < 12; ++i) { - Entity.Builder entity = Entity.newBuilder().setKey(makeKey("key" + i, i + 1)); - entity.putProperties("long", makeValue(new String(new char[propertySize]) - ).setExcludeFromIndexes(true).build()); - mutations.add(makeUpsert(entity.build()).build()); + Entity entity = Entity.newBuilder().setKey(makeKey("key" + i, i + 1)) + .putProperties("long", makeValue(new String(new char[900_000]) + ).setExcludeFromIndexes(true).build()) + .build(); + entitySize = entity.getSerializedSize(); // Take the size of any one entity. 
+ mutations.add(makeUpsert(entity).build()); } DatastoreWriterFn datastoreWriter = new DatastoreWriterFn(StaticValueProvider.of(PROJECT_ID), @@ -667,10 +669,10 @@ public class DatastoreV1Test { // This test is over-specific currently; it requires that we split the 12 entity writes into 3 // requests, but we only need each CommitRequest to be less than 10MB in size. - int propertiesPerRpc = DATASTORE_BATCH_UPDATE_BYTES_LIMIT / propertySize; + int entitiesPerRpc = DATASTORE_BATCH_UPDATE_BYTES_LIMIT / entitySize; int start = 0; while (start < mutations.size()) { - int end = Math.min(mutations.size(), start + propertiesPerRpc); + int end = Math.min(mutations.size(), start + entitiesPerRpc); CommitRequest.Builder commitRequest = CommitRequest.newBuilder(); commitRequest.setMode(CommitRequest.Mode.NON_TRANSACTIONAL); commitRequest.addAllMutations(mutations.subList(start, end));