This is an automated email from the ASF dual-hosted git repository.
gaborgsomogyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new c7855db34f3 [FLINK-39111][filesystem] Alter NativeS3AccessHelper
Interface to NativeS3ObjectOperations
c7855db34f3 is described below
commit c7855db34f317bb9bce7d01c006a05c8d3e58b5f
Author: Samrat <[email protected]>
AuthorDate: Wed Apr 22 19:04:48 2026 +0530
[FLINK-39111][filesystem] Alter NativeS3AccessHelper Interface to
NativeS3ObjectOperations
---
flink-filesystems/flink-s3-fs-native/README.md | 2 +-
.../flink/fs/s3native/NativeS3FileSystem.java | 22 +++++++++++-----------
.../fs/s3native/writer/NativeS3Committer.java | 7 ++++---
...ssHelper.java => NativeS3ObjectOperations.java} | 10 +++++-----
.../NativeS3RecoverableFsDataOutputStream.java | 8 ++++----
.../s3native/writer/NativeS3RecoverableWriter.java | 8 ++++----
6 files changed, 29 insertions(+), 28 deletions(-)
diff --git a/flink-filesystems/flink-s3-fs-native/README.md
b/flink-filesystems/flink-s3-fs-native/README.md
index 4e2bb103eba..7f6cfc56bed 100644
--- a/flink-filesystems/flink-s3-fs-native/README.md
+++ b/flink-filesystems/flink-s3-fs-native/README.md
@@ -343,7 +343,7 @@ Key classes:
- `NativeS3FileSystem` - Main FileSystem implementation
- `NativeS3RecoverableWriter` - Exactly-once writer using multipart uploads
- `S3ClientProvider` - Manages S3 client lifecycle
-- `NativeS3AccessHelper` - Low-level S3 operations
+- `NativeS3ObjectOperations` - Low-level S3 operations (multipart upload, put,
get, delete)
## Building
diff --git
a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3FileSystem.java
b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3FileSystem.java
index 64795cc8c76..cb5026df2ac 100644
---
a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3FileSystem.java
+++
b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3FileSystem.java
@@ -28,7 +28,7 @@ import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.fs.PathsCopyingFileSystem;
import org.apache.flink.core.fs.RecoverableWriter;
-import org.apache.flink.fs.s3native.writer.NativeS3AccessHelper;
+import org.apache.flink.fs.s3native.writer.NativeS3ObjectOperations;
import org.apache.flink.fs.s3native.writer.NativeS3RecoverableWriter;
import org.apache.flink.util.AutoCloseableAsync;
import org.apache.flink.util.StringUtils;
@@ -104,7 +104,7 @@ class NativeS3FileSystem extends FileSystem
@Nullable private final String entropyInjectionKey;
private final int entropyLength;
- @Nullable private final NativeS3AccessHelper s3AccessHelper;
+ @Nullable private final NativeS3ObjectOperations s3AccessHelper;
private final long s3uploadPartSize;
private final int maxConcurrentUploadsPerStream;
private final String localTmpDir;
@@ -139,7 +139,7 @@ class NativeS3FileSystem extends FileSystem
this.readBufferSize = readBufferSize;
this.fsCloseTimeout = fsCloseTimeout;
this.s3AccessHelper =
- new NativeS3AccessHelper(
+ new NativeS3ObjectOperations(
clientProvider.getS3Client(),
clientProvider.getTransferManager(),
bucketName,
@@ -194,7 +194,7 @@ class NativeS3FileSystem extends FileSystem
@Override
public FileStatus getFileStatus(Path path) throws IOException {
checkNotClosed();
- final String key = NativeS3AccessHelper.extractKey(path);
+ final String key = NativeS3ObjectOperations.extractKey(path);
final S3Client s3Client = clientProvider.getS3Client();
LOG.debug("Getting file status for s3://{}/{}", bucketName, key);
@@ -293,7 +293,7 @@ class NativeS3FileSystem extends FileSystem
@Override
public FSDataInputStream open(Path path, int bufferSize) throws
IOException {
checkNotClosed();
- final String key = NativeS3AccessHelper.extractKey(path);
+ final String key = NativeS3ObjectOperations.extractKey(path);
final S3Client s3Client = clientProvider.getS3Client();
final long fileSize = getFileStatus(path).getLen();
return new NativeS3InputStream(s3Client, bucketName, key, fileSize,
bufferSize);
@@ -302,7 +302,7 @@ class NativeS3FileSystem extends FileSystem
@Override
public FSDataInputStream open(Path path) throws IOException {
checkNotClosed();
- final String key = NativeS3AccessHelper.extractKey(path);
+ final String key = NativeS3ObjectOperations.extractKey(path);
final S3Client s3Client = clientProvider.getS3Client();
final long fileSize = getFileStatus(path).getLen();
return new NativeS3InputStream(s3Client, bucketName, key, fileSize,
readBufferSize);
@@ -326,7 +326,7 @@ class NativeS3FileSystem extends FileSystem
@Override
public FileStatus[] listStatus(Path path) throws IOException {
checkNotClosed();
- String key = NativeS3AccessHelper.extractKey(path);
+ String key = NativeS3ObjectOperations.extractKey(path);
if (!key.isEmpty() && !key.endsWith("/")) {
key = key + "/";
}
@@ -373,7 +373,7 @@ class NativeS3FileSystem extends FileSystem
@Override
public boolean delete(Path path, boolean recursive) throws IOException {
checkNotClosed();
- final String key = NativeS3AccessHelper.extractKey(path);
+ final String key = NativeS3ObjectOperations.extractKey(path);
final S3Client s3Client = clientProvider.getS3Client();
try {
@@ -444,7 +444,7 @@ class NativeS3FileSystem extends FileSystem
}
}
- final String key = NativeS3AccessHelper.extractKey(path);
+ final String key = NativeS3ObjectOperations.extractKey(path);
return new NativeS3OutputStream(
clientProvider.getS3Client(),
bucketName,
@@ -463,8 +463,8 @@ class NativeS3FileSystem extends FileSystem
@Override
public boolean rename(Path src, Path dst) throws IOException {
checkNotClosed();
- final String srcKey = NativeS3AccessHelper.extractKey(src);
- final String dstKey = NativeS3AccessHelper.extractKey(dst);
+ final String srcKey = NativeS3ObjectOperations.extractKey(src);
+ final String dstKey = NativeS3ObjectOperations.extractKey(dst);
final S3Client s3Client = clientProvider.getS3Client();
final FileStatus srcStatus = getFileStatus(src);
diff --git
a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/writer/NativeS3Committer.java
b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/writer/NativeS3Committer.java
index 2765740088a..7e93b4e766e 100644
---
a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/writer/NativeS3Committer.java
+++
b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/writer/NativeS3Committer.java
@@ -41,10 +41,11 @@ import java.util.stream.Collectors;
*/
class NativeS3Committer implements RecoverableFsDataOutputStream.Committer {
- private final NativeS3AccessHelper s3AccessHelper;
+ private final NativeS3ObjectOperations s3AccessHelper;
private final NativeS3Recoverable recoverable;
- public NativeS3Committer(NativeS3AccessHelper s3AccessHelper,
NativeS3Recoverable recoverable) {
+ public NativeS3Committer(
+ NativeS3ObjectOperations s3AccessHelper, NativeS3Recoverable
recoverable) {
this.s3AccessHelper = s3AccessHelper;
this.recoverable = recoverable;
}
@@ -75,7 +76,7 @@ class NativeS3Committer implements
RecoverableFsDataOutputStream.Committer {
recoverable.parts().stream()
.map(
part ->
- new
NativeS3AccessHelper.UploadPartResult(
+ new
NativeS3ObjectOperations.UploadPartResult(
part.getPartNumber(),
part.getETag()))
.collect(Collectors.toList()),
recoverable.numBytesInParts());
diff --git
a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/writer/NativeS3AccessHelper.java
b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/writer/NativeS3ObjectOperations.java
similarity index 99%
rename from
flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/writer/NativeS3AccessHelper.java
rename to
flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/writer/NativeS3ObjectOperations.java
index c86ff67e78e..01bc660f3ac 100644
---
a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/writer/NativeS3AccessHelper.java
+++
b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/writer/NativeS3ObjectOperations.java
@@ -94,9 +94,9 @@ import java.util.stream.Collectors;
* https://bucket.s3.amazonaws.com/key}) are not currently supported.
*/
@Internal
-public class NativeS3AccessHelper {
+public class NativeS3ObjectOperations {
- private static final Logger LOG =
LoggerFactory.getLogger(NativeS3AccessHelper.class);
+ private static final Logger LOG =
LoggerFactory.getLogger(NativeS3ObjectOperations.class);
private final S3Client s3Client;
private final S3TransferManager transferManager;
@@ -104,11 +104,11 @@ public class NativeS3AccessHelper {
private final boolean useAsyncOperations;
private final S3EncryptionConfig encryptionConfig;
- public NativeS3AccessHelper(S3Client s3Client, String bucketName) {
+ public NativeS3ObjectOperations(S3Client s3Client, String bucketName) {
this(s3Client, null, bucketName, false, null);
}
- public NativeS3AccessHelper(
+ public NativeS3ObjectOperations(
S3Client s3Client,
S3TransferManager transferManager,
String bucketName,
@@ -116,7 +116,7 @@ public class NativeS3AccessHelper {
this(s3Client, transferManager, bucketName, useAsyncOperations, null);
}
- public NativeS3AccessHelper(
+ public NativeS3ObjectOperations(
S3Client s3Client,
S3TransferManager transferManager,
String bucketName,
diff --git
a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/writer/NativeS3RecoverableFsDataOutputStream.java
b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/writer/NativeS3RecoverableFsDataOutputStream.java
index c69ff909fe3..56e77291126 100644
---
a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/writer/NativeS3RecoverableFsDataOutputStream.java
+++
b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/writer/NativeS3RecoverableFsDataOutputStream.java
@@ -59,7 +59,7 @@ class NativeS3RecoverableFsDataOutputStream extends
RecoverableFsDataOutputStrea
private static final int BUFFER_SIZE = 64 * 1024;
private final ReentrantLock lock = new ReentrantLock();
- private final NativeS3AccessHelper s3AccessHelper;
+ private final NativeS3ObjectOperations s3AccessHelper;
private final String key;
private final String uploadId;
private final String localTmpDir;
@@ -77,7 +77,7 @@ class NativeS3RecoverableFsDataOutputStream extends
RecoverableFsDataOutputStrea
private volatile boolean closed;
public NativeS3RecoverableFsDataOutputStream(
- NativeS3AccessHelper s3AccessHelper,
+ NativeS3ObjectOperations s3AccessHelper,
String key,
String uploadId,
String localTmpDir,
@@ -87,7 +87,7 @@ class NativeS3RecoverableFsDataOutputStream extends
RecoverableFsDataOutputStrea
}
public NativeS3RecoverableFsDataOutputStream(
- NativeS3AccessHelper s3AccessHelper,
+ NativeS3ObjectOperations s3AccessHelper,
String key,
String uploadId,
String localTmpDir,
@@ -182,7 +182,7 @@ class NativeS3RecoverableFsDataOutputStream extends
RecoverableFsDataOutputStrea
currentOutputStream.close();
int partNumber = nextPartNumber++;
- NativeS3AccessHelper.UploadPartResult result =
+ NativeS3ObjectOperations.UploadPartResult result =
s3AccessHelper.uploadPart(
key, uploadId, partNumber, currentTempFile,
currentPartSize);
diff --git
a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/writer/NativeS3RecoverableWriter.java
b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/writer/NativeS3RecoverableWriter.java
index 5aeedd12601..942b1d3e80b 100644
---
a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/writer/NativeS3RecoverableWriter.java
+++
b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/writer/NativeS3RecoverableWriter.java
@@ -38,14 +38,14 @@ public class NativeS3RecoverableWriter implements
RecoverableWriter, AutoCloseab
private static final Logger LOG =
LoggerFactory.getLogger(NativeS3RecoverableWriter.class);
- private final NativeS3AccessHelper s3AccessHelper;
+ private final NativeS3ObjectOperations s3AccessHelper;
private final String localTmpDir;
private final long userDefinedMinPartSize;
private final int maxConcurrentUploadsPerStream;
private final AtomicBoolean closed = new AtomicBoolean(false);
private NativeS3RecoverableWriter(
- NativeS3AccessHelper s3AccessHelper,
+ NativeS3ObjectOperations s3AccessHelper,
String localTmpDir,
long userDefinedMinPartSize,
int maxConcurrentUploadsPerStream) {
@@ -63,7 +63,7 @@ public class NativeS3RecoverableWriter implements
RecoverableWriter, AutoCloseab
@Override
public RecoverableFsDataOutputStream open(Path path) throws IOException {
checkNotClosed();
- String key = NativeS3AccessHelper.extractKey(path);
+ String key = NativeS3ObjectOperations.extractKey(path);
LOG.debug("Opening recoverable stream for key: {}", key);
String uploadId = s3AccessHelper.startMultiPartUpload(key);
@@ -159,7 +159,7 @@ public class NativeS3RecoverableWriter implements
RecoverableWriter, AutoCloseab
}
public static NativeS3RecoverableWriter writer(
- NativeS3AccessHelper s3AccessHelper,
+ NativeS3ObjectOperations s3AccessHelper,
String localTmpDir,
long userDefinedMinPartSize,
int maxConcurrentUploadsPerStream) {