This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 3b31886d0c06caf2fe62b4687ca3819d2d27d302
Author: Efrat Levitan <[email protected]>
AuthorDate: Thu May 28 10:49:12 2026 +0300

    [FLINK-39811][s3] Optionally adjust s5cmd part size to fully utilize the s5cmd worker pool during downloads
    
    Unlike the upload path (recoverableWriter), which sets S3_MULTIPART_MIN_PART_SIZE of 5 MiB [1], we don't set any part size for s5cmd, so the download part size falls back to s5cmd's default of 50 MiB [2].
    
    As a result, the effective concurrency of a single file download is capped at fileSize / 50 MiB, underutilizing the worker pool for files smaller than 5 * 50 MiB (5 being s5cmd's default concurrency [3]).
    
    The AWS SDK source [4] shows that N (= concurrency) channels are initialized, but only (fileSize / partSize) of them are assigned a range to read.
    
    
    [1] https://github.com/apache/flink/blob/master/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/FlinkS3FileSystem.java#L93
    [2] https://github.com/peak/s5cmd/blob/54d6a8a955688f07e5acc40d61f9c42ceac6c33b/command/cp.go#L30
    [3] https://github.com/peak/s5cmd/blob/54d6a8a955688f07e5acc40d61f9c42ceac6c33b/command/cp.go#L29
    [4] https://github.com/aws/aws-sdk-go/blob/070853e88d22854d2355c2543d0958a5f76ad407/service/s3/s3manager/download.go#L329-L338
---
 .../fs/s3/common/AbstractS3FileSystemFactory.java  | 13 ++++
 .../flink/fs/s3/common/FlinkS3FileSystem.java      | 39 ++++++++++--
 .../flink/fs/s3/common/FlinkS3FileSystemTest.java  | 73 ++++++++++++++++++++++
 3 files changed, 121 insertions(+), 4 deletions(-)

diff --git 
a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/AbstractS3FileSystemFactory.java
 
b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/AbstractS3FileSystemFactory.java
index 4a6846a4df2..c96e7109694 100644
--- 
a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/AbstractS3FileSystemFactory.java
+++ 
b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/AbstractS3FileSystemFactory.java
@@ -90,6 +90,19 @@ public abstract class AbstractS3FileSystemFactory implements 
FileSystemFactory {
                     .defaultValue(100)
                     .withDescription("Maximum number of files to download per 
one call to s5cmd");
 
+    public static final ConfigOption<Boolean> S5CMD_ADJUST_PART_SIZE =
+            ConfigOptions.key("s3.s5cmd.adjust-part-size")
+                    .booleanType()
+                    .defaultValue(true)
+                    .withDescription(
+                            "When set to true, the part size of each s5cmd copy command is derived "
+                                    + "from the size of the copied file, so that the file is split "
+                                    + "across all "
+                                    + FlinkS3FileSystem.DEFAULT_S5CMD_CONCURRENCY
+                                    + " s5cmd channels (bounded by the S3 multipart part size "
+                                    + "limits of 5 MiB and 5 GiB). "
+                                    + "For smaller files this results in smaller parts, which "
+                                    + "speeds up transfers but might increase S3 request costs. "
+                                    + "The computation assumes s5cmd's default concurrency. "
+                                    + "When set to false, the part size defaults to "
+                                    + FlinkS3FileSystem.DEFAULT_S5CMD_PART_SIZE_MB
+                                    + " MiB.");
+
     public static final ConfigOption<Long> PART_UPLOAD_MIN_SIZE =
             ConfigOptions.key("s3.upload.min.part.size")
                     .longType()
diff --git 
a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/FlinkS3FileSystem.java
 
b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/FlinkS3FileSystem.java
index c8888690c5e..f2a7cca5104 100644
--- 
a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/FlinkS3FileSystem.java
+++ 
b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/FlinkS3FileSystem.java
@@ -61,6 +61,7 @@ import java.util.concurrent.atomic.AtomicReference;
 
 import static 
org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory.ACCESS_KEY;
 import static 
org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory.ENDPOINT;
+import static 
org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory.S5CMD_ADJUST_PART_SIZE;
 import static 
org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory.S5CMD_BATCH_MAX_FILES;
 import static 
org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory.S5CMD_BATCH_MAX_SIZE;
 import static 
org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory.S5CMD_EXTRA_ARGS;
@@ -92,6 +93,12 @@ public class FlinkS3FileSystem extends HadoopFileSystem
     /** The minimum size of a part in the multipart upload, except for the 
last part: 5 MIBytes. */
     public static final long S3_MULTIPART_MIN_PART_SIZE = 5L << 20;
 
+    /** The maximum part size allowed by AWS for S3 multipart transfers: 5 GIBytes. */
+    public static final long S3_MULTIPART_MAX_PART_SIZE = 5L << 30;
+
+    /**
+     * Part size, in MiB, that s5cmd uses for copy commands by default. This is a hard-coded copy of
+     * s5cmd's own default and must be kept in sync if the s5cmd binary in use changes it.
+     */
+    public static final long DEFAULT_S5CMD_PART_SIZE_MB = 50;
+
+    /**
+     * s5cmd's default concurrency (hard-coded copy of s5cmd's own default). It is used to derive the
+     * part size when part size adjustment is enabled; a {@code --concurrency} value passed through
+     * extra s5cmd arguments is not taken into account.
+     */
+    public static final long DEFAULT_S5CMD_CONCURRENCY = 5;
+
     private final String localTmpDir;
 
     private final FunctionWithException<File, RefCountedFileWithStream, 
IOException> tmpFileCreator;
@@ -113,6 +120,7 @@ public class FlinkS3FileSystem extends HadoopFileSystem
         @Nullable private final String accessArtifact;
         @Nullable private final String secretArtifact;
         @Nullable private final String endpoint;
+        private final boolean adjustPartSize;
         private long maxBatchSizeFiles;
         private long maxBatchSizeBytes;
 
@@ -124,7 +132,8 @@ public class FlinkS3FileSystem extends HadoopFileSystem
                 @Nullable String secretArtifact,
                 @Nullable String endpoint,
                 int maxBatchSizeFiles,
-                long maxBatchSizeBytes) {
+                long maxBatchSizeBytes,
+                boolean adjustPartSize) {
             if (!path.isEmpty()) {
                 File s5CmdFile = new File(path);
                 checkArgument(s5CmdFile.isFile(), "Unable to find s5cmd binary 
under [%s]", path);
@@ -138,6 +147,7 @@ public class FlinkS3FileSystem extends HadoopFileSystem
             this.endpoint = endpoint;
             this.maxBatchSizeFiles = maxBatchSizeFiles;
             this.maxBatchSizeBytes = maxBatchSizeBytes;
+            this.adjustPartSize = adjustPartSize;
         }
 
         public static Optional<S5CmdConfiguration> of(Configuration 
flinkConfig) {
@@ -152,7 +162,8 @@ public class FlinkS3FileSystem extends HadoopFileSystem
                                             flinkConfig.get(SECRET_KEY),
                                             flinkConfig.get(ENDPOINT),
                                             
flinkConfig.get(S5CMD_BATCH_MAX_FILES),
-                                            
flinkConfig.get(S5CMD_BATCH_MAX_SIZE).getBytes()));
+                                            
flinkConfig.get(S5CMD_BATCH_MAX_SIZE).getBytes(),
+                                            
flinkConfig.get(S5CMD_ADJUST_PART_SIZE)));
         }
 
         private void configureEnvironment(Map<String, String> environment) {
@@ -204,6 +215,12 @@ public class FlinkS3FileSystem extends HadoopFileSystem
                     + ", endpoint='"
                     + endpoint
                     + '\''
+                    + ", maxBatchSizeFiles="
+                    + maxBatchSizeFiles
+                    + ", maxBatchSizeBytes="
+                    + maxBatchSizeBytes
+                    + ", adjustPartSize="
+                    + adjustPartSize
                     + '}';
         }
     }
@@ -301,13 +318,27 @@ public class FlinkS3FileSystem extends HadoopFileSystem
         }
     }
 
+    private long partSizeFrom(long fileSizeBytes) {
+        return Math.max(
+                        S3_MULTIPART_MIN_PART_SIZE,
+                        Math.min(
+                                S3_MULTIPART_MAX_PART_SIZE,
+                                fileSizeBytes / DEFAULT_S5CMD_CONCURRENCY))
+                / (1L << 20);
+    }
+
     private List<String> convertToSpells(List<CopyRequest> requests) throws 
IOException {
         List<String> spells = new ArrayList<>();
         for (CopyRequest request : requests) {
             
Files.createDirectories(Paths.get(request.getDestination().toUri()).getParent());
+            final long partSize =
+                    s5CmdConfiguration.adjustPartSize
+                            ? partSizeFrom(request.getSize())
+                            : DEFAULT_S5CMD_PART_SIZE_MB;
             spells.add(
                     String.format(
-                            "cp %s %s",
+                            "cp --part-size %s %s %s",
+                            partSize,
                             request.getSource().toUri().toString(),
                             request.getDestination().getPath()));
         }
@@ -320,7 +351,7 @@ public class FlinkS3FileSystem extends HadoopFileSystem
         int exitCode = 0;
         final AtomicReference<IOException> maybeCloseableRegistryException =
                 new AtomicReference<>();
-
+        LOG.debug("Casting spells: {}", spells);
         // Setup temporary working directory for the process
         File tmpWorkingDir = new File(localTmpDir, "s5cmd_" + 
UUID.randomUUID());
         java.nio.file.Path tmpWorkingPath = 
Files.createDirectories(tmpWorkingDir.toPath());
diff --git 
a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/FlinkS3FileSystemTest.java
 
b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/FlinkS3FileSystemTest.java
index 05c43f0e78b..4a47e908803 100644
--- 
a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/FlinkS3FileSystemTest.java
+++ 
b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/FlinkS3FileSystemTest.java
@@ -33,6 +33,8 @@ import org.junit.jupiter.api.condition.EnabledOnOs;
 import org.junit.jupiter.api.condition.OS;
 import org.junit.jupiter.api.io.TempDir;
 import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
 import org.junit.jupiter.params.provider.ValueSource;
 
 import javax.annotation.Nonnull;
@@ -53,6 +55,7 @@ import java.util.stream.IntStream;
 
 import static 
org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory.ACCESS_KEY;
 import static 
org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory.ENDPOINT;
+import static 
org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory.S5CMD_ADJUST_PART_SIZE;
 import static 
org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory.S5CMD_BATCH_MAX_FILES;
 import static 
org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory.S5CMD_EXTRA_ARGS;
 import static 
org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory.S5CMD_PATH;
@@ -185,4 +188,74 @@ class FlinkS3FileSystemTest {
                         totalSubcommands, tasks)
                 .isEqualTo(numFiles);
     }
+
+    private static List<Arguments> partSizeTestArguments() {
+        return List.of(
+                Arguments.of(true, 123L * (1L << 20), 24L),
+                Arguments.of(
+                        true,
+                        25_605L * (1L << 20),
+                        FlinkS3FileSystem.S3_MULTIPART_MAX_PART_SIZE / (1L << 
20)),
+                Arguments.of(
+                        true,
+                        25_600L * (1L << 20),
+                        FlinkS3FileSystem.S3_MULTIPART_MAX_PART_SIZE / (1L << 
20)),
+                Arguments.of(
+                        true,
+                        10L * (1L << 20),
+                        FlinkS3FileSystem.S3_MULTIPART_MIN_PART_SIZE / (1L << 
20)),
+                Arguments.of(
+                        false, 123L * (1L << 20), 
FlinkS3FileSystem.DEFAULT_S5CMD_PART_SIZE_MB));
+    }
+
+    @ParameterizedTest
+    @MethodSource("partSizeTestArguments")
+    @EnabledOnOs({OS.LINUX, OS.MAC}) // POSIX OS only to run shell script
+    public void testCopyUsesPartSize(
+            boolean adjustPartSize,
+            long copyRequestSize,
+            long expectedPartSize,
+            @TempDir File temporaryDirectory)
+            throws Exception {
+        // Fake s5cmd: a shell script that writes every sub-command read from stdin into a file.
+        File cmdFile = new File(temporaryDirectory, "cmd");
+        File inputToCmd = new File(temporaryDirectory, "input");
+        Preconditions.checkState(inputToCmd.mkdir());
+
+        String cmd =
+                String.format(
+                        "file=$(mktemp %s/s5cmd-input-XXX)\n"
+                                + "while read line; do echo $line >> $file; done < /dev/stdin",
+                        inputToCmd.getAbsolutePath());
+
+        FileUtils.writeStringToFile(cmdFile, cmd);
+        Preconditions.checkState(cmdFile.setExecutable(true), "Cannot set script file executable.");
+
+        final Configuration conf = new Configuration();
+        conf.set(S5CMD_PATH, cmdFile.getAbsolutePath());
+        conf.set(S5CMD_EXTRA_ARGS, "");
+        conf.set(S5CMD_ADJUST_PART_SIZE, adjustPartSize);
+        conf.set(ACCESS_KEY, "test-access-key");
+        conf.set(SECRET_KEY, "test-secret-key");
+        conf.set(ENDPOINT, "test-endpoint");
+
+        TestS3FileSystemFactory factory = new TestS3FileSystemFactory();
+        factory.configure(conf);
+
+        FlinkS3FileSystem fs = (FlinkS3FileSystem) factory.create(new URI("s3://test"));
+        List<CopyRequest> tasks =
+                Collections.singletonList(
+                        CopyRequest.of(
+                                new Path("file:///src-file"),
+                                new Path("file:///dst-file"),
+                                copyRequestSize));
+        fs.copyFiles(tasks, ICloseableRegistry.NO_OP);
+        File[] files = inputToCmd.listFiles();
+        Assertions.assertThat(files).isNotNull().hasSize(1);
+        List<String> subcommands = FileUtils.readLines(files[0], StandardCharsets.UTF_8);
+        Assertions.assertThat(subcommands).hasSize(1);
+        String command = subcommands.get(0);
+        // The trailing space matters: without it, an expected "--part-size 5" would also match
+        // an actual "--part-size 50", hiding a fallback to the s5cmd default part size.
+        Assertions.assertThat(command)
+                .describedAs("s5cmd command should contain --part-size %d", expectedPartSize)
+                .contains("--part-size " + expectedPartSize + " ");
+    }
 }

Reply via email to