This is an automated email from the ASF dual-hosted git repository.

robertwb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 6b3ce095b0a Reduce the maximum size of input splits in Flink to better distribute work (#28045)
6b3ce095b0a is described below

commit 6b3ce095b0a12a8385eacdb1748fb523247850c1
Author: Julien Tournay <boudhe...@gmail.com>
AuthorDate: Fri Sep 1 22:48:25 2023 +0200

    Reduce the maximum size of input splits in Flink to better distribute work (#28045)
---
 .../beam/runners/flink/FlinkPipelineOptions.java   |  7 +++++++
 .../translation/wrappers/SourceInputFormat.java    | 22 +++++++++++++++++++++-
 2 files changed, 28 insertions(+), 1 deletion(-)

diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
index 2f32ab2c2ea..1e01514fe8b 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
@@ -313,6 +313,13 @@ public interface FlinkPipelineOptions
 
   void setFlinkConfDir(String confDir);
 
+  @Description(
+      "Set the maximum size of input split when data is read from a filesystem. 0 implies no max size.")
+  @Default.Long(0)
+  Long getFileInputSplitMaxSizeMB();
+
+  void setFileInputSplitMaxSizeMB(Long fileInputSplitMaxSizeMB);
+
   static FlinkPipelineOptions defaults() {
     return PipelineOptionsFactory.as(FlinkPipelineOptions.class);
   }
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java
index 7e1835eac72..a1b8bced7a1 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java
@@ -20,9 +20,11 @@ package org.apache.beam.runners.flink.translation.wrappers;
 import java.io.IOException;
 import java.util.List;
 import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
+import org.apache.beam.runners.flink.FlinkPipelineOptions;
 import org.apache.beam.runners.flink.metrics.FlinkMetricContainer;
 import org.apache.beam.runners.flink.metrics.ReaderInvocationUtil;
 import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.io.FileBasedSource;
 import org.apache.beam.sdk.io.Source;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
@@ -108,12 +110,30 @@ public class SourceInputFormat<T> extends RichInputFormat<WindowedValue<T>, Sour
     return null;
   }
 
+  private long getDesiredSizeBytes(int numSplits) throws Exception {
+    long totalSize = initialSource.getEstimatedSizeBytes(options);
+    long defaultSplitSize = totalSize / numSplits;
+    long maxSplitSize = 0;
+    if (options != null) {
+      maxSplitSize = options.as(FlinkPipelineOptions.class).getFileInputSplitMaxSizeMB();
+    }
+    if (initialSource instanceof FileBasedSource && maxSplitSize > 0) {
+      // Most of the time parallelism is < number of files in source.
+      // Each file becomes a unique split which commonly create skew.
+      // This limits the size of splits to reduce skew.
+      return Math.min(defaultSplitSize, maxSplitSize * 1024 * 1024);
+    } else {
+      return defaultSplitSize;
+    }
+  }
+
   @Override
   @SuppressWarnings("unchecked")
  public SourceInputSplit<T>[] createInputSplits(int numSplits) throws IOException {
     try {
-      long desiredSizeBytes = initialSource.getEstimatedSizeBytes(options) / numSplits;
+      long desiredSizeBytes = getDesiredSizeBytes(numSplits);
      List<? extends Source<T>> shards = initialSource.split(desiredSizeBytes, options);
+
       int numShards = shards.size();
      SourceInputSplit<T>[] sourceInputSplits = new SourceInputSplit[numShards];
       for (int i = 0; i < numShards; i++) {

Reply via email to