[jira] [Work logged] (BEAM-5445) Update SpannerIO to support unbounded writes
[ https://issues.apache.org/jira/browse/BEAM-5445?focusedWorklogId=151205&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-151205 ] ASF GitHub Bot logged work on BEAM-5445: Author: ASF GitHub Bot Created on: 04/Oct/18 14:48 Start Date: 04/Oct/18 14:48 Worklog Time Spent: 10m Work Description: nithinsujir commented on issue #6478: [BEAM-5445] [BEAM-4796] SpannerIO: Only batch on the current bundle. Adds streaming support URL: https://github.com/apache/beam/pull/6478#issuecomment-427047885 LGTM. I suggest we wait for @biswanag comments as well. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 151205) Time Spent: 3h (was: 2h 50m) > Update SpannerIO to support unbounded writes > > > Key: BEAM-5445 > URL: https://issues.apache.org/jira/browse/BEAM-5445 > Project: Beam > Issue Type: Improvement > Components: io-java-gcp >Reporter: Chamikara Jayalath >Priority: Major > Time Spent: 3h > Remaining Estimate: 0h > > Currently, due to a known issue, streaming pipelines that use SpannerIO.Write > do not actually write to Spanner. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-5445) Update SpannerIO to support unbounded writes
[ https://issues.apache.org/jira/browse/BEAM-5445?focusedWorklogId=150412&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-150412 ] ASF GitHub Bot logged work on BEAM-5445: Author: ASF GitHub Bot Created on: 02/Oct/18 15:15 Start Date: 02/Oct/18 15:15 Worklog Time Spent: 10m Work Description: nielm commented on a change in pull request #6478: [BEAM-5445] [BEAM-4796] SpannerIO: Only batch on the current bundle. Adds streaming support URL: https://github.com/apache/beam/pull/6478#discussion_r221993914 ## File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java ## @@ -1074,59 +1061,61 @@ public void setup() { batch = ImmutableList.builder(); batchSizeBytes = 0; batchCells = 0; - spannerAccessor = spannerConfig.connectToSpanner(); -} - -@Teardown -public void teardown() { - spannerAccessor.close(); } @ProcessElement public void processElement(ProcessContext c) throws Exception { SpannerSchema spannerSchema = c.sideInput(schemaView); - MutationGroupEncoder mutationGroupEncoder = new MutationGroupEncoder(spannerSchema); - KV> element = c.element(); - for (SerializedMutation kv : element.getValue()) { -byte[] value = kv.getMutationGroupBytes(); -MutationGroup mg = mutationGroupEncoder.decode(value); + // Iterate through list, outputting whenever a batch is complete. 
+ for (KV kv : c.element()) {
+ MutationGroup mg = decode(kv.getValue());
+
 long groupSize = MutationSizeEstimator.sizeOf(mg);
 long groupCells = MutationCellCounter.countOf(spannerSchema, mg);
- if (batchCells + groupCells > maxNumMutations
- || batchSizeBytes + groupSize > maxBatchSizeBytes) {
- ImmutableList mutations = batch.build();
- c.output(mutations);
- batch = ImmutableList.builder();
- batchSizeBytes = 0;
- batchCells = 0;
+ if (((batchCells + groupCells) > maxNumMutations)
+ || ((batchSizeBytes + groupSize) > maxBatchSizeBytes)) {
+ outputBatch(c);

Review comment:
Beam docs make no comment about whether DoFns should be thread-safe or not, but adding synchronized blocks does no harm. GatherBundleAndSortFn has been updated to be thread-safe. BatchFn has been rewritten to use local variables only, so it is now thread-safe.

Issue Time Tracking
---
Worklog Id: (was: 150412)
Time Spent: 2h 50m (was: 2h 40m)
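A minimal sketch of the "local variables only" batching described in the review comment above. It assumes a hypothetical `Group` type that stands in for a MutationGroup and carries only its estimated size and cell count; the names `BatchSketch`, `Group`, and `batch` are illustrative and do not come from the Beam source, and the empty-batch guard is an addition of this sketch rather than something shown in the diff. Because all batch state lives in locals, concurrent calls share no mutable state.

```java
import java.util.ArrayList;
import java.util.List;

public class BatchSketch {
  /** Hypothetical stand-in for a MutationGroup: estimated size and cell count only. */
  static final class Group {
    final long sizeBytes;
    final long cells;

    Group(long sizeBytes, long cells) {
      this.sizeBytes = sizeBytes;
      this.cells = cells;
    }
  }

  /**
   * Splits an already-sorted list into batches, closing the current batch
   * whenever adding the next group would exceed either limit.
   */
  static List<List<Group>> batch(
      List<Group> sorted, long maxBatchSizeBytes, long maxNumMutations) {
    List<List<Group>> out = new ArrayList<>();
    List<Group> current = new ArrayList<>();
    long batchSizeBytes = 0;
    long batchCells = 0;

    for (Group g : sorted) {
      boolean overCells = (batchCells + g.cells) > maxNumMutations;
      boolean overBytes = (batchSizeBytes + g.sizeBytes) > maxBatchSizeBytes;
      // Guard against emitting an empty batch (an assumption of this sketch).
      if (!current.isEmpty() && (overCells || overBytes)) {
        out.add(current);
        current = new ArrayList<>();
        batchSizeBytes = 0;
        batchCells = 0;
      }
      current.add(g);
      batchSizeBytes += g.sizeBytes;
      batchCells += g.cells;
    }
    // Emit whatever is left once the input is exhausted.
    if (!current.isEmpty()) {
      out.add(current);
    }
    return out;
  }
}
```

In this sketch, three 4-byte groups with a 10-byte limit yield two batches, and a 0-byte limit puts every group in its own batch.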
[jira] [Work logged] (BEAM-5445) Update SpannerIO to support unbounded writes
[ https://issues.apache.org/jira/browse/BEAM-5445?focusedWorklogId=150410&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-150410 ] ASF GitHub Bot logged work on BEAM-5445: Author: ASF GitHub Bot Created on: 02/Oct/18 15:03 Start Date: 02/Oct/18 15:03 Worklog Time Spent: 10m Work Description: nielm commented on a change in pull request #6478: [BEAM-5445] [BEAM-4796] SpannerIO: Only batch on the current bundle. Adds streaming support URL: https://github.com/apache/beam/pull/6478#discussion_r221988893 ## File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java ## @@ -880,192 +868,191 @@ public SpannerWriteResult expand(PCollection input) { .apply("Schema View", View.asSingleton()); // Split the mutations into batchable and unbatchable mutations. - // Filter out mutation groups too big to be batched + // Filter out mutation groups too big to be batched. PCollectionTuple filteredMutations = - input.apply( - "Filter Unbatchable Mutations", - ParDo.of( - new BatchableMutationFilterFn( - schemaView, - UNBATCHABLE_MUTATIONS_TAG, - spec.getBatchSizeBytes(), - spec.getMaxNumMutations())) - .withSideInputs(schemaView) - .withOutputTags( - BATCHABLE_MUTATIONS_TAG, TupleTagList.of(UNBATCHABLE_MUTATIONS_TAG))); - - // Serialize batchable mutations, we don't need to encode/decode them while reshuffling. - // The primary key is encoded via OrderedCode so we can calculate quantiles. - PCollection serialized = - filteredMutations - .get(BATCHABLE_MUTATIONS_TAG) + input + .apply("To Global Window", Window.into(new GlobalWindows())) .apply( - "Serialize mutations", - ParDo.of(new SerializeMutationsFn(schemaView)).withSideInputs(schemaView)) - .setCoder(SerializedMutationCoder.of()); - - // Sample primary keys using ApproximateQuantiles. 
- PCollectionView>> keySample = - serialized - .apply("Extract keys", ParDo.of(new ExtractKeys())) - .apply("Sample keys", sampler) - .apply("Keys sample as view", View.asMap()); - - TupleTag mainTag = new TupleTag<>("mainOut"); - TupleTag failedTag = new TupleTag<>("failedMutations"); - // Assign partition based on the closest element in the sample and group mutations. - AssignPartitionFn assignPartitionFn = new AssignPartitionFn(keySample); + "Filter Unbatchable Mutations", + ParDo.of( + new BatchableMutationFilterFn( + schemaView, + UNBATCHABLE_MUTATIONS_TAG, + spec.getBatchSizeBytes(), + spec.getMaxNumMutations())) + .withSideInputs(schemaView) + .withOutputTags( + BATCHABLE_MUTATIONS_TAG, TupleTagList.of(UNBATCHABLE_MUTATIONS_TAG))); + + // Build a set of Mutation groups from the current bundle, + // sort them by table/key then split into batches. PCollection> batchedMutations = - serialized - .apply("Partition input", ParDo.of(assignPartitionFn).withSideInputs(keySample)) - .setCoder(KvCoder.of(StringUtf8Coder.of(), SerializedMutationCoder.of())) - .apply("Group by partition", GroupByKey.create()) + filteredMutations + .get(BATCHABLE_MUTATIONS_TAG) .apply( - "Batch mutations together", + "Gather And Sort", ParDo.of( - new BatchFn( + new GatherBundleAndSortFn( spec.getBatchSizeBytes(), spec.getMaxNumMutations(), - spec.getSpannerConfig(), + spec.getGroupingFactor(), schemaView)) + .withSideInputs(schemaView)) + .apply( + "Create Batches", + ParDo.of( + new BatchFn( + spec.getBatchSizeBytes(), spec.getMaxNumMutations(), schemaView)) .withSideInputs(schemaView)); // Merge the batchable and unbatchable mutations and write to Spanner. PCollectionTuple result = PCollectionList.of(filteredMutations.get(UNBATCHABLE_MUTATIONS_TAG)) .and(batchedMutations) -
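The diff comment above says the new stage will "build a set of Mutation groups from the current bundle, sort them by table/key then split into batches". The gather-and-sort half of that can be sketched as follows. This is an illustration under stated assumptions, not the code from the pull request: `GatherAndSortSketch` and `Keyed` are hypothetical names, the sort key is a plain string, and the flush threshold of `maxBatchSizeBytes * groupingFactor` is an assumption (the diff only shows that the batch size, the mutation limit, and a grouping factor are passed to the constructor).

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class GatherAndSortSketch {
  /** Hypothetical element: a sortable key plus an estimated size. */
  static final class Keyed {
    final String sortKey;
    final long sizeBytes;

    Keyed(String sortKey, long sizeBytes) {
      this.sortKey = sortKey;
      this.sizeBytes = sizeBytes;
    }
  }

  private final long flushThresholdBytes;
  private final List<Keyed> buffer = new ArrayList<>();
  private final List<List<Keyed>> emitted = new ArrayList<>();
  private long bufferedBytes = 0;

  GatherAndSortSketch(long maxBatchSizeBytes, long groupingFactor) {
    // Assumed policy: gather several batches' worth of data before sorting.
    this.flushThresholdBytes = maxBatchSizeBytes * groupingFactor;
  }

  /** Buffers one element of the current bundle, flushing when the threshold is reached. */
  void add(Keyed k) {
    buffer.add(k);
    bufferedBytes += k.sizeBytes;
    if (bufferedBytes >= flushThresholdBytes) {
      flush();
    }
  }

  /** Called at the end of a bundle so that no buffered element is lost. */
  void finishBundle() {
    if (!buffer.isEmpty()) {
      flush();
    }
  }

  private void flush() {
    List<Keyed> sorted = new ArrayList<>(buffer);
    sorted.sort(Comparator.comparing((Keyed k) -> k.sortKey));
    emitted.add(sorted);
    buffer.clear();
    bufferedBytes = 0;
  }

  List<List<Keyed>> emitted() {
    return emitted;
  }
}
```

Sorting only what has been gathered from the current bundle avoids the global key sampling and GroupByKey that the removed lines relied on, which is what allows the same path to run on unbounded input.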
[jira] [Work logged] (BEAM-5445) Update SpannerIO to support unbounded writes
[ https://issues.apache.org/jira/browse/BEAM-5445?focusedWorklogId=150241&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-150241 ] ASF GitHub Bot logged work on BEAM-5445: Author: ASF GitHub Bot Created on: 01/Oct/18 23:01 Start Date: 01/Oct/18 23:01 Worklog Time Spent: 10m Work Description: nithinsujir commented on a change in pull request #6478: [BEAM-5445] [BEAM-4796] SpannerIO: Only batch on the current bundle. Adds streaming support URL: https://github.com/apache/beam/pull/6478#discussion_r221777036 ## File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java ## @@ -880,192 +868,191 @@ public SpannerWriteResult expand(PCollection input) { .apply("Schema View", View.asSingleton()); // Split the mutations into batchable and unbatchable mutations. - // Filter out mutation groups too big to be batched + // Filter out mutation groups too big to be batched. PCollectionTuple filteredMutations = - input.apply( - "Filter Unbatchable Mutations", - ParDo.of( - new BatchableMutationFilterFn( - schemaView, - UNBATCHABLE_MUTATIONS_TAG, - spec.getBatchSizeBytes(), - spec.getMaxNumMutations())) - .withSideInputs(schemaView) - .withOutputTags( - BATCHABLE_MUTATIONS_TAG, TupleTagList.of(UNBATCHABLE_MUTATIONS_TAG))); - - // Serialize batchable mutations, we don't need to encode/decode them while reshuffling. - // The primary key is encoded via OrderedCode so we can calculate quantiles. - PCollection serialized = - filteredMutations - .get(BATCHABLE_MUTATIONS_TAG) + input + .apply("To Global Window", Window.into(new GlobalWindows())) .apply( - "Serialize mutations", - ParDo.of(new SerializeMutationsFn(schemaView)).withSideInputs(schemaView)) - .setCoder(SerializedMutationCoder.of()); - - // Sample primary keys using ApproximateQuantiles. 
- PCollectionView>> keySample = - serialized - .apply("Extract keys", ParDo.of(new ExtractKeys())) - .apply("Sample keys", sampler) - .apply("Keys sample as view", View.asMap()); - - TupleTag mainTag = new TupleTag<>("mainOut"); - TupleTag failedTag = new TupleTag<>("failedMutations"); - // Assign partition based on the closest element in the sample and group mutations. - AssignPartitionFn assignPartitionFn = new AssignPartitionFn(keySample); + "Filter Unbatchable Mutations", + ParDo.of( + new BatchableMutationFilterFn( + schemaView, + UNBATCHABLE_MUTATIONS_TAG, + spec.getBatchSizeBytes(), + spec.getMaxNumMutations())) + .withSideInputs(schemaView) + .withOutputTags( + BATCHABLE_MUTATIONS_TAG, TupleTagList.of(UNBATCHABLE_MUTATIONS_TAG))); + + // Build a set of Mutation groups from the current bundle, + // sort them by table/key then split into batches. PCollection> batchedMutations = - serialized - .apply("Partition input", ParDo.of(assignPartitionFn).withSideInputs(keySample)) - .setCoder(KvCoder.of(StringUtf8Coder.of(), SerializedMutationCoder.of())) - .apply("Group by partition", GroupByKey.create()) + filteredMutations + .get(BATCHABLE_MUTATIONS_TAG) .apply( - "Batch mutations together", + "Gather And Sort", ParDo.of( - new BatchFn( + new GatherBundleAndSortFn( spec.getBatchSizeBytes(), spec.getMaxNumMutations(), - spec.getSpannerConfig(), + spec.getGroupingFactor(), schemaView)) + .withSideInputs(schemaView)) + .apply( + "Create Batches", + ParDo.of( + new BatchFn( + spec.getBatchSizeBytes(), spec.getMaxNumMutations(), schemaView)) .withSideInputs(schemaView)); // Merge the batchable and unbatchable mutations and write to Spanner. PCollectionTuple result = PCollectionList.of(filteredMutations.get(UNBATCHABLE_MUTATIONS_TAG)) .and(batchedMutations) -
[jira] [Work logged] (BEAM-5445) Update SpannerIO to support unbounded writes
[ https://issues.apache.org/jira/browse/BEAM-5445?focusedWorklogId=150240&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-150240 ] ASF GitHub Bot logged work on BEAM-5445: Author: ASF GitHub Bot Created on: 01/Oct/18 23:01 Start Date: 01/Oct/18 23:01 Worklog Time Spent: 10m Work Description: nithinsujir commented on a change in pull request #6478: [BEAM-5445] [BEAM-4796] SpannerIO: Only batch on the current bundle. Adds streaming support URL: https://github.com/apache/beam/pull/6478#discussion_r221782468 ## File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java ## @@ -1074,59 +1061,61 @@ public void setup() { batch = ImmutableList.builder(); batchSizeBytes = 0; batchCells = 0; - spannerAccessor = spannerConfig.connectToSpanner(); -} - -@Teardown -public void teardown() { - spannerAccessor.close(); } @ProcessElement public void processElement(ProcessContext c) throws Exception { SpannerSchema spannerSchema = c.sideInput(schemaView); - MutationGroupEncoder mutationGroupEncoder = new MutationGroupEncoder(spannerSchema); - KV> element = c.element(); - for (SerializedMutation kv : element.getValue()) { -byte[] value = kv.getMutationGroupBytes(); -MutationGroup mg = mutationGroupEncoder.decode(value); + // Iterate through list, outputting whenever a batch is complete. 
+ for (KV kv : c.element()) { +MutationGroup mg = decode(kv.getValue()); + long groupSize = MutationSizeEstimator.sizeOf(mg); long groupCells = MutationCellCounter.countOf(spannerSchema, mg); -if (batchCells + groupCells > maxNumMutations -|| batchSizeBytes + groupSize > maxBatchSizeBytes) { - ImmutableList mutations = batch.build(); - c.output(mutations); - batch = ImmutableList.builder(); - batchSizeBytes = 0; - batchCells = 0; +if (((batchCells + groupCells) > maxNumMutations) +|| ((batchSizeBytes + groupSize) > maxBatchSizeBytes)) { + outputBatch(c); Review comment: Any reason why "c" needs to be passed around when everything else is coming via a member variable? And am I getting it right that this is not thread safe and I assume that's well understood and accepted? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 150240) Time Spent: 2.5h (was: 2h 20m) > Update SpannerIO to support unbounded writes > > > Key: BEAM-5445 > URL: https://issues.apache.org/jira/browse/BEAM-5445 > Project: Beam > Issue Type: Improvement > Components: io-java-gcp >Reporter: Chamikara Jayalath >Priority: Major > Time Spent: 2.5h > Remaining Estimate: 0h > > Currently, due to a known issue, streaming pipelines that use SpannerIO.Write > do not actually write to Spanner. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-5445) Update SpannerIO to support unbounded writes
[ https://issues.apache.org/jira/browse/BEAM-5445?focusedWorklogId=150061&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-150061 ]

ASF GitHub Bot logged work on BEAM-5445:
Author: ASF GitHub Bot
Created on: 01/Oct/18 15:21
Start Date: 01/Oct/18 15:21
Worklog Time Spent: 10m
Work Description: nielm commented on issue #6478: [BEAM-5445] [BEAM-4796] SpannerIO: Only batch on the current bundle. Adds streaming support
URL: https://github.com/apache/beam/pull/6478#issuecomment-425947867

The first 2 commits are #6422 and #6409, which this commit is dependent on. It is not really possible to split up the last commit, as I am rewriting the main part of the SpannerIO Writer, where the MutationGroups are sorted and batched together.

Issue Time Tracking
---
Worklog Id: (was: 150061)
Time Spent: 2h 20m (was: 2h 10m)
[jira] [Work logged] (BEAM-5445) Update SpannerIO to support unbounded writes
[ https://issues.apache.org/jira/browse/BEAM-5445?focusedWorklogId=150058&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-150058 ]

ASF GitHub Bot logged work on BEAM-5445:
Author: ASF GitHub Bot
Created on: 01/Oct/18 15:15
Start Date: 01/Oct/18 15:15
Worklog Time Spent: 10m
Work Description: nielm commented on a change in pull request #6478: [BEAM-5445] [BEAM-4796] SpannerIO: Only batch on the current bundle. Adds streaming support
URL: https://github.com/apache/beam/pull/6478#discussion_r221648135

## File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SerializedMutation.java ##
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.gcp.spanner;
-
-import com.google.auto.value.AutoValue;
-
-@AutoValue
-abstract class SerializedMutation {

Review comment:
Added: SerializedMutation is no longer required, as serialized Mutation Groups are passed between transforms as byte[].

Issue Time Tracking
---
Worklog Id: (was: 150058)
Time Spent: 2h (was: 1h 50m)
[jira] [Work logged] (BEAM-5445) Update SpannerIO to support unbounded writes
[ https://issues.apache.org/jira/browse/BEAM-5445?focusedWorklogId=150059&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-150059 ] ASF GitHub Bot logged work on BEAM-5445: Author: ASF GitHub Bot Created on: 01/Oct/18 15:15 Start Date: 01/Oct/18 15:15 Worklog Time Spent: 10m Work Description: nielm commented on a change in pull request #6478: [BEAM-5445] [BEAM-4796] SpannerIO: Only batch on the current bundle. Adds streaming support URL: https://github.com/apache/beam/pull/6478#discussion_r221648192 ## File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/MutationKeyEncoder.java ## @@ -0,0 +1,203 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.io.gcp.spanner; + +import static org.apache.beam.sdk.io.gcp.spanner.MutationUtils.isPointDelete; + +import com.google.cloud.ByteArray; +import com.google.cloud.Date; +import com.google.cloud.Timestamp; +import com.google.cloud.spanner.Key; +import com.google.cloud.spanner.Mutation; +import com.google.cloud.spanner.Mutation.Op; +import com.google.cloud.spanner.Type; +import com.google.cloud.spanner.Value; +import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import org.apache.beam.sdk.io.gcp.spanner.SpannerSchema.KeyPart; +import org.joda.time.DateTime; +import org.joda.time.Days; +import org.joda.time.MutableDateTime; + +/** + * Given the Schema, Encodes the table name and Key into a lexicographically sortable {@code + * byte[]}. + */ +class MutationKeyEncoder { Review comment: Added: MutationGroupEncoder class is renamed to MutationKeyEncoder, as it no longer encodes entire MutationGroups, only the Key. (SerializableCoder is now used to encode entire MutationGroups). This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 150059) Time Spent: 2h 10m (was: 2h) > Update SpannerIO to support unbounded writes > > > Key: BEAM-5445 > URL: https://issues.apache.org/jira/browse/BEAM-5445 > Project: Beam > Issue Type: Improvement > Components: io-java-gcp >Reporter: Chamikara Jayalath >Priority: Major > Time Spent: 2h 10m > Remaining Estimate: 0h > > Currently, due to a known issue, streaming pipelines that use SpannerIO.Write > do not actually write to Spanner. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
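The Javadoc quoted above describes encoding the table name and Key into a lexicographically sortable `byte[]`. The idea can be sketched as below. This is an assumption-laden illustration, not the MutationKeyEncoder implementation: `SortableKeySketch`, `encode`, and `compareUnsigned` are hypothetical names, the key is a single `long`, and table names are assumed to contain no NUL byte. The sign bit of the key is flipped so that negative values order before positive ones under unsigned byte comparison.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class SortableKeySketch {
  /** Encodes (table, key) so that unsigned byte order matches (table, key) order. */
  static byte[] encode(String table, long key) {
    byte[] name = table.getBytes(StandardCharsets.UTF_8);
    // ByteBuffer writes big-endian by default, which keeps numeric order.
    ByteBuffer buf = ByteBuffer.allocate(name.length + 1 + Long.BYTES);
    buf.put(name);
    buf.put((byte) 0); // terminator so that "a" sorts before "ab"
    buf.putLong(key ^ Long.MIN_VALUE); // flip sign bit: negatives sort first
    return buf.array();
  }

  /** Lexicographic comparison treating each byte as unsigned. */
  static int compareUnsigned(byte[] a, byte[] b) {
    int n = Math.min(a.length, b.length);
    for (int i = 0; i < n; i++) {
      int c = Integer.compare(a[i] & 0xff, b[i] & 0xff);
      if (c != 0) {
        return c;
      }
    }
    return Integer.compare(a.length, b.length);
  }
}
```

With such an encoding, sorting the encoded byte arrays groups mutations by table first and then by key, without decoding them again.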
[jira] [Work logged] (BEAM-5445) Update SpannerIO to support unbounded writes
[ https://issues.apache.org/jira/browse/BEAM-5445?focusedWorklogId=150057&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-150057 ] ASF GitHub Bot logged work on BEAM-5445: Author: ASF GitHub Bot Created on: 01/Oct/18 15:15 Start Date: 01/Oct/18 15:15 Worklog Time Spent: 10m Work Description: nielm commented on a change in pull request #6478: [BEAM-5445] [BEAM-4796] SpannerIO: Only batch on the current bundle. Adds streaming support URL: https://github.com/apache/beam/pull/6478#discussion_r221648082 ## File path: sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOWriteTest.java ## @@ -371,97 +276,46 @@ public void noBatching() throws Exception { @Test @Category(NeedsRunner.class) - public void batchingPlusSampling() throws Exception { -PCollection mutations = -pipeline.apply( -Create.of( -g(m(1L)), g(m(2L)), g(m(3L)), g(m(4L)), g(m(5L)), g(m(6L)), g(m(7L)), g(m(8L)), -g(m(9L)), g(m(10L; - -mutations.apply( -SpannerIO.write() -.withProjectId("test-project") -.withInstanceId("test-instance") -.withDatabaseId("test-database") -.withServiceFactory(serviceFactory) -.withBatchSizeBytes(10) -.withSampler(fakeSampler(m(2L), m(5L), m(10L))) -.grouped()); -pipeline.run(); + public void reportFailures() throws Exception { -verifyBatches( -batch(m(1L), m(2L)), batch(m(3L), m(4L), m(5L)), batch(m(6L), m(7L), m(8L), m(9L), m(10L))); - } +MutationGroup[] mutationGroups = new MutationGroup[10]; +for (int i = 0; i < mutationGroups.length; i++) { + mutationGroups[i] = g(m((long) i)); +} - @Test - @Category(NeedsRunner.class) - public void reportFailures() throws Exception { -PCollection mutations = -pipeline.apply( -Create.of( -g(m(1L)), g(m(2L)), g(m(3L)), g(m(4L)), g(m(5L)), g(m(6L)), g(m(7L)), g(m(8L)), -g(m(9L)), g(m(10L; +List mutationGroupList = Arrays.asList(mutationGroups); when(serviceFactory.mockDatabaseClient().writeAtLeastOnce(any())) .thenAnswer( invocationOnMock -> { + 
Preconditions.checkNotNull(invocationOnMock.getArguments()[0]); throw SpannerExceptionFactory.newSpannerException(ErrorCode.ALREADY_EXISTS, "oops"); }); SpannerWriteResult result = -mutations.apply( -SpannerIO.write() -.withProjectId("test-project") -.withInstanceId("test-instance") -.withDatabaseId("test-database") -.withServiceFactory(serviceFactory) -.withBatchSizeBytes(10) -.withFailureMode(SpannerIO.FailureMode.REPORT_FAILURES) -.withSampler(fakeSampler(m(2L), m(5L), m(10L))) -.grouped()); +pipeline +.apply(Create.of(mutationGroupList)) +.apply( +SpannerIO.write() +.withProjectId("test-project") +.withInstanceId("test-instance") +.withDatabaseId("test-database") +.withServiceFactory(serviceFactory) +.withBatchSizeBytes(0) Review comment: Yes. It is no longer possible to test batching using the TestPipeline (I commented in the header of this class to that effect). So disabling batching makes it clear that batching no longer occurs. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 150057) Time Spent: 1h 50m (was: 1h 40m) > Update SpannerIO to support unbounded writes > > > Key: BEAM-5445 > URL: https://issues.apache.org/jira/browse/BEAM-5445 > Project: Beam > Issue Type: Improvement > Components: io-java-gcp >Reporter: Chamikara Jayalath >Priority: Major > Time Spent: 1h 50m > Remaining Estimate: 0h > > Currently, due to a known issue, streaming pipelines that use SpannerIO.Write > do not actually write to Spanner. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-5445) Update SpannerIO to support unbounded writes
[ https://issues.apache.org/jira/browse/BEAM-5445?focusedWorklogId=149222&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-149222 ] ASF GitHub Bot logged work on BEAM-5445: Author: ASF GitHub Bot Created on: 28/Sep/18 17:16 Start Date: 28/Sep/18 17:16 Worklog Time Spent: 10m Work Description: nithinsujir commented on a change in pull request #6478: [BEAM-5445] [BEAM-4796] SpannerIO: Only batch on the current bundle. Adds streaming support URL: https://github.com/apache/beam/pull/6478#discussion_r221313416 ## File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SerializedMutation.java ## @@ -1,35 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.io.gcp.spanner; - -import com.google.auto.value.AutoValue; - -@AutoValue -abstract class SerializedMutation { Review comment: Should probably add explanation of why the SerializedMutation classes are not needed anymore. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 149222) Time Spent: 1.5h (was: 1h 20m) > Update SpannerIO to support unbounded writes > > > Key: BEAM-5445 > URL: https://issues.apache.org/jira/browse/BEAM-5445 > Project: Beam > Issue Type: Improvement > Components: io-java-gcp >Reporter: Chamikara Jayalath >Priority: Major > Time Spent: 1.5h > Remaining Estimate: 0h > > Currently, due to a known issue, streaming pipelines that use SpannerIO.Write > do not actually write to Spanner. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-5445) Update SpannerIO to support unbounded writes
[ https://issues.apache.org/jira/browse/BEAM-5445?focusedWorklogId=149223&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-149223 ] ASF GitHub Bot logged work on BEAM-5445: Author: ASF GitHub Bot Created on: 28/Sep/18 17:16 Start Date: 28/Sep/18 17:16 Worklog Time Spent: 10m Work Description: nithinsujir commented on a change in pull request #6478: [BEAM-5445] [BEAM-4796] SpannerIO: Only batch on the current bundle. Adds streaming support URL: https://github.com/apache/beam/pull/6478#discussion_r221315969 ## File path: sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOWriteTest.java ## @@ -371,97 +276,46 @@ public void noBatching() throws Exception { @Test @Category(NeedsRunner.class) - public void batchingPlusSampling() throws Exception { -PCollection mutations = -pipeline.apply( -Create.of( -g(m(1L)), g(m(2L)), g(m(3L)), g(m(4L)), g(m(5L)), g(m(6L)), g(m(7L)), g(m(8L)), -g(m(9L)), g(m(10L; - -mutations.apply( -SpannerIO.write() -.withProjectId("test-project") -.withInstanceId("test-instance") -.withDatabaseId("test-database") -.withServiceFactory(serviceFactory) -.withBatchSizeBytes(10) -.withSampler(fakeSampler(m(2L), m(5L), m(10L))) -.grouped()); -pipeline.run(); + public void reportFailures() throws Exception { -verifyBatches( -batch(m(1L), m(2L)), batch(m(3L), m(4L), m(5L)), batch(m(6L), m(7L), m(8L), m(9L), m(10L))); - } +MutationGroup[] mutationGroups = new MutationGroup[10]; +for (int i = 0; i < mutationGroups.length; i++) { + mutationGroups[i] = g(m((long) i)); +} - @Test - @Category(NeedsRunner.class) - public void reportFailures() throws Exception { -PCollection mutations = -pipeline.apply( -Create.of( -g(m(1L)), g(m(2L)), g(m(3L)), g(m(4L)), g(m(5L)), g(m(6L)), g(m(7L)), g(m(8L)), -g(m(9L)), g(m(10L; +List mutationGroupList = Arrays.asList(mutationGroups); when(serviceFactory.mockDatabaseClient().writeAtLeastOnce(any())) .thenAnswer( invocationOnMock -> { + 
Preconditions.checkNotNull(invocationOnMock.getArguments()[0]); throw SpannerExceptionFactory.newSpannerException(ErrorCode.ALREADY_EXISTS, "oops"); }); SpannerWriteResult result = -mutations.apply( -SpannerIO.write() -.withProjectId("test-project") -.withInstanceId("test-instance") -.withDatabaseId("test-database") -.withServiceFactory(serviceFactory) -.withBatchSizeBytes(10) -.withFailureMode(SpannerIO.FailureMode.REPORT_FAILURES) -.withSampler(fakeSampler(m(2L), m(5L), m(10L))) -.grouped()); +pipeline +.apply(Create.of(mutationGroupList)) +.apply( +SpannerIO.write() +.withProjectId("test-project") +.withInstanceId("test-instance") +.withDatabaseId("test-database") +.withServiceFactory(serviceFactory) +.withBatchSizeBytes(0) Review comment: BatchSizeBytes went to 0. Is that intentional? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 149223) Time Spent: 1.5h (was: 1h 20m) > Update SpannerIO to support unbounded writes > > > Key: BEAM-5445 > URL: https://issues.apache.org/jira/browse/BEAM-5445 > Project: Beam > Issue Type: Improvement > Components: io-java-gcp >Reporter: Chamikara Jayalath >Priority: Major > Time Spent: 1.5h > Remaining Estimate: 0h > > Currently, due to a known issue, streaming pipelines that use SpannerIO.Write > do not actually write to Spanner. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-5445) Update SpannerIO to support unbounded writes
[ https://issues.apache.org/jira/browse/BEAM-5445?focusedWorklogId=149224&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-149224 ]

ASF GitHub Bot logged work on BEAM-5445:

Author: ASF GitHub Bot
Created on: 28/Sep/18 17:16
Start Date: 28/Sep/18 17:16
Worklog Time Spent: 10m
Work Description: nithinsujir commented on a change in pull request #6478: [BEAM-5445] [BEAM-4796] SpannerIO: Only batch on the current bundle. Adds streaming support
URL: https://github.com/apache/beam/pull/6478#discussion_r221312319

## File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/MutationKeyEncoder.java ##
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.spanner;
+
+import static org.apache.beam.sdk.io.gcp.spanner.MutationUtils.isPointDelete;
+
+import com.google.cloud.ByteArray;
+import com.google.cloud.Date;
+import com.google.cloud.Timestamp;
+import com.google.cloud.spanner.Key;
+import com.google.cloud.spanner.Mutation;
+import com.google.cloud.spanner.Mutation.Op;
+import com.google.cloud.spanner.Type;
+import com.google.cloud.spanner.Value;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.sdk.io.gcp.spanner.SpannerSchema.KeyPart;
+import org.joda.time.DateTime;
+import org.joda.time.Days;
+import org.joda.time.MutableDateTime;
+
+/**
+ * Given the Schema, Encodes the table name and Key into a lexicographically sortable {@code
+ * byte[]}.
+ */
+class MutationKeyEncoder {

Review comment:
In the commit description, could you provide an explanation about why we are changing from MutationGroupEncoder to MutationKeyEncoder?

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org

Issue Time Tracking
---
Worklog Id: (was: 149224)
Time Spent: 1h 40m (was: 1.5h)

> Update SpannerIO to support unbounded writes
>
> Key: BEAM-5445
> URL: https://issues.apache.org/jira/browse/BEAM-5445
> Project: Beam
> Issue Type: Improvement
> Components: io-java-gcp
> Reporter: Chamikara Jayalath
> Priority: Major
> Time Spent: 1h 40m
> Remaining Estimate: 0h
>
> Currently, due to a known issue, streaming pipelines that use SpannerIO.Write
> do not actually write to Spanner.

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
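The Javadoc quoted in the review above says the new class encodes a table name and key into a lexicographically sortable byte[]. The following is a minimal sketch of that general idea only, not the actual MutationKeyEncoder. The class name SortableKeySketch, its methods, the single signed 64-bit key part, and the NUL terminator after the table name are all assumptions made for illustration.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Hypothetical sketch: byte[] keys whose unsigned byte order matches (table, key) order. */
final class SortableKeySketch {

  /**
   * Encodes a table name and one signed 64-bit key part.
   * Assumption: table names never contain a NUL byte, so a 0 terminator
   * makes a shorter name sort before any longer name that extends it.
   */
  static byte[] encode(String table, long keyPart) {
    byte[] name = table.getBytes(StandardCharsets.UTF_8);
    ByteBuffer buf = ByteBuffer.allocate(name.length + 1 + Long.BYTES);
    buf.put(name);
    buf.put((byte) 0);
    // ByteBuffer writes big-endian by default. Flipping the sign bit maps
    // negative values below non-negative ones under unsigned comparison.
    buf.putLong(keyPart ^ Long.MIN_VALUE);
    return buf.array();
  }

  /** Compares two arrays byte by byte, treating each byte as unsigned. */
  static int compareUnsigned(byte[] a, byte[] b) {
    int n = Math.min(a.length, b.length);
    for (int i = 0; i < n; i++) {
      int cmp = Integer.compare(a[i] & 0xff, b[i] & 0xff);
      if (cmp != 0) {
        return cmp;
      }
    }
    return Integer.compare(a.length, b.length);
  }

  public static void main(String[] args) {
    int cmp = compareUnsigned(encode("users", -5L), encode("users", 3L));
    System.out.println("encode(users, -5) vs encode(users, 3): " + cmp);
  }
}
```

With an encoding of this kind, sorting the byte arrays groups mutations by table and then by key, which is the property a sorter needs before cutting the input into batches.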
[jira] [Work logged] (BEAM-5445) Update SpannerIO to support unbounded writes
[ https://issues.apache.org/jira/browse/BEAM-5445?focusedWorklogId=149144&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-149144 ] ASF GitHub Bot logged work on BEAM-5445: Author: ASF GitHub Bot Created on: 28/Sep/18 12:26 Start Date: 28/Sep/18 12:26 Worklog Time Spent: 10m Work Description: nithinsujir commented on issue #6478: [BEAM-5445] [BEAM-4796] SpannerIO: Only batch on the current bundle. Adds streaming support URL: https://github.com/apache/beam/pull/6478#issuecomment-425419373 R: @biswanag This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 149144) Time Spent: 1h 20m (was: 1h 10m) > Update SpannerIO to support unbounded writes > > > Key: BEAM-5445 > URL: https://issues.apache.org/jira/browse/BEAM-5445 > Project: Beam > Issue Type: Improvement > Components: io-java-gcp >Reporter: Chamikara Jayalath >Priority: Major > Time Spent: 1h 20m > Remaining Estimate: 0h > > Currently, due to a known issue, streaming pipelines that use SpannerIO.Write > do not actually write to Spanner. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-5445) Update SpannerIO to support unbounded writes
[ https://issues.apache.org/jira/browse/BEAM-5445?focusedWorklogId=149055&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-149055 ] ASF GitHub Bot logged work on BEAM-5445: Author: ASF GitHub Bot Created on: 28/Sep/18 07:37 Start Date: 28/Sep/18 07:37 Worklog Time Spent: 10m Work Description: chamikaramj commented on issue #6478: [BEAM-5445] [BEAM-4796] SpannerIO: Only batch on the current bundle. Adds streaming support URL: https://github.com/apache/beam/pull/6478#issuecomment-425349811 Thanks Niel. Added @nithinsujir for a first round of reviews for this and https://github.com/apache/beam/pull/6422. I can take a look next week. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 149055) Time Spent: 1h 10m (was: 1h) > Update SpannerIO to support unbounded writes > > > Key: BEAM-5445 > URL: https://issues.apache.org/jira/browse/BEAM-5445 > Project: Beam > Issue Type: Improvement > Components: io-java-gcp >Reporter: Chamikara Jayalath >Priority: Major > Time Spent: 1h 10m > Remaining Estimate: 0h > > Currently, due to a known issue, streaming pipelines that use SpannerIO.Write > do not actually write to Spanner. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-5445) Update SpannerIO to support unbounded writes
[ https://issues.apache.org/jira/browse/BEAM-5445?focusedWorklogId=149053&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-149053 ] ASF GitHub Bot logged work on BEAM-5445: Author: ASF GitHub Bot Created on: 28/Sep/18 07:31 Start Date: 28/Sep/18 07:31 Worklog Time Spent: 10m Work Description: chamikaramj commented on issue #6478: [BEAM-5445] [BEAM-4796] SpannerIO: Only batch on the current bundle. Adds streaming support URL: https://github.com/apache/beam/pull/6478#issuecomment-425348306 R: @nithinsujir This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 149053) Time Spent: 1h (was: 50m) > Update SpannerIO to support unbounded writes > > > Key: BEAM-5445 > URL: https://issues.apache.org/jira/browse/BEAM-5445 > Project: Beam > Issue Type: Improvement > Components: io-java-gcp >Reporter: Chamikara Jayalath >Priority: Major > Time Spent: 1h > Remaining Estimate: 0h > > Currently, due to a known issue, streaming pipelines that use SpannerIO.Write > do not actually write to Spanner. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-5445) Update SpannerIO to support unbounded writes
[ https://issues.apache.org/jira/browse/BEAM-5445?focusedWorklogId=147211&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-147211 ]

ASF GitHub Bot logged work on BEAM-5445:

Author: ASF GitHub Bot
Created on: 24/Sep/18 17:30
Start Date: 24/Sep/18 17:30
Worklog Time Spent: 10m
Work Description: nielm opened a new pull request #6478: [BEAM-5445] [BEAM-4796] SpannerIO: Only batch on the current bundle. Adds streaming support
URL: https://github.com/apache/beam/pull/6478

Rewrite the mechanism whereby SpannerIO batches the incoming mutations.

Previously, the entire input was scanned, assigned to partitions, and grouped with GroupByKey before writing. This required a bounded input and large amounts of disk space, and the entire input had to be read before any writes started.

With this change, the input is only sorted/batched over the current Bundle (or 1000x max batch size, whichever is smaller). This has the following benefits:

- Significantly decreases latency before the first row is written (down to a few seconds after the pipeline is started)
- Allows the SpannerIO writer to be used in a streaming pipeline
- Reduces the amount of worker disk space required for large sources
- Significantly improves the write speed for large sources

@chamikaramj

Follow this checklist to help us incorporate your contribution quickly and easily:

- [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.
- [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).

It will help us expedite review of your Pull Request if you tag someone (e.g. `@username`) to look at it.
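The pull request description above says mutations are now sorted and batched only over the current bundle, or over a buffer capped at a multiple of the maximum batch size. The following is a rough sketch of that approach under stated assumptions, not the code in the pull request. BundleBatcherSketch, Item, bufferFactor, and emitted() are hypothetical names, and batches are collected in a list where a real writer would send them to Spanner.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/** Hypothetical sketch of per-bundle sort-and-batch; not SpannerIO's implementation. */
final class BundleBatcherSketch {

  /** Stand-in for an encoded mutation: a sortable key plus its size in bytes. */
  static final class Item {
    final String key;
    final long sizeBytes;

    Item(String key, long sizeBytes) {
      this.key = key;
      this.sizeBytes = sizeBytes;
    }
  }

  private final long maxBatchBytes;
  private final long maxBufferedBytes;
  private final List<Item> buffer = new ArrayList<>();
  private final List<List<Item>> emitted = new ArrayList<>();
  private long bufferedBytes = 0;

  /** bufferFactor is assumed to play the role of the "1000x" multiplier in the description. */
  BundleBatcherSketch(long maxBatchBytes, long bufferFactor) {
    this.maxBatchBytes = maxBatchBytes;
    this.maxBufferedBytes = maxBatchBytes * bufferFactor;
  }

  /** Buffers one item and flushes early once the buffer cap is reached. */
  void add(Item item) {
    buffer.add(item);
    bufferedBytes += item.sizeBytes;
    if (bufferedBytes >= maxBufferedBytes) {
      flush();
    }
  }

  /** Called at the end of a bundle so no item waits for later input. */
  void finishBundle() {
    flush();
  }

  /** Sorts the buffered items by key, then cuts them into size-bounded batches. */
  private void flush() {
    buffer.sort(Comparator.comparing((Item i) -> i.key));
    List<Item> batch = new ArrayList<>();
    long batchBytes = 0;
    for (Item item : buffer) {
      if (!batch.isEmpty() && batchBytes + item.sizeBytes > maxBatchBytes) {
        emitted.add(batch);
        batch = new ArrayList<>();
        batchBytes = 0;
      }
      batch.add(item);
      batchBytes += item.sizeBytes;
    }
    if (!batch.isEmpty()) {
      emitted.add(batch);
    }
    buffer.clear();
    bufferedBytes = 0;
  }

  List<List<Item>> emitted() {
    return emitted;
  }

  public static void main(String[] args) {
    BundleBatcherSketch batcher = new BundleBatcherSketch(10, 1000);
    batcher.add(new Item("c", 4));
    batcher.add(new Item("a", 4));
    batcher.add(new Item("b", 4));
    batcher.finishBundle();
    System.out.println("batches emitted: " + batcher.emitted().size());
  }
}
```

Because nothing is held past the end of a bundle, a batcher of this shape never needs to see the whole input, which is what would allow it to run on an unbounded source.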
[jira] [Work logged] (BEAM-5445) Update SpannerIO to support unbounded writes
[ https://issues.apache.org/jira/browse/BEAM-5445?focusedWorklogId=146662&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-146662 ] ASF GitHub Bot logged work on BEAM-5445: Author: ASF GitHub Bot Created on: 22/Sep/18 02:42 Start Date: 22/Sep/18 02:42 Worklog Time Spent: 10m Work Description: chamikaramj commented on issue #6450: [BEAM-5445] Clarifies that SpannerIO currently does not support unbounded writes. URL: https://github.com/apache/beam/pull/6450#issuecomment-423711142 Thanks. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 146662) Time Spent: 0.5h (was: 20m) > Update SpannerIO to support unbounded writes > > > Key: BEAM-5445 > URL: https://issues.apache.org/jira/browse/BEAM-5445 > Project: Beam > Issue Type: Improvement > Components: io-java-gcp >Reporter: Chamikara Jayalath >Priority: Major > Time Spent: 0.5h > Remaining Estimate: 0h > > Currently, due to a known issue, streaming pipelines that use SpannerIO.Write > do not actually write to Spanner. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-5445) Update SpannerIO to support unbounded writes
[ https://issues.apache.org/jira/browse/BEAM-5445?focusedWorklogId=146663&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-146663 ]

ASF GitHub Bot logged work on BEAM-5445:

Author: ASF GitHub Bot
Created on: 22/Sep/18 02:42
Start Date: 22/Sep/18 02:42
Worklog Time Spent: 10m
Work Description: chamikaramj closed pull request #6450: [BEAM-5445] Clarifies that SpannerIO currently does not support unbounded writes.
URL: https://github.com/apache/beam/pull/6450

This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic):

diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java
index f3d25733fbd..af2a3b00d1f 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java
@@ -176,6 +176,11 @@
  * guaranteed that mutations in a group are submitted in the same transaction. Build {@link
  * SpannerIO.Write} transform, and call {@link Write#grouped()} method. It will return a
  * transformation that can be applied to a PCollection of MutationGroup.
+ *
+ * Streaming Support
+ *
+ * {@link SpannerIO.Write} currently does not support unbounded writes hence should not be used
+ * as a sink for streaming pipelines.
  */
 @Experimental(Experimental.Kind.SOURCE_SINK)
 public class SpannerIO {

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org

Issue Time Tracking
---
Worklog Id: (was: 146663)
Time Spent: 40m (was: 0.5h)

> Update SpannerIO to support unbounded writes
>
> Key: BEAM-5445
> URL: https://issues.apache.org/jira/browse/BEAM-5445
> Project: Beam
> Issue Type: Improvement
> Components: io-java-gcp
> Reporter: Chamikara Jayalath
> Priority: Major
> Time Spent: 40m
> Remaining Estimate: 0h
>
> Currently, due to a known issue, streaming pipelines that use SpannerIO.Write
> do not actually write to Spanner.

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-5445) Update SpannerIO to support unbounded writes
[ https://issues.apache.org/jira/browse/BEAM-5445?focusedWorklogId=146658&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-146658 ] ASF GitHub Bot logged work on BEAM-5445: Author: ASF GitHub Bot Created on: 22/Sep/18 02:16 Start Date: 22/Sep/18 02:16 Worklog Time Spent: 10m Work Description: chamikaramj commented on issue #6450: [BEAM-5445] Clarifies that SpannerIO currently does not support unbounded writes. URL: https://github.com/apache/beam/pull/6450#issuecomment-423709925 R: @udim This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 146658) Time Spent: 20m (was: 10m) > Update SpannerIO to support unbounded writes > > > Key: BEAM-5445 > URL: https://issues.apache.org/jira/browse/BEAM-5445 > Project: Beam > Issue Type: Improvement > Components: io-java-gcp >Reporter: Chamikara Jayalath >Priority: Major > Time Spent: 20m > Remaining Estimate: 0h > > Currently, due to a known issue, streaming pipelines that use SpannerIO.Write > do not actually write to Spanner. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-5445) Update SpannerIO to support unbounded writes
[ https://issues.apache.org/jira/browse/BEAM-5445?focusedWorklogId=146113&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-146113 ] ASF GitHub Bot logged work on BEAM-5445: Author: ASF GitHub Bot Created on: 20/Sep/18 19:56 Start Date: 20/Sep/18 19:56 Worklog Time Spent: 10m Work Description: chamikaramj opened a new pull request #6450: [BEAM-5445] Clarifies that SpannerIO currently does not support unbounded writes. URL: https://github.com/apache/beam/pull/6450 Clarifies that SpannerIO currently does not support unbounded writes. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 146113) Time Spent: 10m Remaining Estimate: 0h > Update SpannerIO to support unbounded writes > > > Key: BEAM-5445 > URL: https://issues.apache.org/jira/browse/BEAM-5445 > Project: Beam > Issue Type: Improvement > Components: io-java-gcp >Reporter: Chamikara Jayalath >Priority: Major > Time Spent: 10m > Remaining Estimate: 0h > > Currently, due to a known issue, streaming pipelines that use SpannerIO.Write > do not actually write to Spanner. -- This message was sent by Atlassian JIRA (v7.6.3#76005)