[jira] [Work logged] (BEAM-5445) Update SpannerIO to support unbounded writes

2018-10-04 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-5445?focusedWorklogId=151205&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-151205
 ]

ASF GitHub Bot logged work on BEAM-5445:


Author: ASF GitHub Bot
Created on: 04/Oct/18 14:48
Start Date: 04/Oct/18 14:48
Worklog Time Spent: 10m 
  Work Description: nithinsujir commented on issue #6478: [BEAM-5445] 
[BEAM-4796] SpannerIO: Only batch on the current bundle. Adds streaming support
URL: https://github.com/apache/beam/pull/6478#issuecomment-427047885
 
 
   LGTM. I suggest we wait for @biswanag's comments as well.
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 151205)
Time Spent: 3h  (was: 2h 50m)

> Update SpannerIO to support unbounded writes
> 
>
> Key: BEAM-5445
> URL: https://issues.apache.org/jira/browse/BEAM-5445
> Project: Beam
>  Issue Type: Improvement
>  Components: io-java-gcp
>Reporter: Chamikara Jayalath
>Priority: Major
>  Time Spent: 3h
>  Remaining Estimate: 0h
>
> Currently, due to a known issue, streaming pipelines that use SpannerIO.Write 
> do not actually write to Spanner.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-5445) Update SpannerIO to support unbounded writes

2018-10-02 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-5445?focusedWorklogId=150412&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-150412
 ]

ASF GitHub Bot logged work on BEAM-5445:


Author: ASF GitHub Bot
Created on: 02/Oct/18 15:15
Start Date: 02/Oct/18 15:15
Worklog Time Spent: 10m 
  Work Description: nielm commented on a change in pull request #6478: 
[BEAM-5445] [BEAM-4796] SpannerIO: Only batch on the current bundle. Adds 
streaming support
URL: https://github.com/apache/beam/pull/6478#discussion_r221993914
 
 

 ##
 File path: 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java
 ##
 @@ -1074,59 +1061,61 @@ public void setup() {
   batch = ImmutableList.builder();
   batchSizeBytes = 0;
   batchCells = 0;
-  spannerAccessor = spannerConfig.connectToSpanner();
-}
-
-@Teardown
-public void teardown() {
-  spannerAccessor.close();
 }
 
 @ProcessElement
 public void processElement(ProcessContext c) throws Exception {
   SpannerSchema spannerSchema = c.sideInput(schemaView);
-  MutationGroupEncoder mutationGroupEncoder = new MutationGroupEncoder(spannerSchema);
 
-  KV> element = c.element();
-  for (SerializedMutation kv : element.getValue()) {
-byte[] value = kv.getMutationGroupBytes();
-MutationGroup mg = mutationGroupEncoder.decode(value);
+  // Iterate through list, outputting whenever a batch is complete.
+  for (KV kv : c.element()) {
+MutationGroup mg = decode(kv.getValue());
+
 long groupSize = MutationSizeEstimator.sizeOf(mg);
 long groupCells = MutationCellCounter.countOf(spannerSchema, mg);
-if (batchCells + groupCells > maxNumMutations
-|| batchSizeBytes + groupSize > maxBatchSizeBytes) {
-  ImmutableList mutations = batch.build();
-  c.output(mutations);
-  batch = ImmutableList.builder();
-  batchSizeBytes = 0;
-  batchCells = 0;
+if (((batchCells + groupCells) > maxNumMutations)
+|| ((batchSizeBytes + groupSize) > maxBatchSizeBytes)) {
+  outputBatch(c);
 
 Review comment:
   Beam docs make no comment about whether DoFns should be thread-safe or not, but adding synchronized blocks does no harm.
   GatherBundleAndSortFn has been updated to be thread-safe.
   
   BatchFn has been rewritten to use only local variables, so it is now thread-safe.
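A minimal sketch of batching with local state only, under simplified assumptions: each hypothetical `Item` carries just an estimated byte size and a cell count, standing in for a MutationGroup measured by MutationSizeEstimator.sizeOf and MutationCellCounter.countOf. The flush condition follows the one in the diff (start a new batch when the cell total would exceed maxNumMutations or the byte total would exceed maxBatchSizeBytes). Because the batch, its size and its cell count are all local variables, concurrent calls share no mutable state. This is illustrative only, not the actual BatchFn.

```java
import java.util.ArrayList;
import java.util.List;

public class LocalBatching {

  /** Hypothetical stand-in for a MutationGroup: only its estimated size and cell count. */
  static final class Item {
    final long sizeBytes;
    final long cells;

    Item(long sizeBytes, long cells) {
      this.sizeBytes = sizeBytes;
      this.cells = cells;
    }
  }

  /**
   * Splits items into batches, starting a new batch whenever adding the next item would push
   * the current batch over maxBatchSizeBytes or maxNumMutations. All state is local.
   */
  static List<List<Item>> makeBatches(
      List<Item> items, long maxBatchSizeBytes, long maxNumMutations) {
    List<List<Item>> batches = new ArrayList<>();
    List<Item> batch = new ArrayList<>();
    long batchSizeBytes = 0;
    long batchCells = 0;
    for (Item item : items) {
      if (!batch.isEmpty()
          && ((batchCells + item.cells) > maxNumMutations
              || (batchSizeBytes + item.sizeBytes) > maxBatchSizeBytes)) {
        // The current batch is full: emit it and start a new one.
        batches.add(batch);
        batch = new ArrayList<>();
        batchSizeBytes = 0;
        batchCells = 0;
      }
      batch.add(item);
      batchSizeBytes += item.sizeBytes;
      batchCells += item.cells;
    }
    if (!batch.isEmpty()) {
      batches.add(batch); // Flush the final, possibly partial, batch.
    }
    return batches;
  }

  public static void main(String[] args) {
    List<Item> items = new ArrayList<>();
    for (int i = 0; i < 5; i++) {
      items.add(new Item(10, 2));
    }
    // A 25-byte limit allows at most two 10-byte items per batch.
    System.out.println(makeBatches(items, 25, 100).size()); // prints 3
  }
}
```

The `!batch.isEmpty()` guard is an addition in this sketch: without it, a single item larger than the limit would cause an empty batch to be emitted. With maxBatchSizeBytes set to 0, every item ends up in its own batch.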




Issue Time Tracking
---

Worklog Id: (was: 150412)
Time Spent: 2h 50m  (was: 2h 40m)



[jira] [Work logged] (BEAM-5445) Update SpannerIO to support unbounded writes

2018-10-02 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-5445?focusedWorklogId=150410&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-150410
 ]

ASF GitHub Bot logged work on BEAM-5445:


Author: ASF GitHub Bot
Created on: 02/Oct/18 15:03
Start Date: 02/Oct/18 15:03
Worklog Time Spent: 10m 
  Work Description: nielm commented on a change in pull request #6478: 
[BEAM-5445] [BEAM-4796] SpannerIO: Only batch on the current bundle. Adds 
streaming support
URL: https://github.com/apache/beam/pull/6478#discussion_r221988893
 
 

 ##
 File path: 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java
 ##
 @@ -880,192 +868,191 @@ public SpannerWriteResult expand(PCollection input) {
   .apply("Schema View", View.asSingleton());
 
   // Split the mutations into batchable and unbatchable mutations.
-  // Filter out mutation groups too big to be batched
+  // Filter out mutation groups too big to be batched.
   PCollectionTuple filteredMutations =
-  input.apply(
-  "Filter Unbatchable Mutations",
-  ParDo.of(
-  new BatchableMutationFilterFn(
-  schemaView,
-  UNBATCHABLE_MUTATIONS_TAG,
-  spec.getBatchSizeBytes(),
-  spec.getMaxNumMutations()))
-  .withSideInputs(schemaView)
-  .withOutputTags(
-  BATCHABLE_MUTATIONS_TAG, TupleTagList.of(UNBATCHABLE_MUTATIONS_TAG)));
-
-  // Serialize batchable mutations, we don't need to encode/decode them while reshuffling.
-  // The primary key is encoded via OrderedCode so we can calculate quantiles.
-  PCollection serialized =
-  filteredMutations
-  .get(BATCHABLE_MUTATIONS_TAG)
+  input
+  .apply("To Global Window", Window.into(new GlobalWindows()))
   .apply(
-  "Serialize mutations",
-  ParDo.of(new SerializeMutationsFn(schemaView)).withSideInputs(schemaView))
-  .setCoder(SerializedMutationCoder.of());
-
-  // Sample primary keys using ApproximateQuantiles.
-  PCollectionView>> keySample =
-  serialized
-  .apply("Extract keys", ParDo.of(new ExtractKeys()))
-  .apply("Sample keys", sampler)
-  .apply("Keys sample as view", View.asMap());
-
-  TupleTag mainTag = new TupleTag<>("mainOut");
-  TupleTag failedTag = new TupleTag<>("failedMutations");
-  // Assign partition based on the closest element in the sample and group mutations.
-  AssignPartitionFn assignPartitionFn = new AssignPartitionFn(keySample);
+  "Filter Unbatchable Mutations",
+  ParDo.of(
+  new BatchableMutationFilterFn(
+  schemaView,
+  UNBATCHABLE_MUTATIONS_TAG,
+  spec.getBatchSizeBytes(),
+  spec.getMaxNumMutations()))
+  .withSideInputs(schemaView)
+  .withOutputTags(
+  BATCHABLE_MUTATIONS_TAG, TupleTagList.of(UNBATCHABLE_MUTATIONS_TAG)));
+
+  // Build a set of Mutation groups from the current bundle,
+  // sort them by table/key then split into batches.
   PCollection> batchedMutations =
-  serialized
-  .apply("Partition input", ParDo.of(assignPartitionFn).withSideInputs(keySample))
-  .setCoder(KvCoder.of(StringUtf8Coder.of(), SerializedMutationCoder.of()))
-  .apply("Group by partition", GroupByKey.create())
+  filteredMutations
+  .get(BATCHABLE_MUTATIONS_TAG)
   .apply(
-  "Batch mutations together",
+  "Gather And Sort",
   ParDo.of(
-  new BatchFn(
+  new GatherBundleAndSortFn(
   spec.getBatchSizeBytes(),
   spec.getMaxNumMutations(),
-  spec.getSpannerConfig(),
+  spec.getGroupingFactor(),
   schemaView))
+  .withSideInputs(schemaView))
+  .apply(
+  "Create Batches",
+  ParDo.of(
+  new BatchFn(
+  spec.getBatchSizeBytes(), spec.getMaxNumMutations(), schemaView))
   .withSideInputs(schemaView));
 
   // Merge the batchable and unbatchable mutations and write to Spanner.
   PCollectionTuple result =
   PCollectionList.of(filteredMutations.get(UNBATCHABLE_MUTATIONS_TAG))
   .and(batchedMutations)
-  
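The "Gather And Sort" step above buffers the mutation groups of the current bundle, sorts them by table/key, and only then passes them on to be split into batches. A rough plain-Java model of that idea follows. It assumes each group has already been encoded as a (key, value) pair of byte arrays, and it assumes the grouping factor bounds the buffer at maxBatchSizeBytes * groupingFactor before an early sorted flush. `BundleSorter`, `add` and `finishBundle` are hypothetical names, and Beam's DoFn lifecycle is not modelled; the methods are synchronized so a shared instance stays consistent if called from several threads.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical model of gathering a bundle, sorting it by encoded key, and flushing it. */
public class BundleSorter {

  /** One encoded mutation group: a sortable key plus the serialized group. */
  static final class Entry {
    final byte[] key;
    final byte[] value;

    Entry(byte[] key, byte[] value) {
      this.key = key;
      this.value = value;
    }
  }

  private final long maxBufferedBytes;
  private final Consumer<List<Entry>> output;
  private final List<Entry> buffer = new ArrayList<>();
  private long bufferedBytes = 0;

  BundleSorter(long maxBatchSizeBytes, int groupingFactor, Consumer<List<Entry>> output) {
    // Assumption: the grouping factor scales the batch size to bound the sort buffer.
    this.maxBufferedBytes = maxBatchSizeBytes * groupingFactor;
    this.output = output;
  }

  /** Buffers one entry, flushing a sorted list early if the buffer grows too large. */
  synchronized void add(byte[] key, byte[] value) {
    buffer.add(new Entry(key, value));
    bufferedBytes += key.length + value.length;
    if (bufferedBytes > maxBufferedBytes) {
      flush();
    }
  }

  /** Called at the end of a bundle: sorts and emits whatever is still buffered. */
  synchronized void finishBundle() {
    flush();
  }

  private void flush() {
    if (buffer.isEmpty()) {
      return;
    }
    List<Entry> sorted = new ArrayList<>(buffer);
    sorted.sort((a, b) -> compareUnsigned(a.key, b.key));
    output.accept(sorted);
    buffer.clear();
    bufferedBytes = 0;
  }

  /** Lexicographic comparison treating bytes as unsigned; a shorter prefix sorts first. */
  static int compareUnsigned(byte[] a, byte[] b) {
    int n = Math.min(a.length, b.length);
    for (int i = 0; i < n; i++) {
      int cmp = Integer.compare(a[i] & 0xFF, b[i] & 0xFF);
      if (cmp != 0) {
        return cmp;
      }
    }
    return Integer.compare(a.length, b.length);
  }

  public static void main(String[] args) {
    List<List<Entry>> flushed = new ArrayList<>();
    BundleSorter sorter = new BundleSorter(4, 2, flushed::add);
    sorter.add(new byte[] {3}, new byte[] {0});
    sorter.add(new byte[] {1}, new byte[] {0});
    sorter.finishBundle();
    System.out.println(flushed.get(0).get(0).key[0]); // prints 1
  }
}
```

Sorting only what is in the current bundle avoids any cross-bundle shuffle, which is what lets this approach work on unbounded input; the trade-off is that sort quality (and therefore batch locality) depends on bundle size.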

[jira] [Work logged] (BEAM-5445) Update SpannerIO to support unbounded writes

2018-10-01 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-5445?focusedWorklogId=150241&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-150241
 ]

ASF GitHub Bot logged work on BEAM-5445:


Author: ASF GitHub Bot
Created on: 01/Oct/18 23:01
Start Date: 01/Oct/18 23:01
Worklog Time Spent: 10m 
  Work Description: nithinsujir commented on a change in pull request 
#6478: [BEAM-5445] [BEAM-4796] SpannerIO: Only batch on the current bundle. 
Adds streaming support
URL: https://github.com/apache/beam/pull/6478#discussion_r221777036
 
 

 ##
 File path: 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java
 ##
 @@ -880,192 +868,191 @@ public SpannerWriteResult expand(PCollection input) {
   .apply("Schema View", View.asSingleton());
 
   // Split the mutations into batchable and unbatchable mutations.
-  // Filter out mutation groups too big to be batched
+  // Filter out mutation groups too big to be batched.
   PCollectionTuple filteredMutations =
-  input.apply(
-  "Filter Unbatchable Mutations",
-  ParDo.of(
-  new BatchableMutationFilterFn(
-  schemaView,
-  UNBATCHABLE_MUTATIONS_TAG,
-  spec.getBatchSizeBytes(),
-  spec.getMaxNumMutations()))
-  .withSideInputs(schemaView)
-  .withOutputTags(
-  BATCHABLE_MUTATIONS_TAG, TupleTagList.of(UNBATCHABLE_MUTATIONS_TAG)));
-
-  // Serialize batchable mutations, we don't need to encode/decode them while reshuffling.
-  // The primary key is encoded via OrderedCode so we can calculate quantiles.
-  PCollection serialized =
-  filteredMutations
-  .get(BATCHABLE_MUTATIONS_TAG)
+  input
+  .apply("To Global Window", Window.into(new GlobalWindows()))
   .apply(
-  "Serialize mutations",
-  ParDo.of(new SerializeMutationsFn(schemaView)).withSideInputs(schemaView))
-  .setCoder(SerializedMutationCoder.of());
-
-  // Sample primary keys using ApproximateQuantiles.
-  PCollectionView>> keySample =
-  serialized
-  .apply("Extract keys", ParDo.of(new ExtractKeys()))
-  .apply("Sample keys", sampler)
-  .apply("Keys sample as view", View.asMap());
-
-  TupleTag mainTag = new TupleTag<>("mainOut");
-  TupleTag failedTag = new TupleTag<>("failedMutations");
-  // Assign partition based on the closest element in the sample and group mutations.
-  AssignPartitionFn assignPartitionFn = new AssignPartitionFn(keySample);
+  "Filter Unbatchable Mutations",
+  ParDo.of(
+  new BatchableMutationFilterFn(
+  schemaView,
+  UNBATCHABLE_MUTATIONS_TAG,
+  spec.getBatchSizeBytes(),
+  spec.getMaxNumMutations()))
+  .withSideInputs(schemaView)
+  .withOutputTags(
+  BATCHABLE_MUTATIONS_TAG, TupleTagList.of(UNBATCHABLE_MUTATIONS_TAG)));
+
+  // Build a set of Mutation groups from the current bundle,
+  // sort them by table/key then split into batches.
   PCollection> batchedMutations =
-  serialized
-  .apply("Partition input", ParDo.of(assignPartitionFn).withSideInputs(keySample))
-  .setCoder(KvCoder.of(StringUtf8Coder.of(), SerializedMutationCoder.of()))
-  .apply("Group by partition", GroupByKey.create())
+  filteredMutations
+  .get(BATCHABLE_MUTATIONS_TAG)
   .apply(
-  "Batch mutations together",
+  "Gather And Sort",
   ParDo.of(
-  new BatchFn(
+  new GatherBundleAndSortFn(
   spec.getBatchSizeBytes(),
   spec.getMaxNumMutations(),
-  spec.getSpannerConfig(),
+  spec.getGroupingFactor(),
   schemaView))
+  .withSideInputs(schemaView))
+  .apply(
+  "Create Batches",
+  ParDo.of(
+  new BatchFn(
+  spec.getBatchSizeBytes(), spec.getMaxNumMutations(), schemaView))
   .withSideInputs(schemaView));
 
   // Merge the batchable and unbatchable mutations and write to Spanner.
   PCollectionTuple result =
   PCollectionList.of(filteredMutations.get(UNBATCHABLE_MUTATIONS_TAG))
   .and(batchedMutations)
-

[jira] [Work logged] (BEAM-5445) Update SpannerIO to support unbounded writes

2018-10-01 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-5445?focusedWorklogId=150240&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-150240
 ]

ASF GitHub Bot logged work on BEAM-5445:


Author: ASF GitHub Bot
Created on: 01/Oct/18 23:01
Start Date: 01/Oct/18 23:01
Worklog Time Spent: 10m 
  Work Description: nithinsujir commented on a change in pull request 
#6478: [BEAM-5445] [BEAM-4796] SpannerIO: Only batch on the current bundle. 
Adds streaming support
URL: https://github.com/apache/beam/pull/6478#discussion_r221782468
 
 

 ##
 File path: 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java
 ##
 @@ -1074,59 +1061,61 @@ public void setup() {
   batch = ImmutableList.builder();
   batchSizeBytes = 0;
   batchCells = 0;
-  spannerAccessor = spannerConfig.connectToSpanner();
-}
-
-@Teardown
-public void teardown() {
-  spannerAccessor.close();
 }
 
 @ProcessElement
 public void processElement(ProcessContext c) throws Exception {
   SpannerSchema spannerSchema = c.sideInput(schemaView);
-  MutationGroupEncoder mutationGroupEncoder = new MutationGroupEncoder(spannerSchema);
 
-  KV> element = c.element();
-  for (SerializedMutation kv : element.getValue()) {
-byte[] value = kv.getMutationGroupBytes();
-MutationGroup mg = mutationGroupEncoder.decode(value);
+  // Iterate through list, outputting whenever a batch is complete.
+  for (KV kv : c.element()) {
+MutationGroup mg = decode(kv.getValue());
+
 long groupSize = MutationSizeEstimator.sizeOf(mg);
 long groupCells = MutationCellCounter.countOf(spannerSchema, mg);
-if (batchCells + groupCells > maxNumMutations
-|| batchSizeBytes + groupSize > maxBatchSizeBytes) {
-  ImmutableList mutations = batch.build();
-  c.output(mutations);
-  batch = ImmutableList.builder();
-  batchSizeBytes = 0;
-  batchCells = 0;
+if (((batchCells + groupCells) > maxNumMutations)
+|| ((batchSizeBytes + groupSize) > maxBatchSizeBytes)) {
+  outputBatch(c);
 
 Review comment:
   Any reason why "c" needs to be passed around when everything else is coming via a member variable?
   
   And am I right that this is not thread-safe? I assume that's well understood and accepted.
   




Issue Time Tracking
---

Worklog Id: (was: 150240)
Time Spent: 2.5h  (was: 2h 20m)



[jira] [Work logged] (BEAM-5445) Update SpannerIO to support unbounded writes

2018-10-01 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-5445?focusedWorklogId=150061&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-150061
 ]

ASF GitHub Bot logged work on BEAM-5445:


Author: ASF GitHub Bot
Created on: 01/Oct/18 15:21
Start Date: 01/Oct/18 15:21
Worklog Time Spent: 10m 
  Work Description: nielm commented on issue #6478: [BEAM-5445] [BEAM-4796] 
SpannerIO: Only batch on the current bundle. Adds streaming support
URL: https://github.com/apache/beam/pull/6478#issuecomment-425947867
 
 
   The first 2 commits are #6422 and #6409, which this commit is dependent on.
   
   It is not really possible to split up the last commit, as I am rewriting the main part of the SpannerIO Writer, the part where the MutationGroups are sorted and batched together.




Issue Time Tracking
---

Worklog Id: (was: 150061)
Time Spent: 2h 20m  (was: 2h 10m)



[jira] [Work logged] (BEAM-5445) Update SpannerIO to support unbounded writes

2018-10-01 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-5445?focusedWorklogId=150058&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-150058
 ]

ASF GitHub Bot logged work on BEAM-5445:


Author: ASF GitHub Bot
Created on: 01/Oct/18 15:15
Start Date: 01/Oct/18 15:15
Worklog Time Spent: 10m 
  Work Description: nielm commented on a change in pull request #6478: 
[BEAM-5445] [BEAM-4796] SpannerIO: Only batch on the current bundle. Adds 
streaming support
URL: https://github.com/apache/beam/pull/6478#discussion_r221648135
 
 

 ##
 File path: 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SerializedMutation.java
 ##
 @@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.gcp.spanner;
-
-import com.google.auto.value.AutoValue;
-
-@AutoValue
-abstract class SerializedMutation {
 
 Review comment:
   Added
   
   SerializedMutation is no longer required, as serialized MutationGroups
   are passed between transforms as byte[].
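For illustration, here is a round trip of a `Serializable` object through a `byte[]` using plain Java serialization, the mechanism Beam's SerializableCoder relies on. `FakeGroup` is a hypothetical stand-in for MutationGroup, and `encode`/`decode` are illustrative helpers, not SpannerIO code.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;

public class GroupBytes {

  /** Hypothetical stand-in for a MutationGroup: a table name and some row keys. */
  static final class FakeGroup implements Serializable {
    private static final long serialVersionUID = 1L;
    final String table;
    final ArrayList<Long> keys; // concrete Serializable list type

    FakeGroup(String table, List<Long> keys) {
      this.table = table;
      this.keys = new ArrayList<>(keys);
    }

    @Override
    public boolean equals(Object o) {
      if (!(o instanceof FakeGroup)) {
        return false;
      }
      FakeGroup other = (FakeGroup) o;
      return table.equals(other.table) && keys.equals(other.keys);
    }

    @Override
    public int hashCode() {
      return Objects.hash(table, keys);
    }
  }

  /** Serializes a value to a byte[] with Java serialization. */
  static byte[] encode(Serializable value) throws IOException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(value);
    }
    return bytes.toByteArray();
  }

  /** Deserializes a byte[] produced by encode(), checking the expected type. */
  static <T> T decode(byte[] data, Class<T> type) throws IOException, ClassNotFoundException {
    try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
      return type.cast(in.readObject());
    }
  }

  public static void main(String[] args) throws Exception {
    FakeGroup group = new FakeGroup("Users", Arrays.asList(1L, 2L));
    byte[] data = encode(group);
    FakeGroup copy = decode(data, FakeGroup.class);
    System.out.println(copy.equals(group) && copy != group); // prints true
  }
}
```

Passing opaque bytes between steps means intermediate transforms only need a byte-array coder; the full object is rebuilt only where it is actually inspected.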
   




Issue Time Tracking
---

Worklog Id: (was: 150058)
Time Spent: 2h  (was: 1h 50m)



[jira] [Work logged] (BEAM-5445) Update SpannerIO to support unbounded writes

2018-10-01 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-5445?focusedWorklogId=150059&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-150059
 ]

ASF GitHub Bot logged work on BEAM-5445:


Author: ASF GitHub Bot
Created on: 01/Oct/18 15:15
Start Date: 01/Oct/18 15:15
Worklog Time Spent: 10m 
  Work Description: nielm commented on a change in pull request #6478: 
[BEAM-5445] [BEAM-4796] SpannerIO: Only batch on the current bundle. Adds 
streaming support
URL: https://github.com/apache/beam/pull/6478#discussion_r221648192
 
 

 ##
 File path: 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/MutationKeyEncoder.java
 ##
 @@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.spanner;
+
+import static org.apache.beam.sdk.io.gcp.spanner.MutationUtils.isPointDelete;
+
+import com.google.cloud.ByteArray;
+import com.google.cloud.Date;
+import com.google.cloud.Timestamp;
+import com.google.cloud.spanner.Key;
+import com.google.cloud.spanner.Mutation;
+import com.google.cloud.spanner.Mutation.Op;
+import com.google.cloud.spanner.Type;
+import com.google.cloud.spanner.Value;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.sdk.io.gcp.spanner.SpannerSchema.KeyPart;
+import org.joda.time.DateTime;
+import org.joda.time.Days;
+import org.joda.time.MutableDateTime;
+
+/**
+ * Given the Schema, Encodes the table name and Key into a lexicographically sortable {@code
+ * byte[]}.
+ */
+class MutationKeyEncoder {
 
 Review comment:
   Added: 
   
   The MutationGroupEncoder class has been renamed to MutationKeyEncoder, as it no
   longer encodes entire MutationGroups, only the Key. (SerializableCoder 
   is now used to encode entire MutationGroups.)
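To show why a lexicographically sortable key encoding is useful, the sketch below encodes a table name and a single INT64 key column so that comparing the bytes as unsigned values gives the same order as comparing (table, key). The format (UTF-8 table name, a 0x00 separator, then the key big-endian with its sign bit flipped) is an assumption made for this example and is not necessarily the encoding MutationKeyEncoder uses; it also assumes table names contain no NUL characters. `SortableKey`, `encode` and `compare` are hypothetical names.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class SortableKey {

  /**
   * Hypothetical encoding: UTF-8 table name, a 0x00 separator, then the INT64 key in
   * big-endian order with its sign bit flipped so negative keys sort before positive ones.
   */
  static byte[] encode(String table, long key) {
    byte[] name = table.getBytes(StandardCharsets.UTF_8);
    ByteBuffer buf = ByteBuffer.allocate(name.length + 1 + Long.BYTES); // big-endian by default
    buf.put(name);
    buf.put((byte) 0); // separator: sorts a shorter table name before any longer one it prefixes
    buf.putLong(key ^ Long.MIN_VALUE); // flip the sign bit
    return buf.array();
  }

  /** Compares byte arrays as unsigned bytes; a shorter prefix sorts first. */
  static int compare(byte[] a, byte[] b) {
    int n = Math.min(a.length, b.length);
    for (int i = 0; i < n; i++) {
      int cmp = Integer.compare(a[i] & 0xFF, b[i] & 0xFF);
      if (cmp != 0) {
        return cmp;
      }
    }
    return Integer.compare(a.length, b.length);
  }

  public static void main(String[] args) {
    // -5 sorts before 3 once the sign bit is flipped.
    System.out.println(compare(encode("Users", -5L), encode("Users", 3L)) < 0); // prints true
  }
}
```

Once keys are in this form, sorting a bundle needs no schema-aware comparator: a plain unsigned byte comparison groups mutations by table and orders them by key.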
   




Issue Time Tracking
---

Worklog Id: (was: 150059)
Time Spent: 2h 10m  (was: 2h)



[jira] [Work logged] (BEAM-5445) Update SpannerIO to support unbounded writes

2018-10-01 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-5445?focusedWorklogId=150057&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-150057
 ]

ASF GitHub Bot logged work on BEAM-5445:


Author: ASF GitHub Bot
Created on: 01/Oct/18 15:15
Start Date: 01/Oct/18 15:15
Worklog Time Spent: 10m 
  Work Description: nielm commented on a change in pull request #6478: 
[BEAM-5445] [BEAM-4796] SpannerIO: Only batch on the current bundle. Adds 
streaming support
URL: https://github.com/apache/beam/pull/6478#discussion_r221648082
 
 

 ##
 File path: 
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOWriteTest.java
 ##
 @@ -371,97 +276,46 @@ public void noBatching() throws Exception {
 
   @Test
   @Category(NeedsRunner.class)
-  public void batchingPlusSampling() throws Exception {
-PCollection mutations =
-pipeline.apply(
-Create.of(
-g(m(1L)), g(m(2L)), g(m(3L)), g(m(4L)), g(m(5L)), g(m(6L)), g(m(7L)), g(m(8L)),
-g(m(9L)), g(m(10L))));
-
-mutations.apply(
-SpannerIO.write()
-.withProjectId("test-project")
-.withInstanceId("test-instance")
-.withDatabaseId("test-database")
-.withServiceFactory(serviceFactory)
-.withBatchSizeBytes(10)
-.withSampler(fakeSampler(m(2L), m(5L), m(10L)))
-.grouped());
-pipeline.run();
+  public void reportFailures() throws Exception {
 
-verifyBatches(
-batch(m(1L), m(2L)), batch(m(3L), m(4L), m(5L)), batch(m(6L), m(7L), m(8L), m(9L), m(10L)));
-  }
+MutationGroup[] mutationGroups = new MutationGroup[10];
+for (int i = 0; i < mutationGroups.length; i++) {
+  mutationGroups[i] = g(m((long) i));
+}
 
-  @Test
-  @Category(NeedsRunner.class)
-  public void reportFailures() throws Exception {
-PCollection mutations =
-pipeline.apply(
-Create.of(
-g(m(1L)), g(m(2L)), g(m(3L)), g(m(4L)), g(m(5L)), g(m(6L)), g(m(7L)), g(m(8L)),
-g(m(9L)), g(m(10L))));
+List mutationGroupList = Arrays.asList(mutationGroups);
 
 when(serviceFactory.mockDatabaseClient().writeAtLeastOnce(any()))
 .thenAnswer(
 invocationOnMock -> {
+  Preconditions.checkNotNull(invocationOnMock.getArguments()[0]);
  throw SpannerExceptionFactory.newSpannerException(ErrorCode.ALREADY_EXISTS, "oops");
 });
 
 SpannerWriteResult result =
-mutations.apply(
-SpannerIO.write()
-.withProjectId("test-project")
-.withInstanceId("test-instance")
-.withDatabaseId("test-database")
-.withServiceFactory(serviceFactory)
-.withBatchSizeBytes(10)
-.withFailureMode(SpannerIO.FailureMode.REPORT_FAILURES)
-.withSampler(fakeSampler(m(2L), m(5L), m(10L)))
-.grouped());
+pipeline
+.apply(Create.of(mutationGroupList))
+.apply(
+SpannerIO.write()
+.withProjectId("test-project")
+.withInstanceId("test-instance")
+.withDatabaseId("test-database")
+.withServiceFactory(serviceFactory)
+.withBatchSizeBytes(0)
 
 Review comment:
   Yes. It is no longer possible to test batching using the TestPipeline (I commented in the header of this class to that effect).
   So disabling batching makes it clear that no batching occurs in this test.




Issue Time Tracking
---

Worklog Id: (was: 150057)
Time Spent: 1h 50m  (was: 1h 40m)



[jira] [Work logged] (BEAM-5445) Update SpannerIO to support unbounded writes

2018-09-28 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-5445?focusedWorklogId=149222&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-149222
 ]

ASF GitHub Bot logged work on BEAM-5445:


Author: ASF GitHub Bot
Created on: 28/Sep/18 17:16
Start Date: 28/Sep/18 17:16
Worklog Time Spent: 10m 
  Work Description: nithinsujir commented on a change in pull request 
#6478: [BEAM-5445] [BEAM-4796] SpannerIO: Only batch on the current bundle. 
Adds streaming support
URL: https://github.com/apache/beam/pull/6478#discussion_r221313416
 
 

 ##
 File path: 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SerializedMutation.java
 ##
 @@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.gcp.spanner;
-
-import com.google.auto.value.AutoValue;
-
-@AutoValue
-abstract class SerializedMutation {
 
 Review comment:
   Should probably add an explanation of why the SerializedMutation classes are not needed anymore.
   




Issue Time Tracking
---

Worklog Id: (was: 149222)
Time Spent: 1.5h  (was: 1h 20m)



[jira] [Work logged] (BEAM-5445) Update SpannerIO to support unbounded writes

2018-09-28 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-5445?focusedWorklogId=149223&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-149223
 ]

ASF GitHub Bot logged work on BEAM-5445:


Author: ASF GitHub Bot
Created on: 28/Sep/18 17:16
Start Date: 28/Sep/18 17:16
Worklog Time Spent: 10m 
  Work Description: nithinsujir commented on a change in pull request 
#6478: [BEAM-5445] [BEAM-4796] SpannerIO: Only batch on the current bundle. 
Adds streaming support
URL: https://github.com/apache/beam/pull/6478#discussion_r221315969
 
 

 ##
 File path: 
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOWriteTest.java
 ##
 @@ -371,97 +276,46 @@ public void noBatching() throws Exception {
 
   @Test
   @Category(NeedsRunner.class)
-  public void batchingPlusSampling() throws Exception {
-PCollection mutations =
-pipeline.apply(
-Create.of(
-g(m(1L)), g(m(2L)), g(m(3L)), g(m(4L)), g(m(5L)), g(m(6L)), g(m(7L)), g(m(8L)),
-g(m(9L)), g(m(10L))));
-
-mutations.apply(
-SpannerIO.write()
-.withProjectId("test-project")
-.withInstanceId("test-instance")
-.withDatabaseId("test-database")
-.withServiceFactory(serviceFactory)
-.withBatchSizeBytes(10)
-.withSampler(fakeSampler(m(2L), m(5L), m(10L)))
-.grouped());
-pipeline.run();
+  public void reportFailures() throws Exception {
 
-verifyBatches(
-batch(m(1L), m(2L)), batch(m(3L), m(4L), m(5L)), batch(m(6L), m(7L), m(8L), m(9L), m(10L)));
-  }
+MutationGroup[] mutationGroups = new MutationGroup[10];
+for (int i = 0; i < mutationGroups.length; i++) {
+  mutationGroups[i] = g(m((long) i));
+}
 
-  @Test
-  @Category(NeedsRunner.class)
-  public void reportFailures() throws Exception {
-PCollection mutations =
-pipeline.apply(
-Create.of(
-g(m(1L)), g(m(2L)), g(m(3L)), g(m(4L)), g(m(5L)), g(m(6L)), 
g(m(7L)), g(m(8L)),
-g(m(9L)), g(m(10L;
+List mutationGroupList = Arrays.asList(mutationGroups);
 
 when(serviceFactory.mockDatabaseClient().writeAtLeastOnce(any()))
 .thenAnswer(
 invocationOnMock -> {
+  Preconditions.checkNotNull(invocationOnMock.getArguments()[0]);
   throw 
SpannerExceptionFactory.newSpannerException(ErrorCode.ALREADY_EXISTS, "oops");
 });
 
 SpannerWriteResult result =
-mutations.apply(
-SpannerIO.write()
-.withProjectId("test-project")
-.withInstanceId("test-instance")
-.withDatabaseId("test-database")
-.withServiceFactory(serviceFactory)
-.withBatchSizeBytes(10)
-.withFailureMode(SpannerIO.FailureMode.REPORT_FAILURES)
-.withSampler(fakeSampler(m(2L), m(5L), m(10L)))
-.grouped());
+pipeline
+.apply(Create.of(mutationGroupList))
+.apply(
+SpannerIO.write()
+.withProjectId("test-project")
+.withInstanceId("test-instance")
+.withDatabaseId("test-database")
+.withServiceFactory(serviceFactory)
+.withBatchSizeBytes(0)
 
 Review comment:
   BatchSizeBytes went to 0. Is that intentional?
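Whether `withBatchSizeBytes(0)` matters depends on how a byte limit is turned into batches. The sketch below is a hypothetical, dependency-free illustration, not SpannerIO's code: `ByteSizeBatcher` is an invented name, the `long` values stand in for serialized mutation-group sizes, and it assumes a batch is closed as soon as the next group would push it past the limit while an empty batch always accepts one group. Under those assumptions a limit of 0 puts every group in its own batch, which effectively disables batching.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical sketch: splits items into consecutive batches bounded by a byte limit. */
public class ByteSizeBatcher {

  /**
   * Closes the current batch when adding the next item would exceed maxBytes. An empty batch
   * always accepts one item, so an oversized item is still emitted, on its own.
   */
  public static List<List<Long>> batch(List<Long> itemSizes, long maxBytes) {
    List<List<Long>> batches = new ArrayList<>();
    List<Long> current = new ArrayList<>();
    long currentBytes = 0;
    for (long size : itemSizes) {
      if (!current.isEmpty() && currentBytes + size > maxBytes) {
        batches.add(current);
        current = new ArrayList<>();
        currentBytes = 0;
      }
      current.add(size);
      currentBytes += size;
    }
    if (!current.isEmpty()) {
      batches.add(current);
    }
    return batches;
  }

  public static void main(String[] args) {
    List<Long> sizes = Arrays.asList(4L, 4L, 4L, 4L);
    System.out.println(batch(sizes, 10)); // [[4, 4], [4, 4]]
    System.out.println(batch(sizes, 0)); // [[4], [4], [4], [4]]
  }
}
```

With a limit of 0 the test would therefore exercise one write call per mutation group, which may or may not be what the test intends.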
   




Issue Time Tracking
---

Worklog Id: (was: 149223)
Time Spent: 1.5h  (was: 1h 20m)

> Update SpannerIO to support unbounded writes
> 
>
> Key: BEAM-5445
> URL: https://issues.apache.org/jira/browse/BEAM-5445
> Project: Beam
>  Issue Type: Improvement
>  Components: io-java-gcp
>Reporter: Chamikara Jayalath
>Priority: Major
>  Time Spent: 1.5h
>  Remaining Estimate: 0h
>
> Currently, due to a known issue, streaming pipelines that use SpannerIO.Write 
> do not actually write to Spanner.





[jira] [Work logged] (BEAM-5445) Update SpannerIO to support unbounded writes

2018-09-28 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-5445?focusedWorklogId=149224&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-149224
 ]

ASF GitHub Bot logged work on BEAM-5445:


Author: ASF GitHub Bot
Created on: 28/Sep/18 17:16
Start Date: 28/Sep/18 17:16
Worklog Time Spent: 10m 
  Work Description: nithinsujir commented on a change in pull request 
#6478: [BEAM-5445] [BEAM-4796] SpannerIO: Only batch on the current bundle. 
Adds streaming support
URL: https://github.com/apache/beam/pull/6478#discussion_r221312319
 
 

 ##
 File path: 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/MutationKeyEncoder.java
 ##
 @@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.spanner;
+
+import static org.apache.beam.sdk.io.gcp.spanner.MutationUtils.isPointDelete;
+
+import com.google.cloud.ByteArray;
+import com.google.cloud.Date;
+import com.google.cloud.Timestamp;
+import com.google.cloud.spanner.Key;
+import com.google.cloud.spanner.Mutation;
+import com.google.cloud.spanner.Mutation.Op;
+import com.google.cloud.spanner.Type;
+import com.google.cloud.spanner.Value;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.sdk.io.gcp.spanner.SpannerSchema.KeyPart;
+import org.joda.time.DateTime;
+import org.joda.time.Days;
+import org.joda.time.MutableDateTime;
+
+/**
+ * Given the Schema, Encodes the table name and Key into a lexicographically sortable {@code
+ * byte[]}.
+ */
+class MutationKeyEncoder {
 
 Review comment:
   In the commit description, could you provide an explanation about why we are 
changing from MutationGroupEncoder to MutationKeyEncoder?
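The new class is documented as encoding the table name and key into a lexicographically sortable `byte[]`. As a hedged illustration of that general technique only (the actual `MutationKeyEncoder` encoding is not shown in this hunk), the sketch below handles a string table name followed by a single signed 64-bit key: strings are escaped and terminated so that a prefix sorts before its extensions, and longs are written big-endian with the sign bit flipped so negatives sort first. `OrderedKeySketch` and its methods are hypothetical.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

/** Hypothetical sketch of an order-preserving (table, key) encoding; not MutationKeyEncoder. */
public class OrderedKeySketch {

  /** Escapes 0x00 as 0x00 0xFF and terminates with 0x00 0x01 so prefixes sort first. */
  static void writeString(ByteArrayOutputStream out, String s) {
    for (byte b : s.getBytes(StandardCharsets.UTF_8)) {
      out.write(b);
      if (b == 0) {
        out.write(0xFF); // escaped literal 0x00
      }
    }
    // Terminator sorts before any non-zero byte and before an escaped 0x00 0xFF.
    out.write(0x00);
    out.write(0x01);
  }

  /** Writes a signed long big-endian with the sign bit flipped, so unsigned order = signed order. */
  static void writeLong(ByteArrayOutputStream out, long v) {
    long flipped = v ^ Long.MIN_VALUE;
    for (int shift = 56; shift >= 0; shift -= 8) {
      out.write((int) (flipped >>> shift) & 0xFF);
    }
  }

  /** Encodes the table name first so that all rows of one table are contiguous. */
  static byte[] encode(String table, long key) {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    writeString(out, table);
    writeLong(out, key);
    return out.toByteArray();
  }

  /** Compares two byte arrays lexicographically, treating bytes as unsigned. */
  static int compare(byte[] a, byte[] b) {
    int n = Math.min(a.length, b.length);
    for (int i = 0; i < n; i++) {
      int c = Integer.compare(a[i] & 0xFF, b[i] & 0xFF);
      if (c != 0) {
        return c;
      }
    }
    return Integer.compare(a.length, b.length);
  }

  public static void main(String[] args) {
    System.out.println(compare(encode("Users", -5L), encode("Users", 3L)) < 0); // true
    System.out.println(compare(encode("Users", 100L), encode("UsersX", 1L)) < 0); // true
  }
}
```

Sorting mutations by such a key lets a batcher group rows that are adjacent in the table, which is one plausible motivation for encoding keys rather than whole mutation groups; the PR itself would need to confirm the actual reason.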




Issue Time Tracking
---

Worklog Id: (was: 149224)
Time Spent: 1h 40m  (was: 1.5h)

> Update SpannerIO to support unbounded writes
> 
>
> Key: BEAM-5445
> URL: https://issues.apache.org/jira/browse/BEAM-5445
> Project: Beam
>  Issue Type: Improvement
>  Components: io-java-gcp
>Reporter: Chamikara Jayalath
>Priority: Major
>  Time Spent: 1h 40m
>  Remaining Estimate: 0h
>
> Currently, due to a known issue, streaming pipelines that use SpannerIO.Write 
> do not actually write to Spanner.





[jira] [Work logged] (BEAM-5445) Update SpannerIO to support unbounded writes

2018-09-28 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-5445?focusedWorklogId=149144&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-149144
 ]

ASF GitHub Bot logged work on BEAM-5445:


Author: ASF GitHub Bot
Created on: 28/Sep/18 12:26
Start Date: 28/Sep/18 12:26
Worklog Time Spent: 10m 
  Work Description: nithinsujir commented on issue #6478: [BEAM-5445] 
[BEAM-4796] SpannerIO: Only batch on the current bundle. Adds streaming support
URL: https://github.com/apache/beam/pull/6478#issuecomment-425419373
 
 
   R: @biswanag
   




Issue Time Tracking
---

Worklog Id: (was: 149144)
Time Spent: 1h 20m  (was: 1h 10m)

> Update SpannerIO to support unbounded writes
> 
>
> Key: BEAM-5445
> URL: https://issues.apache.org/jira/browse/BEAM-5445
> Project: Beam
>  Issue Type: Improvement
>  Components: io-java-gcp
>Reporter: Chamikara Jayalath
>Priority: Major
>  Time Spent: 1h 20m
>  Remaining Estimate: 0h
>
> Currently, due to a known issue, streaming pipelines that use SpannerIO.Write 
> do not actually write to Spanner.





[jira] [Work logged] (BEAM-5445) Update SpannerIO to support unbounded writes

2018-09-28 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-5445?focusedWorklogId=149055&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-149055
 ]

ASF GitHub Bot logged work on BEAM-5445:


Author: ASF GitHub Bot
Created on: 28/Sep/18 07:37
Start Date: 28/Sep/18 07:37
Worklog Time Spent: 10m 
  Work Description: chamikaramj commented on issue #6478: [BEAM-5445] 
[BEAM-4796] SpannerIO: Only batch on the current bundle. Adds streaming support
URL: https://github.com/apache/beam/pull/6478#issuecomment-425349811
 
 
   Thanks Niel. Added @nithinsujir for a first round of reviews for this and 
https://github.com/apache/beam/pull/6422. I can take a look next week.




Issue Time Tracking
---

Worklog Id: (was: 149055)
Time Spent: 1h 10m  (was: 1h)

> Update SpannerIO to support unbounded writes
> 
>
> Key: BEAM-5445
> URL: https://issues.apache.org/jira/browse/BEAM-5445
> Project: Beam
>  Issue Type: Improvement
>  Components: io-java-gcp
>Reporter: Chamikara Jayalath
>Priority: Major
>  Time Spent: 1h 10m
>  Remaining Estimate: 0h
>
> Currently, due to a known issue, streaming pipelines that use SpannerIO.Write 
> do not actually write to Spanner.





[jira] [Work logged] (BEAM-5445) Update SpannerIO to support unbounded writes

2018-09-28 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-5445?focusedWorklogId=149053&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-149053
 ]

ASF GitHub Bot logged work on BEAM-5445:


Author: ASF GitHub Bot
Created on: 28/Sep/18 07:31
Start Date: 28/Sep/18 07:31
Worklog Time Spent: 10m 
  Work Description: chamikaramj commented on issue #6478: [BEAM-5445] 
[BEAM-4796] SpannerIO: Only batch on the current bundle. Adds streaming support
URL: https://github.com/apache/beam/pull/6478#issuecomment-425348306
 
 
   R: @nithinsujir




Issue Time Tracking
---

Worklog Id: (was: 149053)
Time Spent: 1h  (was: 50m)

> Update SpannerIO to support unbounded writes
> 
>
> Key: BEAM-5445
> URL: https://issues.apache.org/jira/browse/BEAM-5445
> Project: Beam
>  Issue Type: Improvement
>  Components: io-java-gcp
>Reporter: Chamikara Jayalath
>Priority: Major
>  Time Spent: 1h
>  Remaining Estimate: 0h
>
> Currently, due to a known issue, streaming pipelines that use SpannerIO.Write 
> do not actually write to Spanner.





[jira] [Work logged] (BEAM-5445) Update SpannerIO to support unbounded writes

2018-09-24 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-5445?focusedWorklogId=147211&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-147211
 ]

ASF GitHub Bot logged work on BEAM-5445:


Author: ASF GitHub Bot
Created on: 24/Sep/18 17:30
Start Date: 24/Sep/18 17:30
Worklog Time Spent: 10m 
  Work Description: nielm opened a new pull request #6478: [BEAM-5445] 
[BEAM-4796] SpannerIO: Only batch on the current bundle. Adds streaming support
URL: https://github.com/apache/beam/pull/6478
 
 
   Rewrite the mechanism whereby SpannerIO batches the incoming mutations.
   
   Previously, the entire input was scanned, assigned to partitions, and grouped with a GroupByKey before writing. This required the input to be bounded, consumed large amounts of disk space, and meant that the entire input had to be read before any writes started.
   
   With this change, the input is only sorted and batched within the current bundle (or within 1000x the maximum batch size, whichever is smaller).
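   The bundle-local approach can be sketched as a per-bundle buffer. This is a hypothetical, dependency-free illustration rather than the code in PR #6478: `BundleBatcher`, `Item`, and the plain string keys are invented for the example (real mutations would presumably be keyed by an encoded table name and primary key). Elements are buffered until the bundle ends or the buffer reaches 1000x the maximum batch size, then sorted by key and cut into byte-bounded batches, so no step needs the whole input.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical sketch of bundle-local sort-and-batch; not SpannerIO's implementation. */
public class BundleBatcher {
  static final long BUFFER_FACTOR = 1000; // "1000x max batch size" from the PR description

  /** An element to write: a sortable key plus its estimated size in bytes. */
  public static final class Item {
    final String key;
    final long bytes;

    public Item(String key, long bytes) {
      this.key = key;
      this.bytes = bytes;
    }
  }

  private final long maxBatchBytes;
  private final Consumer<List<Item>> writer; // receives each finished batch
  private final List<Item> buffer = new ArrayList<>();
  private long bufferedBytes = 0;

  public BundleBatcher(long maxBatchBytes, Consumer<List<Item>> writer) {
    this.maxBatchBytes = maxBatchBytes;
    this.writer = writer;
  }

  /** Called once per element; flushes early if the buffer cap is reached. */
  public void add(Item item) {
    buffer.add(item);
    bufferedBytes += item.bytes;
    if (bufferedBytes >= BUFFER_FACTOR * maxBatchBytes) {
      flush();
    }
  }

  /** Called at the end of the bundle, so writes never wait for the rest of the input. */
  public void finishBundle() {
    flush();
  }

  /** Sorts the buffered elements by key and emits them as byte-bounded batches. */
  private void flush() {
    buffer.sort(Comparator.comparing((Item i) -> i.key));
    List<Item> batch = new ArrayList<>();
    long batchBytes = 0;
    for (Item item : buffer) {
      if (!batch.isEmpty() && batchBytes + item.bytes > maxBatchBytes) {
        writer.accept(batch);
        batch = new ArrayList<>();
        batchBytes = 0;
      }
      batch.add(item);
      batchBytes += item.bytes;
    }
    if (!batch.isEmpty()) {
      writer.accept(batch);
    }
    buffer.clear();
    bufferedBytes = 0;
  }

  public static void main(String[] args) {
    List<List<Item>> written = new ArrayList<>();
    BundleBatcher batcher = new BundleBatcher(10, written::add);
    for (String k : new String[] {"c", "a", "d", "b"}) {
      batcher.add(new Item(k, 4));
    }
    batcher.finishBundle();
    for (List<Item> batch : written) {
      StringBuilder sb = new StringBuilder();
      for (Item i : batch) {
        sb.append(i.key);
      }
      System.out.println(sb); // prints "ab", then "cd"
    }
  }
}
```

   Because the buffer is flushed at the end of every bundle, this shape also works on unbounded input, at the cost of sorting only within a bundle rather than across the whole dataset.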
   
   This has the following benefits: 
   
   - Significantly decreases latency before the first row is written (down to a 
few seconds after the pipeline is started)
   - Allows the SpannerIO writer to be used in a streaming pipeline
   - Reduces the amount of worker disk space required for large sources
   - Significantly improves the write speed for large sources
   
   
   @chamikaramj 
   
   
   
   Follow this checklist to help us incorporate your contribution quickly and 
easily:
   
- [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in 
ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA 
issue, if applicable. This will automatically link the pull request to the 
issue.
- [ ] If this contribution is large, please file an Apache [Individual 
Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   It will help us expedite review of your Pull Request if you tag someone 
(e.g. `@username`) to look at it.
   

[jira] [Work logged] (BEAM-5445) Update SpannerIO to support unbounded writes

2018-09-21 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-5445?focusedWorklogId=146662&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-146662
 ]

ASF GitHub Bot logged work on BEAM-5445:


Author: ASF GitHub Bot
Created on: 22/Sep/18 02:42
Start Date: 22/Sep/18 02:42
Worklog Time Spent: 10m 
  Work Description: chamikaramj commented on issue #6450: [BEAM-5445] 
Clarifies that SpannerIO currently does not support unbounded writes.
URL: https://github.com/apache/beam/pull/6450#issuecomment-423711142
 
 
   Thanks.




Issue Time Tracking
---

Worklog Id: (was: 146662)
Time Spent: 0.5h  (was: 20m)

> Update SpannerIO to support unbounded writes
> 
>
> Key: BEAM-5445
> URL: https://issues.apache.org/jira/browse/BEAM-5445
> Project: Beam
>  Issue Type: Improvement
>  Components: io-java-gcp
>Reporter: Chamikara Jayalath
>Priority: Major
>  Time Spent: 0.5h
>  Remaining Estimate: 0h
>
> Currently, due to a known issue, streaming pipelines that use SpannerIO.Write 
> do not actually write to Spanner.





[jira] [Work logged] (BEAM-5445) Update SpannerIO to support unbounded writes

2018-09-21 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-5445?focusedWorklogId=146663&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-146663
 ]

ASF GitHub Bot logged work on BEAM-5445:


Author: ASF GitHub Bot
Created on: 22/Sep/18 02:42
Start Date: 22/Sep/18 02:42
Worklog Time Spent: 10m 
  Work Description: chamikaramj closed pull request #6450: [BEAM-5445] 
Clarifies that SpannerIO currently does not support unbounded writes.
URL: https://github.com/apache/beam/pull/6450
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:


diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java
index f3d25733fbd..af2a3b00d1f 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java
@@ -176,6 +176,11 @@
  * guaranteed that mutations in a group are submitted in the same transaction. Build {@link
  * SpannerIO.Write} transform, and call {@link Write#grouped()} method. It will return a
  * transformation that can be applied to a PCollection of MutationGroup.
+ *
+ * Streaming Support
+ *
+ * {@link SpannerIO.Write} currently does not support unbounded writes hence should not be used
+ * as a sink for streaming pipelines.
  */
 @Experimental(Experimental.Kind.SOURCE_SINK)
 public class SpannerIO {


 




Issue Time Tracking
---

Worklog Id: (was: 146663)
Time Spent: 40m  (was: 0.5h)

> Update SpannerIO to support unbounded writes
> 
>
> Key: BEAM-5445
> URL: https://issues.apache.org/jira/browse/BEAM-5445
> Project: Beam
>  Issue Type: Improvement
>  Components: io-java-gcp
>Reporter: Chamikara Jayalath
>Priority: Major
>  Time Spent: 40m
>  Remaining Estimate: 0h
>
> Currently, due to a known issue, streaming pipelines that use SpannerIO.Write 
> do not actually write to Spanner.





[jira] [Work logged] (BEAM-5445) Update SpannerIO to support unbounded writes

2018-09-21 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-5445?focusedWorklogId=146658&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-146658
 ]

ASF GitHub Bot logged work on BEAM-5445:


Author: ASF GitHub Bot
Created on: 22/Sep/18 02:16
Start Date: 22/Sep/18 02:16
Worklog Time Spent: 10m 
  Work Description: chamikaramj commented on issue #6450: [BEAM-5445] 
Clarifies that SpannerIO currently does not support unbounded writes.
URL: https://github.com/apache/beam/pull/6450#issuecomment-423709925
 
 
   R: @udim 




Issue Time Tracking
---

Worklog Id: (was: 146658)
Time Spent: 20m  (was: 10m)

> Update SpannerIO to support unbounded writes
> 
>
> Key: BEAM-5445
> URL: https://issues.apache.org/jira/browse/BEAM-5445
> Project: Beam
>  Issue Type: Improvement
>  Components: io-java-gcp
>Reporter: Chamikara Jayalath
>Priority: Major
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> Currently, due to a known issue, streaming pipelines that use SpannerIO.Write 
> do not actually write to Spanner.





[jira] [Work logged] (BEAM-5445) Update SpannerIO to support unbounded writes

2018-09-20 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-5445?focusedWorklogId=146113&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-146113
 ]

ASF GitHub Bot logged work on BEAM-5445:


Author: ASF GitHub Bot
Created on: 20/Sep/18 19:56
Start Date: 20/Sep/18 19:56
Worklog Time Spent: 10m 
  Work Description: chamikaramj opened a new pull request #6450: 
[BEAM-5445] Clarifies that SpannerIO currently does not support unbounded 
writes.
URL: https://github.com/apache/beam/pull/6450
 
 
   Clarifies that SpannerIO currently does not support unbounded writes.
   
   
   
   
   
   




Issue Time Tracking
---

Worklog Id: (was: 146113)
Time Spent: 10m
Remaining Estimate: 0h

> Update SpannerIO to support unbounded writes
> 
>
> Key: BEAM-5445
> URL: https://issues.apache.org/jira/browse/BEAM-5445
> Project: Beam
>  Issue Type: Improvement
>  Components: io-java-gcp
>Reporter: Chamikara Jayalath
>Priority: Major
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Currently, due to a known issue, streaming pipelines that use SpannerIO.Write 
> do not actually write to Spanner.


