[ https://issues.apache.org/jira/browse/BEAM-9748?focusedWorklogId=435753&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-435753 ]

ASF GitHub Bot logged work on BEAM-9748:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 20/May/20 23:05
            Start Date: 20/May/20 23:05
    Worklog Time Spent: 10m 
      Work Description: jkff commented on a change in pull request #11406:
URL: https://github.com/apache/beam/pull/11406#discussion_r428356257



##########
File path: 
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reshuffle.java
##########
@@ -107,10 +108,57 @@ public void processElement(
 
   /** Implementation of {@link #viaRandomKey()}. */
   public static class ViaRandomKey<T> extends PTransform<PCollection<T>, PCollection<T>> {
+    private boolean isHighFanoutAndLimitedInputParallelism;
+
     private ViaRandomKey() {}
 
+    /**
+     * Use a different strategy that materializes the input and prepares it to be consumed in a
+     * highly parallel fashion.
+     *
+     * <p>It is tailored to the case when input was produced in an extremely sequential way -
+     * typically by a ParDo that emits millions of outputs _per input element_, e.g., executing a
+     * large database query or a large simulation and emitting all of their results.
+     *
+     * <p>Internally, it materializes the input at a moderate cost before reshuffling it, making the
+     * reshuffling itself significantly cheaper in these extreme cases on some runners. Use this
+     * only if your benchmarks show an improvement.
+     */
+    public ViaRandomKey<T> withHintHighFanoutAndLimitedInputParallelism() {
+      this.isHighFanoutAndLimitedInputParallelism = true;
+      return this;
+    }
+
     @Override
     public PCollection<T> expand(PCollection<T> input) {
+      if (isHighFanoutAndLimitedInputParallelism) {
+        // See https://issues.apache.org/jira/browse/BEAM-2803
+        // We use a combined approach to "break fusion" here:
+        // (see https://cloud.google.com/dataflow/service/dataflow-service-desc#preventing-fusion)
+        // 1) force the data to be materialized by passing it as a side input to an identity fn,
+        // then 2) reshuffle it with a random key. Initial materialization provides some parallelism
+        // and ensures that data to be shuffled can be generated in parallel, while reshuffling
+        // provides perfect parallelism.
+        // In most cases where a "fusion break" is needed, a simple reshuffle would be sufficient.
+        // The current approach is necessary only to support the particular case of JdbcIO where
+        // a single query may produce many gigabytes of query results.
+        PCollectionView<Iterable<T>> empty =
+            input
+                .apply("Consume", Filter.by(SerializableFunctions.constant(false)))
+                .apply(View.asIterable());
+        PCollection<T> materialized =
+            input.apply(
+                "Identity",
+                ParDo.of(

Review comment:
       nit: here you could use a `MapElements.via(Contextful.of(t -> t, requiresSideInputs(empty)))`
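For readers less familiar with this fusion-breaking trick, the data flow of the `isHighFanoutAndLimitedInputParallelism` branch can be modeled in plain Java without a Beam dependency. This is a hypothetical sketch: `ReparallelizeModel` and its methods are illustrative names, lists stand in for PCollections, and a local loop cannot reproduce what a runner actually gains (materializing the input and spreading the reshuffled data across workers). It only shows the shape: an always-empty view built by filtering everything out, an identity step that depends on that view, and a reshuffle that pairs each element with a random key, groups by key, and drops the keys.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.stream.Collectors;

// Hypothetical plain-Java model (no Beam dependency) of the data flow in the
// isHighFanoutAndLimitedInputParallelism branch; Lists stand in for PCollections.
class ReparallelizeModel {

  // "Consume": keep nothing, like Filter.by(SerializableFunctions.constant(false)).
  // The result plays the role of the always-empty PCollectionView<Iterable<T>>.
  static <T> List<T> emptyView(List<T> input) {
    return input.stream().filter(t -> false).collect(Collectors.toList());
  }

  // "Identity": return every element unchanged. The view is read only to model
  // the side-input dependency; it never changes the data.
  static <T> List<T> identity(List<T> input, List<T> view) {
    if (!view.isEmpty()) {
      throw new IllegalStateException("the view is expected to be empty");
    }
    return new ArrayList<>(input);
  }

  // Random-key reshuffle: pair each element with a random key in [0, numKeys)
  // and group by key. Each group could then be processed independently.
  static <T> Map<Integer, List<T>> groupByRandomKey(List<T> input, int numKeys, Random rng) {
    Map<Integer, List<T>> groups = new HashMap<>();
    for (T element : input) {
      groups.computeIfAbsent(rng.nextInt(numKeys), k -> new ArrayList<>()).add(element);
    }
    return groups;
  }

  // Drop the random keys again, keeping only the elements.
  static <T> List<T> dropKeys(Map<Integer, List<T>> groups) {
    List<T> out = new ArrayList<>();
    for (List<T> group : groups.values()) {
      out.addAll(group);
    }
    return out;
  }

  public static void main(String[] args) {
    // Stands in for results emitted sequentially by a single DoFn invocation.
    List<Integer> input = new ArrayList<>();
    for (int i = 0; i < 10_000; i++) {
      input.add(i);
    }
    List<Integer> view = emptyView(input);
    List<Integer> materialized = identity(input, view);
    Map<Integer, List<Integer>> groups = groupByRandomKey(materialized, 16, new Random(42));
    List<Integer> output = dropKeys(groups);

    List<Integer> sorted = new ArrayList<>(output);
    Collections.sort(sorted);
    System.out.println("view is empty: " + view.isEmpty());
    System.out.println("same elements after reshuffle: " + sorted.equals(input));
    System.out.println("non-empty groups: " + groups.size());
  }
}
```

Because the view is always empty, the identity step leaves the data untouched; per the code comment in the diff, the dependency exists only so that the input gets materialized before the random-key reshuffle runs.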

##########
File path: 
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reshuffle.java
##########
@@ -107,10 +108,57 @@ public void processElement(
 
   /** Implementation of {@link #viaRandomKey()}. */
   public static class ViaRandomKey<T> extends PTransform<PCollection<T>, 
PCollection<T>> {
+    private boolean isHighFanoutAndLimitedInputParallelism;

Review comment:
       Please use an immutable AutoValue with a builder here. PTransforms can't have mutable member variables.
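The reviewer's point is that `withHintHighFanoutAndLimitedInputParallelism()` should not flip a field on `this`. With AutoValue, one would declare abstract getters plus an abstract builder and let the annotation processor generate the immutable implementation. The sketch below hand-writes roughly that shape so it runs without AutoValue on the classpath; the class name `ViaRandomKeySketch` is hypothetical, and this is not Beam's actual `Reshuffle.ViaRandomKey` code.

```java
// Hypothetical hand-written stand-in for an AutoValue class with a builder;
// not Beam's actual Reshuffle.ViaRandomKey. Every field is final, and the
// with-method returns a modified copy instead of mutating `this`.
final class ViaRandomKeySketch<T> {
  private final boolean highFanoutAndLimitedInputParallelism;

  private ViaRandomKeySketch(boolean highFanoutAndLimitedInputParallelism) {
    this.highFanoutAndLimitedInputParallelism = highFanoutAndLimitedInputParallelism;
  }

  static <T> ViaRandomKeySketch<T> create() {
    return new Builder<T>().setHighFanoutAndLimitedInputParallelism(false).build();
  }

  boolean isHighFanoutAndLimitedInputParallelism() {
    return highFanoutAndLimitedInputParallelism;
  }

  // Copies the current state into a fresh builder.
  Builder<T> toBuilder() {
    return new Builder<T>()
        .setHighFanoutAndLimitedInputParallelism(highFanoutAndLimitedInputParallelism);
  }

  // Same name as in the diff, but returns a new instance; the receiver is unchanged.
  ViaRandomKeySketch<T> withHintHighFanoutAndLimitedInputParallelism() {
    return toBuilder().setHighFanoutAndLimitedInputParallelism(true).build();
  }

  // Mirrors the shape of an @AutoValue.Builder: setters plus build().
  static final class Builder<T> {
    private Boolean highFanoutAndLimitedInputParallelism;

    Builder<T> setHighFanoutAndLimitedInputParallelism(boolean value) {
      this.highFanoutAndLimitedInputParallelism = value;
      return this;
    }

    ViaRandomKeySketch<T> build() {
      if (highFanoutAndLimitedInputParallelism == null) {
        throw new IllegalStateException(
            "Missing required property: highFanoutAndLimitedInputParallelism");
      }
      return new ViaRandomKeySketch<>(highFanoutAndLimitedInputParallelism);
    }
  }

  public static void main(String[] args) {
    ViaRandomKeySketch<String> plain = ViaRandomKeySketch.create();
    ViaRandomKeySketch<String> hinted = plain.withHintHighFanoutAndLimitedInputParallelism();
    System.out.println(plain.isHighFanoutAndLimitedInputParallelism());  // false
    System.out.println(hinted.isHighFanoutAndLimitedInputParallelism()); // true
  }
}
```

Since no instance is ever modified after construction, deriving a hinted variant from an unhinted one cannot affect a transform object that has already been applied elsewhere in the pipeline.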




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 435753)
    Time Spent: 4h 40m  (was: 4.5h)

> Refactor Reparallelize as an alternative Reshuffle implementation
> -----------------------------------------------------------------
>
>                 Key: BEAM-9748
>                 URL: https://issues.apache.org/jira/browse/BEAM-9748
>             Project: Beam
>          Issue Type: Improvement
>          Components: sdk-java-core
>            Reporter: Ismaël Mejía
>            Assignee: Ismaël Mejía
>            Priority: P3
>          Time Spent: 4h 40m
>  Remaining Estimate: 0h
>
> Some DoFn-based IOs, like JdbcIO and RedisIO, rely on a different approach to
> reparallelize their outputs: a combination of an empty PCollectionView to
> force materialization and Reshuffle.viaRandomKey to reparallelize a
> PCollection. This issue extracts this transform and exposes it as part of
> Reshuffle, to avoid repeating the code in transforms (notably IOs) that
> produce lots of sequentially generated data and benefit from this
> alternative approach to better reparallelize their output.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
