[ https://issues.apache.org/jira/browse/BEAM-10703?focusedWorklogId=514456&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-514456 ]

ASF GitHub Bot logged work on BEAM-10703:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 20/Nov/20 05:29
            Start Date: 20/Nov/20 05:29
    Worklog Time Spent: 10m 
      Work Description: nehsyc commented on a change in pull request #13208:
URL: https://github.com/apache/beam/pull/13208#discussion_r527410874



##########
File path: runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/GroupIntoBatchesOverride.java
##########
@@ -92,54 +98,141 @@ public void process(ProcessContext c) {
     }
   }
 
-  static class StreamingGroupIntoBatchesOverrideFactory<K, V>
+  static class BatchGroupIntoBatchesWithShardedKeyOverrideFactory<K, V>
       implements PTransformOverrideFactory<
-          PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>, GroupIntoBatches<K, V>> {
+          PCollection<KV<K, V>>,
+          PCollection<KV<ShardedKey<K>, Iterable<V>>>,
+          GroupIntoBatches<K, V>.WithShardedKey> {
+
+    @Override
+    public PTransformReplacement<PCollection<KV<K, V>>, PCollection<KV<ShardedKey<K>, Iterable<V>>>>
+        getReplacementTransform(
+            AppliedPTransform<
+                    PCollection<KV<K, V>>,
+                    PCollection<KV<ShardedKey<K>, Iterable<V>>>,
+                    GroupIntoBatches<K, V>.WithShardedKey>
+                transform) {
+      return PTransformReplacement.of(
+          PTransformReplacements.getSingletonMainInput(transform),
+          new BatchGroupIntoBatchesWithShardedKey<>(transform.getTransform().getBatchSize()));
+    }
+
+    @Override
+    public Map<PCollection<?>, ReplacementOutput> mapOutputs(
+        Map<TupleTag<?>, PCollection<?>> outputs,
+        PCollection<KV<ShardedKey<K>, Iterable<V>>> newOutput) {
+      return ReplacementOutputs.singleton(outputs, newOutput);
+    }
+  }
+
+  /**
+   * Specialized implementation of {@link GroupIntoBatches.WithShardedKey} for bounded Dataflow
+   * pipelines.
+   */
+  static class BatchGroupIntoBatchesWithShardedKey<K, V>
+      extends PTransform<PCollection<KV<K, V>>, PCollection<KV<ShardedKey<K>, Iterable<V>>>> {
+
+    private final long batchSize;
+
+    private BatchGroupIntoBatchesWithShardedKey(long batchSize) {
+      this.batchSize = batchSize;
+    }
+
+    @Override
+    public PCollection<KV<ShardedKey<K>, Iterable<V>>> expand(PCollection<KV<K, V>> input) {
+      PCollection<KV<ShardedKey<K>, V>> intermediate_input = shardKeys(input);
+      return intermediate_input.apply(new BatchGroupIntoBatches<>(batchSize));
+    }
+  }
+
+  static class StreamingGroupIntoBatchesWithShardedKeyOverrideFactory<K, V>
+      implements PTransformOverrideFactory<
+          PCollection<KV<K, V>>,
+          PCollection<KV<ShardedKey<K>, Iterable<V>>>,
+          GroupIntoBatches<K, V>.WithShardedKey> {
 
     private final DataflowRunner runner;
 
-    StreamingGroupIntoBatchesOverrideFactory(DataflowRunner runner) {
+    StreamingGroupIntoBatchesWithShardedKeyOverrideFactory(DataflowRunner runner) {
       this.runner = runner;
     }
 
     @Override
-    public PTransformReplacement<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>>
+    public PTransformReplacement<PCollection<KV<K, V>>, PCollection<KV<ShardedKey<K>, Iterable<V>>>>
         getReplacementTransform(
             AppliedPTransform<
-                    PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>, GroupIntoBatches<K, V>>
+                    PCollection<KV<K, V>>,
+                    PCollection<KV<ShardedKey<K>, Iterable<V>>>,
+                    GroupIntoBatches<K, V>.WithShardedKey>
                 transform) {
       return PTransformReplacement.of(
           PTransformReplacements.getSingletonMainInput(transform),
-          new StreamingGroupIntoBatches(runner, transform.getTransform()));
+          new StreamingGroupIntoBatchesWithShardedKey<>(
+              runner,
+              transform.getTransform(),
+              PTransformReplacements.getSingletonMainOutput(transform)));
     }
 
     @Override
     public Map<PCollection<?>, ReplacementOutput> mapOutputs(
-        Map<TupleTag<?>, PCollection<?>> outputs, PCollection<KV<K, Iterable<V>>> newOutput) {
+        Map<TupleTag<?>, PCollection<?>> outputs,
+        PCollection<KV<ShardedKey<K>, Iterable<V>>> newOutput) {
       return ReplacementOutputs.singleton(outputs, newOutput);
     }
   }
 
   /**
-   * Specialized implementation of {@link GroupIntoBatches} for unbounded Dataflow pipelines. The
-   * override does the same thing as the original transform but additionally record the input to add
-   * corresponding properties during the graph translation.
+   * Specialized implementation of {@link GroupIntoBatches.WithShardedKey} for unbounded Dataflow
+   * pipelines. The override does the same thing as the original transform but additionally records
+   * the output in order to append required step properties during the graph translation.
    */
-  static class StreamingGroupIntoBatches<K, V>
-      extends PTransform<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>> {
+  static class StreamingGroupIntoBatchesWithShardedKey<K, V>
+      extends PTransform<PCollection<KV<K, V>>, PCollection<KV<ShardedKey<K>, Iterable<V>>>> {
 
     private final transient DataflowRunner runner;
-    private final GroupIntoBatches<K, V> original;
+    private final GroupIntoBatches<K, V>.WithShardedKey original_transform;
+    private final PCollection<KV<ShardedKey<K>, Iterable<V>>> original_output;
 
-    public StreamingGroupIntoBatches(DataflowRunner runner, GroupIntoBatches<K, V> original) {
+    public StreamingGroupIntoBatchesWithShardedKey(
+        DataflowRunner runner,
+        GroupIntoBatches<K, V>.WithShardedKey original,
+        PCollection<KV<ShardedKey<K>, Iterable<V>>> output) {
       this.runner = runner;
-      this.original = original;
+      this.original_transform = original;
+      this.original_output = output;
     }
 
     @Override
-    public PCollection<KV<K, Iterable<V>>> expand(PCollection<KV<K, V>> input) {
-      runner.maybeRecordPCollectionWithAutoSharding(input);
-      return input.apply(original);
+    public PCollection<KV<ShardedKey<K>, Iterable<V>>> expand(PCollection<KV<K, V>> input) {
+      // Record the output PCollection of the original transform since the new output will be
+      // replaced by the original one when the replacement transform is wired to other nodes in the
+      // graph, although the old and the new outputs are effectively the same.
+      runner.maybeRecordPCollectionWithAutoSharding(original_output);
+      return input.apply(original_transform);
     }
   }
+
+  private static final long uuid = UUID.randomUUID().getMostSignificantBits();

Review comment:
       I changed the shard id to be the concatenation of the most and the least significant bits of the UUID.
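
       For illustration, a minimal sketch of building a shard id prefix from both halves of a random UUID, as described above. The class and method names here are hypothetical, not the code in this PR:

           import java.util.UUID;

           class ShardIdSketch {
             // Assumed approach: keep both halves so the prefix carries all 128 bits
             // of the UUID rather than only the most significant 64.
             private static final UUID uuid = UUID.randomUUID();

             static String shardIdPrefix() {
               return Long.toHexString(uuid.getMostSignificantBits())
                   + Long.toHexString(uuid.getLeastSignificantBits());
             }

             public static void main(String[] args) {
               System.out.println(shardIdPrefix());
             }
           }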




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 514456)
    Time Spent: 11h 10m  (was: 11h)

> Add support for auto-sharded GroupIntoBatches in Dataflow runner
> ----------------------------------------------------------------
>
>                 Key: BEAM-10703
>                 URL: https://issues.apache.org/jira/browse/BEAM-10703
>             Project: Beam
>          Issue Type: Improvement
>          Components: runner-dataflow
>            Reporter: Siyuan Chen
>            Assignee: Siyuan Chen
>            Priority: P1
>          Time Spent: 11h 10m
>  Remaining Estimate: 0h
>
> The proposal of improving GroupIntoBatches transform is in BEAM-10475
> This tracks the support in Cloud Dataflow Runner.
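
For context (not part of the issue text), a minimal usage sketch of the transform this issue adds Dataflow runner support for; the pipeline, key, and element values are made up:

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.transforms.Create;
    import org.apache.beam.sdk.transforms.GroupIntoBatches;
    import org.apache.beam.sdk.values.KV;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.ShardedKey;

    public class GroupIntoBatchesShardedKeyExample {
      public static void main(String[] args) {
        Pipeline p = Pipeline.create();
        PCollection<KV<String, String>> keyed =
            p.apply(Create.of(KV.of("k", "v1"), KV.of("k", "v2")));
        // withShardedKey() keys the batched output by ShardedKey<K> instead of K,
        // which is what lets the Dataflow runner shard hot keys automatically.
        PCollection<KV<ShardedKey<String>, Iterable<String>>> batched =
            keyed.apply(GroupIntoBatches.<String, String>ofSize(2).withShardedKey());
        p.run();
      }
    }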



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
