This is an automated email from the ASF dual-hosted git repository.

gaborgsomogyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new c7855db34f3 [FLINK-39111][filesystem] Alter NativeS3AccessHelper 
Interface to NativeS3ObjectOperations
c7855db34f3 is described below

commit c7855db34f317bb9bce7d01c006a05c8d3e58b5f
Author: Samrat <[email protected]>
AuthorDate: Wed Apr 22 19:04:48 2026 +0530

    [FLINK-39111][filesystem] Alter NativeS3AccessHelper Interface to 
NativeS3ObjectOperations
---
 flink-filesystems/flink-s3-fs-native/README.md     |  2 +-
 .../flink/fs/s3native/NativeS3FileSystem.java      | 22 +++++++++++-----------
 .../fs/s3native/writer/NativeS3Committer.java      |  7 ++++---
 ...ssHelper.java => NativeS3ObjectOperations.java} | 10 +++++-----
 .../NativeS3RecoverableFsDataOutputStream.java     |  8 ++++----
 .../s3native/writer/NativeS3RecoverableWriter.java |  8 ++++----
 6 files changed, 29 insertions(+), 28 deletions(-)

diff --git a/flink-filesystems/flink-s3-fs-native/README.md 
b/flink-filesystems/flink-s3-fs-native/README.md
index 4e2bb103eba..7f6cfc56bed 100644
--- a/flink-filesystems/flink-s3-fs-native/README.md
+++ b/flink-filesystems/flink-s3-fs-native/README.md
@@ -343,7 +343,7 @@ Key classes:
 - `NativeS3FileSystem` - Main FileSystem implementation
 - `NativeS3RecoverableWriter` - Exactly-once writer using multipart uploads
 - `S3ClientProvider` - Manages S3 client lifecycle
-- `NativeS3AccessHelper` - Low-level S3 operations
+- `NativeS3ObjectOperations` - Low-level S3 operations (multipart upload, put, 
get, delete)
 
 ## Building
 
diff --git 
a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3FileSystem.java
 
b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3FileSystem.java
index 64795cc8c76..cb5026df2ac 100644
--- 
a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3FileSystem.java
+++ 
b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3FileSystem.java
@@ -28,7 +28,7 @@ import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.fs.PathsCopyingFileSystem;
 import org.apache.flink.core.fs.RecoverableWriter;
-import org.apache.flink.fs.s3native.writer.NativeS3AccessHelper;
+import org.apache.flink.fs.s3native.writer.NativeS3ObjectOperations;
 import org.apache.flink.fs.s3native.writer.NativeS3RecoverableWriter;
 import org.apache.flink.util.AutoCloseableAsync;
 import org.apache.flink.util.StringUtils;
@@ -104,7 +104,7 @@ class NativeS3FileSystem extends FileSystem
     @Nullable private final String entropyInjectionKey;
     private final int entropyLength;
 
-    @Nullable private final NativeS3AccessHelper s3AccessHelper;
+    @Nullable private final NativeS3ObjectOperations s3AccessHelper;
     private final long s3uploadPartSize;
     private final int maxConcurrentUploadsPerStream;
     private final String localTmpDir;
@@ -139,7 +139,7 @@ class NativeS3FileSystem extends FileSystem
         this.readBufferSize = readBufferSize;
         this.fsCloseTimeout = fsCloseTimeout;
         this.s3AccessHelper =
-                new NativeS3AccessHelper(
+                new NativeS3ObjectOperations(
                         clientProvider.getS3Client(),
                         clientProvider.getTransferManager(),
                         bucketName,
@@ -194,7 +194,7 @@ class NativeS3FileSystem extends FileSystem
     @Override
     public FileStatus getFileStatus(Path path) throws IOException {
         checkNotClosed();
-        final String key = NativeS3AccessHelper.extractKey(path);
+        final String key = NativeS3ObjectOperations.extractKey(path);
         final S3Client s3Client = clientProvider.getS3Client();
 
         LOG.debug("Getting file status for s3://{}/{}", bucketName, key);
@@ -293,7 +293,7 @@ class NativeS3FileSystem extends FileSystem
     @Override
     public FSDataInputStream open(Path path, int bufferSize) throws 
IOException {
         checkNotClosed();
-        final String key = NativeS3AccessHelper.extractKey(path);
+        final String key = NativeS3ObjectOperations.extractKey(path);
         final S3Client s3Client = clientProvider.getS3Client();
         final long fileSize = getFileStatus(path).getLen();
         return new NativeS3InputStream(s3Client, bucketName, key, fileSize, 
bufferSize);
@@ -302,7 +302,7 @@ class NativeS3FileSystem extends FileSystem
     @Override
     public FSDataInputStream open(Path path) throws IOException {
         checkNotClosed();
-        final String key = NativeS3AccessHelper.extractKey(path);
+        final String key = NativeS3ObjectOperations.extractKey(path);
         final S3Client s3Client = clientProvider.getS3Client();
         final long fileSize = getFileStatus(path).getLen();
         return new NativeS3InputStream(s3Client, bucketName, key, fileSize, 
readBufferSize);
@@ -326,7 +326,7 @@ class NativeS3FileSystem extends FileSystem
     @Override
     public FileStatus[] listStatus(Path path) throws IOException {
         checkNotClosed();
-        String key = NativeS3AccessHelper.extractKey(path);
+        String key = NativeS3ObjectOperations.extractKey(path);
         if (!key.isEmpty() && !key.endsWith("/")) {
             key = key + "/";
         }
@@ -373,7 +373,7 @@ class NativeS3FileSystem extends FileSystem
     @Override
     public boolean delete(Path path, boolean recursive) throws IOException {
         checkNotClosed();
-        final String key = NativeS3AccessHelper.extractKey(path);
+        final String key = NativeS3ObjectOperations.extractKey(path);
         final S3Client s3Client = clientProvider.getS3Client();
 
         try {
@@ -444,7 +444,7 @@ class NativeS3FileSystem extends FileSystem
             }
         }
 
-        final String key = NativeS3AccessHelper.extractKey(path);
+        final String key = NativeS3ObjectOperations.extractKey(path);
         return new NativeS3OutputStream(
                 clientProvider.getS3Client(),
                 bucketName,
@@ -463,8 +463,8 @@ class NativeS3FileSystem extends FileSystem
     @Override
     public boolean rename(Path src, Path dst) throws IOException {
         checkNotClosed();
-        final String srcKey = NativeS3AccessHelper.extractKey(src);
-        final String dstKey = NativeS3AccessHelper.extractKey(dst);
+        final String srcKey = NativeS3ObjectOperations.extractKey(src);
+        final String dstKey = NativeS3ObjectOperations.extractKey(dst);
         final S3Client s3Client = clientProvider.getS3Client();
 
         final FileStatus srcStatus = getFileStatus(src);
diff --git 
a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/writer/NativeS3Committer.java
 
b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/writer/NativeS3Committer.java
index 2765740088a..7e93b4e766e 100644
--- 
a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/writer/NativeS3Committer.java
+++ 
b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/writer/NativeS3Committer.java
@@ -41,10 +41,11 @@ import java.util.stream.Collectors;
  */
 class NativeS3Committer implements RecoverableFsDataOutputStream.Committer {
 
-    private final NativeS3AccessHelper s3AccessHelper;
+    private final NativeS3ObjectOperations s3AccessHelper;
     private final NativeS3Recoverable recoverable;
 
-    public NativeS3Committer(NativeS3AccessHelper s3AccessHelper, 
NativeS3Recoverable recoverable) {
+    public NativeS3Committer(
+            NativeS3ObjectOperations s3AccessHelper, NativeS3Recoverable 
recoverable) {
         this.s3AccessHelper = s3AccessHelper;
         this.recoverable = recoverable;
     }
@@ -75,7 +76,7 @@ class NativeS3Committer implements 
RecoverableFsDataOutputStream.Committer {
                 recoverable.parts().stream()
                         .map(
                                 part ->
-                                        new 
NativeS3AccessHelper.UploadPartResult(
+                                        new 
NativeS3ObjectOperations.UploadPartResult(
                                                 part.getPartNumber(), 
part.getETag()))
                         .collect(Collectors.toList()),
                 recoverable.numBytesInParts());
diff --git 
a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/writer/NativeS3AccessHelper.java
 
b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/writer/NativeS3ObjectOperations.java
similarity index 99%
rename from 
flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/writer/NativeS3AccessHelper.java
rename to 
flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/writer/NativeS3ObjectOperations.java
index c86ff67e78e..01bc660f3ac 100644
--- 
a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/writer/NativeS3AccessHelper.java
+++ 
b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/writer/NativeS3ObjectOperations.java
@@ -94,9 +94,9 @@ import java.util.stream.Collectors;
  * https://bucket.s3.amazonaws.com/key}) are not currently supported.
  */
 @Internal
-public class NativeS3AccessHelper {
+public class NativeS3ObjectOperations {
 
-    private static final Logger LOG = 
LoggerFactory.getLogger(NativeS3AccessHelper.class);
+    private static final Logger LOG = 
LoggerFactory.getLogger(NativeS3ObjectOperations.class);
 
     private final S3Client s3Client;
     private final S3TransferManager transferManager;
@@ -104,11 +104,11 @@ public class NativeS3AccessHelper {
     private final boolean useAsyncOperations;
     private final S3EncryptionConfig encryptionConfig;
 
-    public NativeS3AccessHelper(S3Client s3Client, String bucketName) {
+    public NativeS3ObjectOperations(S3Client s3Client, String bucketName) {
         this(s3Client, null, bucketName, false, null);
     }
 
-    public NativeS3AccessHelper(
+    public NativeS3ObjectOperations(
             S3Client s3Client,
             S3TransferManager transferManager,
             String bucketName,
@@ -116,7 +116,7 @@ public class NativeS3AccessHelper {
         this(s3Client, transferManager, bucketName, useAsyncOperations, null);
     }
 
-    public NativeS3AccessHelper(
+    public NativeS3ObjectOperations(
             S3Client s3Client,
             S3TransferManager transferManager,
             String bucketName,
diff --git 
a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/writer/NativeS3RecoverableFsDataOutputStream.java
 
b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/writer/NativeS3RecoverableFsDataOutputStream.java
index c69ff909fe3..56e77291126 100644
--- 
a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/writer/NativeS3RecoverableFsDataOutputStream.java
+++ 
b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/writer/NativeS3RecoverableFsDataOutputStream.java
@@ -59,7 +59,7 @@ class NativeS3RecoverableFsDataOutputStream extends 
RecoverableFsDataOutputStrea
     private static final int BUFFER_SIZE = 64 * 1024;
     private final ReentrantLock lock = new ReentrantLock();
 
-    private final NativeS3AccessHelper s3AccessHelper;
+    private final NativeS3ObjectOperations s3AccessHelper;
     private final String key;
     private final String uploadId;
     private final String localTmpDir;
@@ -77,7 +77,7 @@ class NativeS3RecoverableFsDataOutputStream extends 
RecoverableFsDataOutputStrea
     private volatile boolean closed;
 
     public NativeS3RecoverableFsDataOutputStream(
-            NativeS3AccessHelper s3AccessHelper,
+            NativeS3ObjectOperations s3AccessHelper,
             String key,
             String uploadId,
             String localTmpDir,
@@ -87,7 +87,7 @@ class NativeS3RecoverableFsDataOutputStream extends 
RecoverableFsDataOutputStrea
     }
 
     public NativeS3RecoverableFsDataOutputStream(
-            NativeS3AccessHelper s3AccessHelper,
+            NativeS3ObjectOperations s3AccessHelper,
             String key,
             String uploadId,
             String localTmpDir,
@@ -182,7 +182,7 @@ class NativeS3RecoverableFsDataOutputStream extends 
RecoverableFsDataOutputStrea
         currentOutputStream.close();
 
         int partNumber = nextPartNumber++;
-        NativeS3AccessHelper.UploadPartResult result =
+        NativeS3ObjectOperations.UploadPartResult result =
                 s3AccessHelper.uploadPart(
                         key, uploadId, partNumber, currentTempFile, 
currentPartSize);
 
diff --git 
a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/writer/NativeS3RecoverableWriter.java
 
b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/writer/NativeS3RecoverableWriter.java
index 5aeedd12601..942b1d3e80b 100644
--- 
a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/writer/NativeS3RecoverableWriter.java
+++ 
b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/writer/NativeS3RecoverableWriter.java
@@ -38,14 +38,14 @@ public class NativeS3RecoverableWriter implements 
RecoverableWriter, AutoCloseab
 
     private static final Logger LOG = 
LoggerFactory.getLogger(NativeS3RecoverableWriter.class);
 
-    private final NativeS3AccessHelper s3AccessHelper;
+    private final NativeS3ObjectOperations s3AccessHelper;
     private final String localTmpDir;
     private final long userDefinedMinPartSize;
     private final int maxConcurrentUploadsPerStream;
     private final AtomicBoolean closed = new AtomicBoolean(false);
 
     private NativeS3RecoverableWriter(
-            NativeS3AccessHelper s3AccessHelper,
+            NativeS3ObjectOperations s3AccessHelper,
             String localTmpDir,
             long userDefinedMinPartSize,
             int maxConcurrentUploadsPerStream) {
@@ -63,7 +63,7 @@ public class NativeS3RecoverableWriter implements 
RecoverableWriter, AutoCloseab
     @Override
     public RecoverableFsDataOutputStream open(Path path) throws IOException {
         checkNotClosed();
-        String key = NativeS3AccessHelper.extractKey(path);
+        String key = NativeS3ObjectOperations.extractKey(path);
         LOG.debug("Opening recoverable stream for key: {}", key);
 
         String uploadId = s3AccessHelper.startMultiPartUpload(key);
@@ -159,7 +159,7 @@ public class NativeS3RecoverableWriter implements 
RecoverableWriter, AutoCloseab
     }
 
     public static NativeS3RecoverableWriter writer(
-            NativeS3AccessHelper s3AccessHelper,
+            NativeS3ObjectOperations s3AccessHelper,
             String localTmpDir,
             long userDefinedMinPartSize,
             int maxConcurrentUploadsPerStream) {

Reply via email to