robertwb commented on a change in pull request #13208:
URL: https://github.com/apache/beam/pull/13208#discussion_r527149063



##########
File path: 
sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupIntoBatchesTest.java
##########
@@ -125,6 +126,45 @@ public Void apply(Iterable<KV<String, Iterable<String>>> 
input) {
     pipeline.run();
   }
 
+  @Test
+  @Category({NeedsRunner.class, UsesTimersInParDo.class, 
UsesStatefulParDo.class})
+  public void testWithShardedKeyInGlobalWindow() {
+    // Since with default sharding, the number of subshards of of a key is 
nondeterministic, create

Review comment:
       Could we go with a larger batch size (say 5 or 10) and also verify that 
most batches are of the expected size? 

##########
File path: 
runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/GroupIntoBatchesOverride.java
##########
@@ -92,54 +98,141 @@ public void process(ProcessContext c) {
     }
   }
 
-  static class StreamingGroupIntoBatchesOverrideFactory<K, V>
+  static class BatchGroupIntoBatchesWithShardedKeyOverrideFactory<K, V>
       implements PTransformOverrideFactory<
-          PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>, 
GroupIntoBatches<K, V>> {
+          PCollection<KV<K, V>>,
+          PCollection<KV<ShardedKey<K>, Iterable<V>>>,
+          GroupIntoBatches<K, V>.WithShardedKey> {
+
+    @Override
+    public PTransformReplacement<PCollection<KV<K, V>>, 
PCollection<KV<ShardedKey<K>, Iterable<V>>>>
+        getReplacementTransform(
+            AppliedPTransform<
+                    PCollection<KV<K, V>>,
+                    PCollection<KV<ShardedKey<K>, Iterable<V>>>,
+                    GroupIntoBatches<K, V>.WithShardedKey>
+                transform) {
+      return PTransformReplacement.of(
+          PTransformReplacements.getSingletonMainInput(transform),
+          new 
BatchGroupIntoBatchesWithShardedKey<>(transform.getTransform().getBatchSize()));
+    }
+
+    @Override
+    public Map<PCollection<?>, ReplacementOutput> mapOutputs(
+        Map<TupleTag<?>, PCollection<?>> outputs,
+        PCollection<KV<ShardedKey<K>, Iterable<V>>> newOutput) {
+      return ReplacementOutputs.singleton(outputs, newOutput);
+    }
+  }
+
+  /**
+   * Specialized implementation of {@link GroupIntoBatches.WithShardedKey} for 
bounded Dataflow
+   * pipelines.
+   */
+  static class BatchGroupIntoBatchesWithShardedKey<K, V>
+      extends PTransform<PCollection<KV<K, V>>, PCollection<KV<ShardedKey<K>, 
Iterable<V>>>> {
+
+    private final long batchSize;
+
+    private BatchGroupIntoBatchesWithShardedKey(long batchSize) {
+      this.batchSize = batchSize;
+    }
+
+    @Override
+    public PCollection<KV<ShardedKey<K>, Iterable<V>>> 
expand(PCollection<KV<K, V>> input) {
+      PCollection<KV<ShardedKey<K>, V>> intermediate_input = shardKeys(input);
+      return intermediate_input.apply(new BatchGroupIntoBatches<>(batchSize));
+    }
+  }
+
+  static class StreamingGroupIntoBatchesWithShardedKeyOverrideFactory<K, V>
+      implements PTransformOverrideFactory<
+          PCollection<KV<K, V>>,
+          PCollection<KV<ShardedKey<K>, Iterable<V>>>,
+          GroupIntoBatches<K, V>.WithShardedKey> {
 
     private final DataflowRunner runner;
 
-    StreamingGroupIntoBatchesOverrideFactory(DataflowRunner runner) {
+    StreamingGroupIntoBatchesWithShardedKeyOverrideFactory(DataflowRunner 
runner) {
       this.runner = runner;
     }
 
     @Override
-    public PTransformReplacement<PCollection<KV<K, V>>, PCollection<KV<K, 
Iterable<V>>>>
+    public PTransformReplacement<PCollection<KV<K, V>>, 
PCollection<KV<ShardedKey<K>, Iterable<V>>>>
         getReplacementTransform(
             AppliedPTransform<
-                    PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>, 
GroupIntoBatches<K, V>>
+                    PCollection<KV<K, V>>,
+                    PCollection<KV<ShardedKey<K>, Iterable<V>>>,
+                    GroupIntoBatches<K, V>.WithShardedKey>
                 transform) {
       return PTransformReplacement.of(
           PTransformReplacements.getSingletonMainInput(transform),
-          new StreamingGroupIntoBatches(runner, transform.getTransform()));
+          new StreamingGroupIntoBatchesWithShardedKey<>(
+              runner,
+              transform.getTransform(),
+              PTransformReplacements.getSingletonMainOutput(transform)));
     }
 
     @Override
     public Map<PCollection<?>, ReplacementOutput> mapOutputs(
-        Map<TupleTag<?>, PCollection<?>> outputs, PCollection<KV<K, 
Iterable<V>>> newOutput) {
+        Map<TupleTag<?>, PCollection<?>> outputs,
+        PCollection<KV<ShardedKey<K>, Iterable<V>>> newOutput) {
       return ReplacementOutputs.singleton(outputs, newOutput);
     }
   }
 
   /**
-   * Specialized implementation of {@link GroupIntoBatches} for unbounded 
Dataflow pipelines. The
-   * override does the same thing as the original transform but additionally 
record the input to add
-   * corresponding properties during the graph translation.
+   * Specialized implementation of {@link GroupIntoBatches.WithShardedKey} for 
unbounded Dataflow
+   * pipelines. The override does the same thing as the original transform but 
additionally records
+   * the output in order to append required step properties during the graph 
translation.
    */
-  static class StreamingGroupIntoBatches<K, V>
-      extends PTransform<PCollection<KV<K, V>>, PCollection<KV<K, 
Iterable<V>>>> {
+  static class StreamingGroupIntoBatchesWithShardedKey<K, V>
+      extends PTransform<PCollection<KV<K, V>>, PCollection<KV<ShardedKey<K>, 
Iterable<V>>>> {
 
     private final transient DataflowRunner runner;
-    private final GroupIntoBatches<K, V> original;
+    private final GroupIntoBatches<K, V>.WithShardedKey original_transform;
+    private final PCollection<KV<ShardedKey<K>, Iterable<V>>> original_output;
 
-    public StreamingGroupIntoBatches(DataflowRunner runner, 
GroupIntoBatches<K, V> original) {
+    public StreamingGroupIntoBatchesWithShardedKey(
+        DataflowRunner runner,
+        GroupIntoBatches<K, V>.WithShardedKey original,
+        PCollection<KV<ShardedKey<K>, Iterable<V>>> output) {
       this.runner = runner;
-      this.original = original;
+      this.original_transform = original;
+      this.original_output = output;
     }
 
     @Override
-    public PCollection<KV<K, Iterable<V>>> expand(PCollection<KV<K, V>> input) 
{
-      runner.maybeRecordPCollectionWithAutoSharding(input);
-      return input.apply(original);
+    public PCollection<KV<ShardedKey<K>, Iterable<V>>> 
expand(PCollection<KV<K, V>> input) {
+      // Record the output PCollection of the original transform since the new 
output will be
+      // replaced by the original one when the replacement transform is wired 
to other nodes in the
+      // graph, although the old and the new outputs are effectively the same.
+      runner.maybeRecordPCollectionWithAutoSharding(original_output);
+      return input.apply(original_transform);
     }
   }
+
+  private static final long uuid = UUID.randomUUID().getMostSignificantBits();

Review comment:
       Call this workerUuid? 

##########
File path: 
runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/GroupIntoBatchesOverride.java
##########
@@ -92,54 +98,141 @@ public void process(ProcessContext c) {
     }
   }
 
-  static class StreamingGroupIntoBatchesOverrideFactory<K, V>
+  static class BatchGroupIntoBatchesWithShardedKeyOverrideFactory<K, V>
       implements PTransformOverrideFactory<
-          PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>, 
GroupIntoBatches<K, V>> {
+          PCollection<KV<K, V>>,
+          PCollection<KV<ShardedKey<K>, Iterable<V>>>,
+          GroupIntoBatches<K, V>.WithShardedKey> {
+
+    @Override
+    public PTransformReplacement<PCollection<KV<K, V>>, 
PCollection<KV<ShardedKey<K>, Iterable<V>>>>
+        getReplacementTransform(
+            AppliedPTransform<
+                    PCollection<KV<K, V>>,
+                    PCollection<KV<ShardedKey<K>, Iterable<V>>>,
+                    GroupIntoBatches<K, V>.WithShardedKey>
+                transform) {
+      return PTransformReplacement.of(
+          PTransformReplacements.getSingletonMainInput(transform),
+          new 
BatchGroupIntoBatchesWithShardedKey<>(transform.getTransform().getBatchSize()));
+    }
+
+    @Override
+    public Map<PCollection<?>, ReplacementOutput> mapOutputs(
+        Map<TupleTag<?>, PCollection<?>> outputs,
+        PCollection<KV<ShardedKey<K>, Iterable<V>>> newOutput) {
+      return ReplacementOutputs.singleton(outputs, newOutput);
+    }
+  }
+
+  /**
+   * Specialized implementation of {@link GroupIntoBatches.WithShardedKey} for 
bounded Dataflow
+   * pipelines.
+   */
+  static class BatchGroupIntoBatchesWithShardedKey<K, V>
+      extends PTransform<PCollection<KV<K, V>>, PCollection<KV<ShardedKey<K>, 
Iterable<V>>>> {
+
+    private final long batchSize;
+
+    private BatchGroupIntoBatchesWithShardedKey(long batchSize) {
+      this.batchSize = batchSize;
+    }
+
+    @Override
+    public PCollection<KV<ShardedKey<K>, Iterable<V>>> 
expand(PCollection<KV<K, V>> input) {
+      PCollection<KV<ShardedKey<K>, V>> intermediate_input = shardKeys(input);
+      return intermediate_input.apply(new BatchGroupIntoBatches<>(batchSize));
+    }
+  }
+
+  static class StreamingGroupIntoBatchesWithShardedKeyOverrideFactory<K, V>
+      implements PTransformOverrideFactory<
+          PCollection<KV<K, V>>,
+          PCollection<KV<ShardedKey<K>, Iterable<V>>>,
+          GroupIntoBatches<K, V>.WithShardedKey> {
 
     private final DataflowRunner runner;
 
-    StreamingGroupIntoBatchesOverrideFactory(DataflowRunner runner) {
+    StreamingGroupIntoBatchesWithShardedKeyOverrideFactory(DataflowRunner 
runner) {
       this.runner = runner;
     }
 
     @Override
-    public PTransformReplacement<PCollection<KV<K, V>>, PCollection<KV<K, 
Iterable<V>>>>
+    public PTransformReplacement<PCollection<KV<K, V>>, 
PCollection<KV<ShardedKey<K>, Iterable<V>>>>
         getReplacementTransform(
             AppliedPTransform<
-                    PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>, 
GroupIntoBatches<K, V>>
+                    PCollection<KV<K, V>>,
+                    PCollection<KV<ShardedKey<K>, Iterable<V>>>,
+                    GroupIntoBatches<K, V>.WithShardedKey>
                 transform) {
       return PTransformReplacement.of(
           PTransformReplacements.getSingletonMainInput(transform),
-          new StreamingGroupIntoBatches(runner, transform.getTransform()));
+          new StreamingGroupIntoBatchesWithShardedKey<>(
+              runner,
+              transform.getTransform(),
+              PTransformReplacements.getSingletonMainOutput(transform)));
     }
 
     @Override
     public Map<PCollection<?>, ReplacementOutput> mapOutputs(
-        Map<TupleTag<?>, PCollection<?>> outputs, PCollection<KV<K, 
Iterable<V>>> newOutput) {
+        Map<TupleTag<?>, PCollection<?>> outputs,
+        PCollection<KV<ShardedKey<K>, Iterable<V>>> newOutput) {
       return ReplacementOutputs.singleton(outputs, newOutput);
     }
   }
 
   /**
-   * Specialized implementation of {@link GroupIntoBatches} for unbounded 
Dataflow pipelines. The
-   * override does the same thing as the original transform but additionally 
record the input to add
-   * corresponding properties during the graph translation.
+   * Specialized implementation of {@link GroupIntoBatches.WithShardedKey} for 
unbounded Dataflow
+   * pipelines. The override does the same thing as the original transform but 
additionally records
+   * the output in order to append required step properties during the graph 
translation.
    */
-  static class StreamingGroupIntoBatches<K, V>
-      extends PTransform<PCollection<KV<K, V>>, PCollection<KV<K, 
Iterable<V>>>> {
+  static class StreamingGroupIntoBatchesWithShardedKey<K, V>
+      extends PTransform<PCollection<KV<K, V>>, PCollection<KV<ShardedKey<K>, 
Iterable<V>>>> {
 
     private final transient DataflowRunner runner;
-    private final GroupIntoBatches<K, V> original;
+    private final GroupIntoBatches<K, V>.WithShardedKey original_transform;
+    private final PCollection<KV<ShardedKey<K>, Iterable<V>>> original_output;
 
-    public StreamingGroupIntoBatches(DataflowRunner runner, 
GroupIntoBatches<K, V> original) {
+    public StreamingGroupIntoBatchesWithShardedKey(
+        DataflowRunner runner,
+        GroupIntoBatches<K, V>.WithShardedKey original,
+        PCollection<KV<ShardedKey<K>, Iterable<V>>> output) {
       this.runner = runner;
-      this.original = original;
+      this.original_transform = original;
+      this.original_output = output;
     }
 
     @Override
-    public PCollection<KV<K, Iterable<V>>> expand(PCollection<KV<K, V>> input) 
{
-      runner.maybeRecordPCollectionWithAutoSharding(input);
-      return input.apply(original);
+    public PCollection<KV<ShardedKey<K>, Iterable<V>>> 
expand(PCollection<KV<K, V>> input) {
+      // Record the output PCollection of the original transform since the new 
output will be
+      // replaced by the original one when the replacement transform is wired 
to other nodes in the
+      // graph, although the old and the new outputs are effectively the same.
+      runner.maybeRecordPCollectionWithAutoSharding(original_output);
+      return input.apply(original_transform);
     }
   }
+
+  private static final long uuid = UUID.randomUUID().getMostSignificantBits();

Review comment:
       Maybe take the sum of getMostSignificantBits() and 
getLeastSignificantBits()? 

##########
File path: 
runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/GroupIntoBatchesOverride.java
##########
@@ -92,9 +96,58 @@ public void process(ProcessContext c) {
     }
   }
 
+  static class BatchGroupIntoBatchesWithShardedKeyOverrideFactory<K, V>

Review comment:
       Well, we currently give different overrides in the two cases for 
Dataflow. Everything works, but there are different performance characteristics 
that encourage different implementations. (One could argue that this is 
runner-specific logic.) Fine not to change now. 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to