This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch release-1.7
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 3514c52d7539dedac30df2d42fdc05b79f0e4551
Author: Kostas Kloudas <[email protected]>
AuthorDate: Wed Nov 21 11:11:10 2018 +0100

    [hotfix][s3-connector] Renamed S3MultiPartUploader to S3AccessHelper.
---
 .../fs/s3/common/AbstractS3FileSystemFactory.java  |  6 ++---
 .../flink/fs/s3/common/FlinkS3FileSystem.java      | 12 ++++-----
 .../writer/RecoverableMultiPartUploadImpl.java     | 30 +++++++++++-----------
 ...3MultiPartUploader.java => S3AccessHelper.java} |  2 +-
 .../flink/fs/s3/common/writer/S3Committer.java     | 12 ++++-----
 .../S3RecoverableMultipartUploadFactory.java       | 10 ++++----
 .../fs/s3/common/writer/S3RecoverableWriter.java   |  4 +--
 .../flink/fs/s3/common/S3EntropyFsFactoryTest.java |  4 +--
 .../writer/RecoverableMultiPartUploadImplTest.java |  4 +--
 ...PartUploader.java => HadoopS3AccessHelper.java} | 26 +++++++++----------
 .../flink/fs/s3hadoop/S3FileSystemFactory.java     |  6 ++---
 .../flink/fs/s3presto/S3FileSystemFactory.java     |  4 +--
 12 files changed, 60 insertions(+), 60 deletions(-)

diff --git 
a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/AbstractS3FileSystemFactory.java
 
b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/AbstractS3FileSystemFactory.java
index 318fd39..6ccdeae 100644
--- 
a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/AbstractS3FileSystemFactory.java
+++ 
b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/AbstractS3FileSystemFactory.java
@@ -25,7 +25,7 @@ import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.FileSystemFactory;
-import org.apache.flink.fs.s3.common.writer.S3MultiPartUploader;
+import org.apache.flink.fs.s3.common.writer.S3AccessHelper;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -141,7 +141,7 @@ public abstract class AbstractS3FileSystemFactory 
implements FileSystemFactory {
                        final String localTmpDirectory = 
flinkConfig.getString(CoreOptions.TMP_DIRS);
                        final long s3minPartSize = 
flinkConfig.getLong(PART_UPLOAD_MIN_SIZE);
                        final int maxConcurrentUploads = 
flinkConfig.getInteger(MAX_CONCURRENT_UPLOADS);
-                       final S3MultiPartUploader s3AccessHelper = 
getS3AccessHelper(fs);
+                       final S3AccessHelper s3AccessHelper = 
getS3AccessHelper(fs);
 
                        return new FlinkS3FileSystem(
                                        fs,
@@ -166,6 +166,6 @@ public abstract class AbstractS3FileSystemFactory 
implements FileSystemFactory {
                URI fsUri, org.apache.hadoop.conf.Configuration hadoopConfig);
 
        @Nullable
-       protected abstract S3MultiPartUploader 
getS3AccessHelper(org.apache.hadoop.fs.FileSystem fs);
+       protected abstract S3AccessHelper 
getS3AccessHelper(org.apache.hadoop.fs.FileSystem fs);
 }
 
diff --git 
a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/FlinkS3FileSystem.java
 
b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/FlinkS3FileSystem.java
index 553edde..5248e06 100644
--- 
a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/FlinkS3FileSystem.java
+++ 
b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/FlinkS3FileSystem.java
@@ -23,7 +23,7 @@ import org.apache.flink.core.fs.FileSystemKind;
 import org.apache.flink.core.fs.RecoverableWriter;
 import org.apache.flink.fs.s3.common.utils.RefCountedFile;
 import org.apache.flink.fs.s3.common.utils.RefCountedTmpFileCreator;
-import org.apache.flink.fs.s3.common.writer.S3MultiPartUploader;
+import org.apache.flink.fs.s3.common.writer.S3AccessHelper;
 import org.apache.flink.fs.s3.common.writer.S3RecoverableWriter;
 import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
 import org.apache.flink.util.Preconditions;
@@ -60,7 +60,7 @@ public class FlinkS3FileSystem extends HadoopFileSystem 
implements EntropyInject
        private final FunctionWithException<File, RefCountedFile, IOException> 
tmpFileCreator;
 
        @Nullable
-       private final S3MultiPartUploader s3UploadHelper;
+       private final S3AccessHelper s3AccessHelper;
 
        private final Executor uploadThreadPool;
 
@@ -83,7 +83,7 @@ public class FlinkS3FileSystem extends HadoopFileSystem 
implements EntropyInject
                        String localTmpDirectory,
                        @Nullable String entropyInjectionKey,
                        int entropyLength,
-                       @Nullable S3MultiPartUploader s3UploadHelper,
+                       @Nullable S3AccessHelper s3UploadHelper,
                        long s3uploadPartSize,
                        int maxConcurrentUploadsPerStream) {
 
@@ -99,7 +99,7 @@ public class FlinkS3FileSystem extends HadoopFileSystem 
implements EntropyInject
                // recoverable writer parameter configuration initialization
                this.localTmpDir = 
Preconditions.checkNotNull(localTmpDirectory);
                this.tmpFileCreator = 
RefCountedTmpFileCreator.inDirectories(new File(localTmpDirectory));
-               this.s3UploadHelper = s3UploadHelper;
+               this.s3AccessHelper = s3UploadHelper;
                this.uploadThreadPool = Executors.newCachedThreadPool();
 
                Preconditions.checkArgument(s3uploadPartSize >= 
S3_MULTIPART_MIN_PART_SIZE);
@@ -131,7 +131,7 @@ public class FlinkS3FileSystem extends HadoopFileSystem 
implements EntropyInject
 
        @Override
        public RecoverableWriter createRecoverableWriter() throws IOException {
-               if (s3UploadHelper == null) {
+               if (s3AccessHelper == null) {
                        // this is the case for Presto
                        throw new UnsupportedOperationException("This s3 file 
system implementation does not support recoverable writers.");
                }
@@ -139,7 +139,7 @@ public class FlinkS3FileSystem extends HadoopFileSystem 
implements EntropyInject
                return S3RecoverableWriter.writer(
                                getHadoopFileSystem(),
                                tmpFileCreator,
-                               s3UploadHelper,
+                               s3AccessHelper,
                                uploadThreadPool,
                                s3uploadPartSize,
                                maxConcurrentUploadsPerStream);
diff --git 
a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/RecoverableMultiPartUploadImpl.java
 
b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/RecoverableMultiPartUploadImpl.java
index 80042ce..fe2a4cd 100644
--- 
a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/RecoverableMultiPartUploadImpl.java
+++ 
b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/RecoverableMultiPartUploadImpl.java
@@ -58,7 +58,7 @@ import static org.apache.flink.util.Preconditions.checkState;
 @NotThreadSafe
 final class RecoverableMultiPartUploadImpl implements 
RecoverableMultiPartUpload {
 
-       private final S3MultiPartUploader s3MPUploader;
+       private final S3AccessHelper s3AccessHelper;
 
        private final Executor uploadThreadPool;
 
@@ -71,7 +71,7 @@ final class RecoverableMultiPartUploadImpl implements 
RecoverableMultiPartUpload
        // 
------------------------------------------------------------------------
 
        private RecoverableMultiPartUploadImpl(
-                       S3MultiPartUploader s3uploader,
+                       S3AccessHelper s3AccessHelper,
                        Executor uploadThreadPool,
                        String uploadId,
                        String objectName,
@@ -81,7 +81,7 @@ final class RecoverableMultiPartUploadImpl implements 
RecoverableMultiPartUpload
        ) {
                checkArgument(numBytes >= 0L);
 
-               this.s3MPUploader = checkNotNull(s3uploader);
+               this.s3AccessHelper = checkNotNull(s3AccessHelper);
                this.uploadThreadPool = checkNotNull(uploadThreadPool);
                this.currentUploadInfo = new MultiPartUploadInfo(objectName, 
uploadId, partsSoFar, numBytes, incompletePart);
                this.namePrefixForTempObjects = 
incompleteObjectNamePrefix(objectName);
@@ -111,7 +111,7 @@ final class RecoverableMultiPartUploadImpl implements 
RecoverableMultiPartUpload
                currentUploadInfo.registerNewPart(partLength);
 
                file.retain(); // keep the file while the async upload still 
runs
-               uploadThreadPool.execute(new UploadTask(s3MPUploader, 
currentUploadInfo, file, future));
+               uploadThreadPool.execute(new UploadTask(s3AccessHelper, 
currentUploadInfo, file, future));
        }
 
        @Override
@@ -124,7 +124,7 @@ final class RecoverableMultiPartUploadImpl implements 
RecoverableMultiPartUpload
                final S3Recoverable snapshot = snapshotAndGetRecoverable(null);
 
                return new S3Committer(
-                               s3MPUploader,
+                               s3AccessHelper,
                                snapshot.getObjectName(),
                                snapshot.uploadId(),
                                snapshot.parts(),
@@ -179,7 +179,7 @@ final class RecoverableMultiPartUploadImpl implements 
RecoverableMultiPartUpload
                        // they do not fall under the user's global TTL on S3.
                        // Figure out a way to clean them.
 
-                       
s3MPUploader.uploadIncompletePart(incompletePartObjectName, inputStream, 
file.getPos());
+                       
s3AccessHelper.uploadIncompletePart(incompletePartObjectName, inputStream, 
file.getPos());
                }
                finally {
                        file.release();
@@ -244,14 +244,14 @@ final class RecoverableMultiPartUploadImpl implements 
RecoverableMultiPartUpload
        // 
------------------------------------------------------------------------
 
        public static RecoverableMultiPartUploadImpl newUpload(
-                       final S3MultiPartUploader s3uploader,
+                       final S3AccessHelper s3AccessHelper,
                        final Executor uploadThreadPool,
                        final String objectName) throws IOException {
 
-               final String multiPartUploadId = 
s3uploader.startMultiPartUpload(objectName);
+               final String multiPartUploadId = 
s3AccessHelper.startMultiPartUpload(objectName);
 
                return new RecoverableMultiPartUploadImpl(
-                               s3uploader,
+                               s3AccessHelper,
                                uploadThreadPool,
                                multiPartUploadId,
                                objectName,
@@ -261,7 +261,7 @@ final class RecoverableMultiPartUploadImpl implements 
RecoverableMultiPartUpload
        }
 
        public static RecoverableMultiPartUploadImpl recoverUpload(
-                       final S3MultiPartUploader s3uploader,
+                       final S3AccessHelper s3AccessHelper,
                        final Executor uploadThreadPool,
                        final String multipartUploadId,
                        final String objectName,
@@ -270,7 +270,7 @@ final class RecoverableMultiPartUploadImpl implements 
RecoverableMultiPartUpload
                        final Optional<File> incompletePart) {
 
                return new RecoverableMultiPartUploadImpl(
-                               s3uploader,
+                               s3AccessHelper,
                                uploadThreadPool,
                                multipartUploadId,
                                objectName,
@@ -286,7 +286,7 @@ final class RecoverableMultiPartUploadImpl implements 
RecoverableMultiPartUpload
 
        private static class UploadTask implements Runnable {
 
-               private final S3MultiPartUploader s3uploader;
+               private final S3AccessHelper s3AccessHelper;
 
                private final String objectName;
 
@@ -299,7 +299,7 @@ final class RecoverableMultiPartUploadImpl implements 
RecoverableMultiPartUpload
                private final CompletableFuture<PartETag> future;
 
                UploadTask(
-                               final S3MultiPartUploader s3uploader,
+                               final S3AccessHelper s3AccessHelper,
                                final MultiPartUploadInfo currentUpload,
                                final RefCountedFSOutputStream file,
                                final CompletableFuture<PartETag> future) {
@@ -313,7 +313,7 @@ final class RecoverableMultiPartUploadImpl implements 
RecoverableMultiPartUpload
                        // these are limits put by Amazon
                        checkArgument(partNumber >= 1  && partNumber <= 10_000);
 
-                       this.s3uploader = checkNotNull(s3uploader);
+                       this.s3AccessHelper = checkNotNull(s3AccessHelper);
                        this.file = checkNotNull(file);
                        this.future = checkNotNull(future);
                }
@@ -321,7 +321,7 @@ final class RecoverableMultiPartUploadImpl implements 
RecoverableMultiPartUpload
                @Override
                public void run() {
                        try (final InputStream inputStream = 
file.getInputStream()) {
-                               final UploadPartResult result = 
s3uploader.uploadPart(objectName, uploadId, partNumber, inputStream, 
file.getPos());
+                               final UploadPartResult result = 
s3AccessHelper.uploadPart(objectName, uploadId, partNumber, inputStream, 
file.getPos());
                                future.complete(new 
PartETag(result.getPartNumber(), result.getETag()));
                                file.release();
                        }
diff --git 
a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3MultiPartUploader.java
 
b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3AccessHelper.java
similarity index 99%
rename from 
flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3MultiPartUploader.java
rename to 
flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3AccessHelper.java
index da227a4..57920a5 100644
--- 
a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3MultiPartUploader.java
+++ 
b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3AccessHelper.java
@@ -41,7 +41,7 @@ import java.util.concurrent.atomic.AtomicInteger;
  * the upload with all its parts will be either committed or discarded.
  */
 @Internal
-public interface S3MultiPartUploader {
+public interface S3AccessHelper {
 
        /**
         * Initializes a Multi-Part Upload.
diff --git 
a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3Committer.java
 
b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3Committer.java
index 1fc8bf1..5fbc5bb 100644
--- 
a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3Committer.java
+++ 
b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3Committer.java
@@ -40,7 +40,7 @@ public final class S3Committer implements 
RecoverableFsDataOutputStream.Committe
 
        private static final Logger LOG = 
LoggerFactory.getLogger(S3Committer.class);
 
-       private final S3MultiPartUploader s3uploader;
+       private final S3AccessHelper s3AccessHelper;
 
        private final String uploadId;
 
@@ -50,8 +50,8 @@ public final class S3Committer implements 
RecoverableFsDataOutputStream.Committe
 
        private final long totalLength;
 
-       S3Committer(S3MultiPartUploader s3uploader, String objectName, String 
uploadId, List<PartETag> parts, long totalLength) {
-               this.s3uploader = checkNotNull(s3uploader);
+       S3Committer(S3AccessHelper s3AccessHelper, String objectName, String 
uploadId, List<PartETag> parts, long totalLength) {
+               this.s3AccessHelper = checkNotNull(s3AccessHelper);
                this.objectName = checkNotNull(objectName);
                this.uploadId = checkNotNull(uploadId);
                this.parts = checkNotNull(parts);
@@ -64,7 +64,7 @@ public final class S3Committer implements 
RecoverableFsDataOutputStream.Committe
                        LOG.info("Committing {} with MPU ID {}", objectName, 
uploadId);
 
                        final AtomicInteger errorCount = new AtomicInteger();
-                       s3uploader.commitMultiPartUpload(objectName, uploadId, 
parts, totalLength, errorCount);
+                       s3AccessHelper.commitMultiPartUpload(objectName, 
uploadId, parts, totalLength, errorCount);
 
                        if (errorCount.get() == 0) {
                                LOG.debug("Successfully committed {} with MPU 
ID {}", objectName, uploadId);
@@ -82,14 +82,14 @@ public final class S3Committer implements 
RecoverableFsDataOutputStream.Committe
                        LOG.info("Trying to commit after recovery {} with MPU 
ID {}", objectName, uploadId);
 
                        try {
-                               s3uploader.commitMultiPartUpload(objectName, 
uploadId, parts, totalLength, new AtomicInteger());
+                               
s3AccessHelper.commitMultiPartUpload(objectName, uploadId, parts, totalLength, 
new AtomicInteger());
                        } catch (IOException e) {
                                LOG.info("Failed to commit after recovery {} 
with MPU ID {}. " +
                                                "Checking if file was committed 
before...", objectName, uploadId);
                                LOG.trace("Exception when committing:", e);
 
                                try {
-                                       ObjectMetadata metadata = 
s3uploader.getObjectMetadata(objectName);
+                                       ObjectMetadata metadata = 
s3AccessHelper.getObjectMetadata(objectName);
                                        if (totalLength != 
metadata.getContentLength()) {
                                                String message = 
String.format("Inconsistent result for object %s: conflicting lengths. " +
                                                                                
"Recovered committer for upload %s indicates %s bytes, present object is %s 
bytes",
diff --git 
a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableMultipartUploadFactory.java
 
b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableMultipartUploadFactory.java
index b201981..9a171ae 100644
--- 
a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableMultipartUploadFactory.java
+++ 
b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableMultipartUploadFactory.java
@@ -43,7 +43,7 @@ final class S3RecoverableMultipartUploadFactory {
 
        private final org.apache.hadoop.fs.FileSystem fs;
 
-       private final S3MultiPartUploader twoPhaseUploader;
+       private final S3AccessHelper s3AccessHelper;
 
        private final FunctionWithException<File, RefCountedFile, IOException> 
tmpFileSupplier;
 
@@ -53,7 +53,7 @@ final class S3RecoverableMultipartUploadFactory {
 
        S3RecoverableMultipartUploadFactory(
                        final FileSystem fs,
-                       final S3MultiPartUploader twoPhaseUploader,
+                       final S3AccessHelper s3AccessHelper,
                        final int maxConcurrentUploadsPerStream,
                        final Executor executor,
                        final FunctionWithException<File, RefCountedFile, 
IOException> tmpFileSupplier) {
@@ -61,14 +61,14 @@ final class S3RecoverableMultipartUploadFactory {
                this.fs = Preconditions.checkNotNull(fs);
                this.maxConcurrentUploadsPerStream = 
maxConcurrentUploadsPerStream;
                this.executor = executor;
-               this.twoPhaseUploader = twoPhaseUploader;
+               this.s3AccessHelper = s3AccessHelper;
                this.tmpFileSupplier = tmpFileSupplier;
        }
 
        RecoverableMultiPartUpload getNewRecoverableUpload(Path path) throws 
IOException {
 
                return RecoverableMultiPartUploadImpl.newUpload(
-                               twoPhaseUploader,
+                               s3AccessHelper,
                                limitedExecutor(),
                                pathToObjectName(path));
        }
@@ -77,7 +77,7 @@ final class S3RecoverableMultipartUploadFactory {
                final Optional<File> incompletePart = 
downloadLastDataChunk(recoverable);
 
                return RecoverableMultiPartUploadImpl.recoverUpload(
-                               twoPhaseUploader,
+                               s3AccessHelper,
                                limitedExecutor(),
                                recoverable.uploadId(),
                                recoverable.getObjectName(),
diff --git 
a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableWriter.java
 
b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableWriter.java
index 2a84308..698f65f 100644
--- 
a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableWriter.java
+++ 
b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableWriter.java
@@ -129,7 +129,7 @@ public class S3RecoverableWriter implements 
RecoverableWriter {
        public static S3RecoverableWriter writer(
                        final FileSystem fs,
                        final FunctionWithException<File, RefCountedFile, 
IOException> tempFileCreator,
-                       final S3MultiPartUploader twoPhaseUploader,
+                       final S3AccessHelper s3AccessHelper,
                        final Executor uploadThreadPool,
                        final long userDefinedMinPartSize,
                        final int maxConcurrentUploadsPerStream) {
@@ -139,7 +139,7 @@ public class S3RecoverableWriter implements 
RecoverableWriter {
                final S3RecoverableMultipartUploadFactory uploadFactory =
                                new S3RecoverableMultipartUploadFactory(
                                                fs,
-                                               twoPhaseUploader,
+                                               s3AccessHelper,
                                                maxConcurrentUploadsPerStream,
                                                uploadThreadPool,
                                                tempFileCreator);
diff --git 
a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/S3EntropyFsFactoryTest.java
 
b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/S3EntropyFsFactoryTest.java
index d3d25c3..5b15652 100644
--- 
a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/S3EntropyFsFactoryTest.java
+++ 
b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/S3EntropyFsFactoryTest.java
@@ -19,7 +19,7 @@
 package org.apache.flink.fs.s3.common;
 
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.fs.s3.common.writer.S3MultiPartUploader;
+import org.apache.flink.fs.s3.common.writer.S3AccessHelper;
 import org.apache.flink.util.TestLogger;
 
 import org.apache.hadoop.fs.FileSystem;
@@ -78,7 +78,7 @@ public class S3EntropyFsFactoryTest extends TestLogger {
 
                @Nullable
                @Override
-               protected S3MultiPartUploader getS3AccessHelper(FileSystem fs) {
+               protected S3AccessHelper getS3AccessHelper(FileSystem fs) {
                        return null;
                }
 
diff --git 
a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/writer/RecoverableMultiPartUploadImplTest.java
 
b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/writer/RecoverableMultiPartUploadImplTest.java
index 72554e1..4c2f147 100644
--- 
a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/writer/RecoverableMultiPartUploadImplTest.java
+++ 
b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/writer/RecoverableMultiPartUploadImplTest.java
@@ -339,10 +339,10 @@ public class RecoverableMultiPartUploadImplTest {
        }
 
        /**
-        * A {@link S3MultiPartUploader} that simulates uploading part files to 
S3 by
+        * A {@link S3AccessHelper} that simulates uploading part files to S3 by
         * simply putting complete and incomplete part files in lists for 
further validation.
         */
-       private static class StubMultiPartUploader implements 
S3MultiPartUploader {
+       private static class StubMultiPartUploader implements S3AccessHelper {
 
                private final 
List<RecoverableMultiPartUploadImplTest.TestUploadPartResult> 
completePartsUploaded = new ArrayList<>();
                private final 
List<RecoverableMultiPartUploadImplTest.TestPutObjectResult> 
incompletePartsUploaded = new ArrayList<>();
diff --git 
a/flink-filesystems/flink-s3-fs-hadoop/src/main/java/org/apache/flink/fs/s3hadoop/HadoopS3MultiPartUploader.java
 
b/flink-filesystems/flink-s3-fs-hadoop/src/main/java/org/apache/flink/fs/s3hadoop/HadoopS3AccessHelper.java
similarity index 77%
rename from 
flink-filesystems/flink-s3-fs-hadoop/src/main/java/org/apache/flink/fs/s3hadoop/HadoopS3MultiPartUploader.java
rename to 
flink-filesystems/flink-s3-fs-hadoop/src/main/java/org/apache/flink/fs/s3hadoop/HadoopS3AccessHelper.java
index f446f70..f833471 100644
--- 
a/flink-filesystems/flink-s3-fs-hadoop/src/main/java/org/apache/flink/fs/s3hadoop/HadoopS3MultiPartUploader.java
+++ 
b/flink-filesystems/flink-s3-fs-hadoop/src/main/java/org/apache/flink/fs/s3hadoop/HadoopS3AccessHelper.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.fs.s3hadoop;
 
-import org.apache.flink.fs.s3.common.writer.S3MultiPartUploader;
+import org.apache.flink.fs.s3.common.writer.S3AccessHelper;
 import org.apache.flink.util.MathUtils;
 
 import com.amazonaws.SdkBaseException;
@@ -43,16 +43,16 @@ import java.util.concurrent.atomic.AtomicInteger;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
- * An implementation of the {@link S3MultiPartUploader} for the Hadoop S3A 
filesystem.
+ * An implementation of the {@link S3AccessHelper} for the Hadoop S3A 
filesystem.
  */
-public class HadoopS3MultiPartUploader implements S3MultiPartUploader {
+public class HadoopS3AccessHelper implements S3AccessHelper {
 
        private final S3AFileSystem s3a;
 
-       private final InternalWriteOperationHelper s3uploader;
+       private final InternalWriteOperationHelper s3accessHelper;
 
-       public HadoopS3MultiPartUploader(S3AFileSystem s3a, Configuration conf) 
{
-               this.s3uploader = new InternalWriteOperationHelper(
+       public HadoopS3AccessHelper(S3AFileSystem s3a, Configuration conf) {
+               this.s3accessHelper = new InternalWriteOperationHelper(
                                checkNotNull(s3a),
                                checkNotNull(conf)
                );
@@ -61,25 +61,25 @@ public class HadoopS3MultiPartUploader implements 
S3MultiPartUploader {
 
        @Override
        public String startMultiPartUpload(String key) throws IOException {
-               return s3uploader.initiateMultiPartUpload(key);
+               return s3accessHelper.initiateMultiPartUpload(key);
        }
 
        @Override
        public UploadPartResult uploadPart(String key, String uploadId, int 
partNumber, InputStream inputStream, long length) throws IOException {
-               final UploadPartRequest uploadRequest = 
s3uploader.newUploadPartRequest(
+               final UploadPartRequest uploadRequest = 
s3accessHelper.newUploadPartRequest(
                                key, uploadId, partNumber, 
MathUtils.checkedDownCast(length), inputStream, null, 0L);
-               return s3uploader.uploadPart(uploadRequest);
+               return s3accessHelper.uploadPart(uploadRequest);
        }
 
        @Override
        public PutObjectResult uploadIncompletePart(String key, InputStream 
inputStream, long length) throws IOException {
-               final PutObjectRequest putRequest = 
s3uploader.createPutObjectRequest(key, inputStream, length);
-               return s3uploader.putObject(putRequest);
+               final PutObjectRequest putRequest = 
s3accessHelper.createPutObjectRequest(key, inputStream, length);
+               return s3accessHelper.putObject(putRequest);
        }
 
        @Override
        public CompleteMultipartUploadResult commitMultiPartUpload(String 
destKey, String uploadId, List<PartETag> partETags, long length, AtomicInteger 
errorCount) throws IOException {
-               return s3uploader.completeMPUwithRetries(destKey, uploadId, 
partETags, length, errorCount);
+               return s3accessHelper.completeMPUwithRetries(destKey, uploadId, 
partETags, length, errorCount);
        }
 
        @Override
@@ -94,7 +94,7 @@ public class HadoopS3MultiPartUploader implements 
S3MultiPartUploader {
 
        /**
         * Internal {@link WriteOperationHelper} that is wrapped so that it 
only exposes
-        * the functionality we need for the {@link S3MultiPartUploader}.
+        * the functionality we need for the {@link S3AccessHelper}.
         */
        private static final class InternalWriteOperationHelper extends 
WriteOperationHelper {
 
diff --git 
a/flink-filesystems/flink-s3-fs-hadoop/src/main/java/org/apache/flink/fs/s3hadoop/S3FileSystemFactory.java
 
b/flink-filesystems/flink-s3-fs-hadoop/src/main/java/org/apache/flink/fs/s3hadoop/S3FileSystemFactory.java
index 897629f..2637e7b 100644
--- 
a/flink-filesystems/flink-s3-fs-hadoop/src/main/java/org/apache/flink/fs/s3hadoop/S3FileSystemFactory.java
+++ 
b/flink-filesystems/flink-s3-fs-hadoop/src/main/java/org/apache/flink/fs/s3hadoop/S3FileSystemFactory.java
@@ -21,7 +21,7 @@ package org.apache.flink.fs.s3hadoop;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory;
 import org.apache.flink.fs.s3.common.HadoopConfigLoader;
-import org.apache.flink.fs.s3.common.writer.S3MultiPartUploader;
+import org.apache.flink.fs.s3.common.writer.S3AccessHelper;
 
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.s3a.S3AFileSystem;
@@ -96,8 +96,8 @@ public class S3FileSystemFactory extends 
AbstractS3FileSystemFactory {
 
        @Nullable
        @Override
-       protected S3MultiPartUploader getS3AccessHelper(FileSystem fs) {
+       protected S3AccessHelper getS3AccessHelper(FileSystem fs) {
                final S3AFileSystem s3Afs = (S3AFileSystem) fs;
-               return new HadoopS3MultiPartUploader(s3Afs, s3Afs.getConf());
+               return new HadoopS3AccessHelper(s3Afs, s3Afs.getConf());
        }
 }
diff --git 
a/flink-filesystems/flink-s3-fs-presto/src/main/java/org/apache/flink/fs/s3presto/S3FileSystemFactory.java
 
b/flink-filesystems/flink-s3-fs-presto/src/main/java/org/apache/flink/fs/s3presto/S3FileSystemFactory.java
index b579d6e..0fb2857 100644
--- 
a/flink-filesystems/flink-s3-fs-presto/src/main/java/org/apache/flink/fs/s3presto/S3FileSystemFactory.java
+++ 
b/flink-filesystems/flink-s3-fs-presto/src/main/java/org/apache/flink/fs/s3presto/S3FileSystemFactory.java
@@ -21,7 +21,7 @@ package org.apache.flink.fs.s3presto;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory;
 import org.apache.flink.fs.s3.common.HadoopConfigLoader;
-import org.apache.flink.fs.s3.common.writer.S3MultiPartUploader;
+import org.apache.flink.fs.s3.common.writer.S3AccessHelper;
 import org.apache.flink.util.FlinkRuntimeException;
 
 import com.facebook.presto.hive.PrestoS3FileSystem;
@@ -92,7 +92,7 @@ public class S3FileSystemFactory extends 
AbstractS3FileSystemFactory {
 
        @Nullable
        @Override
-       protected S3MultiPartUploader getS3AccessHelper(FileSystem fs) {
+       protected S3AccessHelper getS3AccessHelper(FileSystem fs) {
                return null;
        }
 

Reply via email to