This is an automated email from the ASF dual-hosted git repository. kkloudas pushed a commit to branch release-1.7 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 3514c52d7539dedac30df2d42fdc05b79f0e4551 Author: Kostas Kloudas <[email protected]> AuthorDate: Wed Nov 21 11:11:10 2018 +0100 [hotfix][s3-connector] Renamed S3MultiPartUploader to S3AccessHelper. --- .../fs/s3/common/AbstractS3FileSystemFactory.java | 6 ++--- .../flink/fs/s3/common/FlinkS3FileSystem.java | 12 ++++----- .../writer/RecoverableMultiPartUploadImpl.java | 30 +++++++++++----------- ...3MultiPartUploader.java => S3AccessHelper.java} | 2 +- .../flink/fs/s3/common/writer/S3Committer.java | 12 ++++----- .../S3RecoverableMultipartUploadFactory.java | 10 ++++---- .../fs/s3/common/writer/S3RecoverableWriter.java | 4 +-- .../flink/fs/s3/common/S3EntropyFsFactoryTest.java | 4 +-- .../writer/RecoverableMultiPartUploadImplTest.java | 4 +-- ...PartUploader.java => HadoopS3AccessHelper.java} | 26 +++++++++---------- .../flink/fs/s3hadoop/S3FileSystemFactory.java | 6 ++--- .../flink/fs/s3presto/S3FileSystemFactory.java | 4 +-- 12 files changed, 60 insertions(+), 60 deletions(-) diff --git a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/AbstractS3FileSystemFactory.java b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/AbstractS3FileSystemFactory.java index 318fd39..6ccdeae 100644 --- a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/AbstractS3FileSystemFactory.java +++ b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/AbstractS3FileSystemFactory.java @@ -25,7 +25,7 @@ import org.apache.flink.configuration.CoreOptions; import org.apache.flink.configuration.IllegalConfigurationException; import org.apache.flink.core.fs.FileSystem; import org.apache.flink.core.fs.FileSystemFactory; -import org.apache.flink.fs.s3.common.writer.S3MultiPartUploader; +import org.apache.flink.fs.s3.common.writer.S3AccessHelper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -141,7 +141,7 @@ public abstract class AbstractS3FileSystemFactory implements FileSystemFactory { final String localTmpDirectory = flinkConfig.getString(CoreOptions.TMP_DIRS); final long s3minPartSize = flinkConfig.getLong(PART_UPLOAD_MIN_SIZE); final int maxConcurrentUploads = flinkConfig.getInteger(MAX_CONCURRENT_UPLOADS); - final S3MultiPartUploader s3AccessHelper = getS3AccessHelper(fs); + final S3AccessHelper s3AccessHelper = getS3AccessHelper(fs); return new FlinkS3FileSystem( fs, @@ -166,6 +166,6 @@ public abstract class AbstractS3FileSystemFactory implements FileSystemFactory { URI fsUri, org.apache.hadoop.conf.Configuration hadoopConfig); @Nullable - protected abstract S3MultiPartUploader getS3AccessHelper(org.apache.hadoop.fs.FileSystem fs); + protected abstract S3AccessHelper getS3AccessHelper(org.apache.hadoop.fs.FileSystem fs); } diff --git a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/FlinkS3FileSystem.java b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/FlinkS3FileSystem.java index 553edde..5248e06 100644 --- a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/FlinkS3FileSystem.java +++ b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/FlinkS3FileSystem.java @@ -23,7 +23,7 @@ import org.apache.flink.core.fs.FileSystemKind; import org.apache.flink.core.fs.RecoverableWriter; import org.apache.flink.fs.s3.common.utils.RefCountedFile; import org.apache.flink.fs.s3.common.utils.RefCountedTmpFileCreator; -import org.apache.flink.fs.s3.common.writer.S3MultiPartUploader; +import org.apache.flink.fs.s3.common.writer.S3AccessHelper; import org.apache.flink.fs.s3.common.writer.S3RecoverableWriter; import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem; import org.apache.flink.util.Preconditions; @@ -60,7 +60,7 @@ public class FlinkS3FileSystem extends HadoopFileSystem implements EntropyInject private final FunctionWithException<File, RefCountedFile, IOException> tmpFileCreator; @Nullable - private final S3MultiPartUploader s3UploadHelper; + private final S3AccessHelper s3AccessHelper; private final Executor uploadThreadPool; @@ -83,7 +83,7 @@ public class FlinkS3FileSystem extends HadoopFileSystem implements EntropyInject String localTmpDirectory, @Nullable String entropyInjectionKey, int entropyLength, - @Nullable S3MultiPartUploader s3UploadHelper, + @Nullable S3AccessHelper s3UploadHelper, long s3uploadPartSize, int maxConcurrentUploadsPerStream) { @@ -99,7 +99,7 @@ public class FlinkS3FileSystem extends HadoopFileSystem implements EntropyInject // recoverable writer parameter configuration initialization this.localTmpDir = Preconditions.checkNotNull(localTmpDirectory); this.tmpFileCreator = RefCountedTmpFileCreator.inDirectories(new File(localTmpDirectory)); - this.s3UploadHelper = s3UploadHelper; + this.s3AccessHelper = s3UploadHelper; this.uploadThreadPool = Executors.newCachedThreadPool(); Preconditions.checkArgument(s3uploadPartSize >= S3_MULTIPART_MIN_PART_SIZE); @@ -131,7 +131,7 @@ public class FlinkS3FileSystem extends HadoopFileSystem implements EntropyInject @Override public RecoverableWriter createRecoverableWriter() throws IOException { - if (s3UploadHelper == null) { + if (s3AccessHelper == null) { // this is the case for Presto throw new UnsupportedOperationException("This s3 file system implementation does not support recoverable writers."); } @@ -139,7 +139,7 @@ public class FlinkS3FileSystem extends HadoopFileSystem implements EntropyInject return S3RecoverableWriter.writer( getHadoopFileSystem(), tmpFileCreator, - s3UploadHelper, + s3AccessHelper, uploadThreadPool, s3uploadPartSize, maxConcurrentUploadsPerStream); diff --git a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/RecoverableMultiPartUploadImpl.java b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/RecoverableMultiPartUploadImpl.java index 80042ce..fe2a4cd 100644 --- a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/RecoverableMultiPartUploadImpl.java +++ b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/RecoverableMultiPartUploadImpl.java @@ -58,7 +58,7 @@ import static org.apache.flink.util.Preconditions.checkState; @NotThreadSafe final class RecoverableMultiPartUploadImpl implements RecoverableMultiPartUpload { - private final S3MultiPartUploader s3MPUploader; + private final S3AccessHelper s3AccessHelper; private final Executor uploadThreadPool; @@ -71,7 +71,7 @@ final class RecoverableMultiPartUploadImpl implements RecoverableMultiPartUpload // ------------------------------------------------------------------------ private RecoverableMultiPartUploadImpl( - S3MultiPartUploader s3uploader, + S3AccessHelper s3AccessHelper, Executor uploadThreadPool, String uploadId, String objectName, @@ -81,7 +81,7 @@ final class RecoverableMultiPartUploadImpl implements RecoverableMultiPartUpload ) { checkArgument(numBytes >= 0L); - this.s3MPUploader = checkNotNull(s3uploader); + this.s3AccessHelper = checkNotNull(s3AccessHelper); this.uploadThreadPool = checkNotNull(uploadThreadPool); this.currentUploadInfo = new MultiPartUploadInfo(objectName, uploadId, partsSoFar, numBytes, incompletePart); this.namePrefixForTempObjects = incompleteObjectNamePrefix(objectName); @@ -111,7 +111,7 @@ final class RecoverableMultiPartUploadImpl implements RecoverableMultiPartUpload currentUploadInfo.registerNewPart(partLength); file.retain(); // keep the file while the async upload still runs - uploadThreadPool.execute(new UploadTask(s3MPUploader, currentUploadInfo, file, future)); + uploadThreadPool.execute(new UploadTask(s3AccessHelper, currentUploadInfo, file, future)); } @Override @@ -124,7 +124,7 @@ final class RecoverableMultiPartUploadImpl implements RecoverableMultiPartUpload final S3Recoverable snapshot = snapshotAndGetRecoverable(null); return new S3Committer( - s3MPUploader, + s3AccessHelper, snapshot.getObjectName(), snapshot.uploadId(), snapshot.parts(), @@ -179,7 +179,7 @@ final class RecoverableMultiPartUploadImpl implements RecoverableMultiPartUpload // they do not fall under the user's global TTL on S3. // Figure out a way to clean them. - s3MPUploader.uploadIncompletePart(incompletePartObjectName, inputStream, file.getPos()); + s3AccessHelper.uploadIncompletePart(incompletePartObjectName, inputStream, file.getPos()); } finally { file.release(); @@ -244,14 +244,14 @@ final class RecoverableMultiPartUploadImpl implements RecoverableMultiPartUpload // ------------------------------------------------------------------------ public static RecoverableMultiPartUploadImpl newUpload( - final S3MultiPartUploader s3uploader, + final S3AccessHelper s3AccessHelper, final Executor uploadThreadPool, final String objectName) throws IOException { - final String multiPartUploadId = s3uploader.startMultiPartUpload(objectName); + final String multiPartUploadId = s3AccessHelper.startMultiPartUpload(objectName); return new RecoverableMultiPartUploadImpl( - s3uploader, + s3AccessHelper, uploadThreadPool, multiPartUploadId, objectName, @@ -261,7 +261,7 @@ final class RecoverableMultiPartUploadImpl implements RecoverableMultiPartUpload } public static RecoverableMultiPartUploadImpl recoverUpload( - final S3MultiPartUploader s3uploader, + final S3AccessHelper s3AccessHelper, final Executor uploadThreadPool, final String multipartUploadId, final String objectName, @@ -270,7 +270,7 @@ final class RecoverableMultiPartUploadImpl implements RecoverableMultiPartUpload final Optional<File> incompletePart) { return new RecoverableMultiPartUploadImpl( - s3uploader, + s3AccessHelper, uploadThreadPool, multipartUploadId, objectName, @@ -286,7 +286,7 @@ final class RecoverableMultiPartUploadImpl implements RecoverableMultiPartUpload private static class UploadTask implements Runnable { - private final S3MultiPartUploader s3uploader; + private final S3AccessHelper s3AccessHelper; private final String objectName; @@ -299,7 +299,7 @@ final class RecoverableMultiPartUploadImpl implements RecoverableMultiPartUpload private final CompletableFuture<PartETag> future; UploadTask( - final S3MultiPartUploader s3uploader, + final S3AccessHelper s3AccessHelper, final MultiPartUploadInfo currentUpload, final RefCountedFSOutputStream file, final CompletableFuture<PartETag> future) { @@ -313,7 +313,7 @@ final class RecoverableMultiPartUploadImpl implements RecoverableMultiPartUpload // these are limits put by Amazon checkArgument(partNumber >= 1 && partNumber <= 10_000); - this.s3uploader = checkNotNull(s3uploader); + this.s3AccessHelper = checkNotNull(s3AccessHelper); this.file = checkNotNull(file); this.future = checkNotNull(future); } @@ -321,7 +321,7 @@ final class RecoverableMultiPartUploadImpl implements RecoverableMultiPartUpload @Override public void run() { try (final InputStream inputStream = file.getInputStream()) { - final UploadPartResult result = s3uploader.uploadPart(objectName, uploadId, partNumber, inputStream, file.getPos()); + final UploadPartResult result = s3AccessHelper.uploadPart(objectName, uploadId, partNumber, inputStream, file.getPos()); future.complete(new PartETag(result.getPartNumber(), result.getETag())); file.release(); } diff --git a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3MultiPartUploader.java b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3AccessHelper.java similarity index 99% rename from flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3MultiPartUploader.java rename to flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3AccessHelper.java index da227a4..57920a5 100644 --- a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3MultiPartUploader.java +++ b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3AccessHelper.java @@ -41,7 +41,7 @@ import java.util.concurrent.atomic.AtomicInteger; * the upload with all its parts will be either committed or discarded. */ @Internal -public interface S3MultiPartUploader { +public interface S3AccessHelper { /** * Initializes a Multi-Part Upload. diff --git a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3Committer.java b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3Committer.java index 1fc8bf1..5fbc5bb 100644 --- a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3Committer.java +++ b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3Committer.java @@ -40,7 +40,7 @@ public final class S3Committer implements RecoverableFsDataOutputStream.Committe private static final Logger LOG = LoggerFactory.getLogger(S3Committer.class); - private final S3MultiPartUploader s3uploader; + private final S3AccessHelper s3AccessHelper; private final String uploadId; @@ -50,8 +50,8 @@ public final class S3Committer implements RecoverableFsDataOutputStream.Committe private final long totalLength; - S3Committer(S3MultiPartUploader s3uploader, String objectName, String uploadId, List<PartETag> parts, long totalLength) { - this.s3uploader = checkNotNull(s3uploader); + S3Committer(S3AccessHelper s3AccessHelper, String objectName, String uploadId, List<PartETag> parts, long totalLength) { + this.s3AccessHelper = checkNotNull(s3AccessHelper); this.objectName = checkNotNull(objectName); this.uploadId = checkNotNull(uploadId); this.parts = checkNotNull(parts); @@ -64,7 +64,7 @@ public final class S3Committer implements RecoverableFsDataOutputStream.Committe LOG.info("Committing {} with MPU ID {}", objectName, uploadId); final AtomicInteger errorCount = new AtomicInteger(); - s3uploader.commitMultiPartUpload(objectName, uploadId, parts, totalLength, errorCount); + s3AccessHelper.commitMultiPartUpload(objectName, uploadId, parts, totalLength, errorCount); if (errorCount.get() == 0) { LOG.debug("Successfully committed {} with MPU ID {}", objectName, uploadId); @@ -82,14 +82,14 @@ public final class S3Committer implements RecoverableFsDataOutputStream.Committe LOG.info("Trying to commit after recovery {} with MPU ID {}", objectName, uploadId); try { - s3uploader.commitMultiPartUpload(objectName, uploadId, parts, totalLength, new AtomicInteger()); + s3AccessHelper.commitMultiPartUpload(objectName, uploadId, parts, totalLength, new AtomicInteger()); } catch (IOException e) { LOG.info("Failed to commit after recovery {} with MPU ID {}. " + "Checking if file was committed before...", objectName, uploadId); LOG.trace("Exception when committing:", e); try { - ObjectMetadata metadata = s3uploader.getObjectMetadata(objectName); + ObjectMetadata metadata = s3AccessHelper.getObjectMetadata(objectName); if (totalLength != metadata.getContentLength()) { String message = String.format("Inconsistent result for object %s: conflicting lengths. " + "Recovered committer for upload %s indicates %s bytes, present object is %s bytes", diff --git a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableMultipartUploadFactory.java b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableMultipartUploadFactory.java index b201981..9a171ae 100644 --- a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableMultipartUploadFactory.java +++ b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableMultipartUploadFactory.java @@ -43,7 +43,7 @@ final class S3RecoverableMultipartUploadFactory { private final org.apache.hadoop.fs.FileSystem fs; - private final S3MultiPartUploader twoPhaseUploader; + private final S3AccessHelper s3AccessHelper; private final FunctionWithException<File, RefCountedFile, IOException> tmpFileSupplier; @@ -53,7 +53,7 @@ final class S3RecoverableMultipartUploadFactory { S3RecoverableMultipartUploadFactory( final FileSystem fs, - final S3MultiPartUploader twoPhaseUploader, + final S3AccessHelper s3AccessHelper, final int maxConcurrentUploadsPerStream, final Executor executor, final FunctionWithException<File, RefCountedFile, IOException> tmpFileSupplier) { @@ -61,14 +61,14 @@ final class S3RecoverableMultipartUploadFactory { this.fs = Preconditions.checkNotNull(fs); this.maxConcurrentUploadsPerStream = maxConcurrentUploadsPerStream; this.executor = executor; - this.twoPhaseUploader = twoPhaseUploader; + this.s3AccessHelper = s3AccessHelper; this.tmpFileSupplier = tmpFileSupplier; } RecoverableMultiPartUpload getNewRecoverableUpload(Path path) throws IOException { return RecoverableMultiPartUploadImpl.newUpload( - twoPhaseUploader, + s3AccessHelper, limitedExecutor(), pathToObjectName(path)); } @@ -77,7 +77,7 @@ final class S3RecoverableMultipartUploadFactory { final Optional<File> incompletePart = downloadLastDataChunk(recoverable); return RecoverableMultiPartUploadImpl.recoverUpload( - twoPhaseUploader, + s3AccessHelper, limitedExecutor(), recoverable.uploadId(), recoverable.getObjectName(), diff --git a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableWriter.java b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableWriter.java index 2a84308..698f65f 100644 --- a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableWriter.java +++ b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableWriter.java @@ -129,7 +129,7 @@ public class S3RecoverableWriter implements RecoverableWriter { public static S3RecoverableWriter writer( final FileSystem fs, final FunctionWithException<File, RefCountedFile, IOException> tempFileCreator, - final S3MultiPartUploader twoPhaseUploader, + final S3AccessHelper s3AccessHelper, final Executor uploadThreadPool, final long userDefinedMinPartSize, final int maxConcurrentUploadsPerStream) { @@ -139,7 +139,7 @@ public class S3RecoverableWriter implements RecoverableWriter { final S3RecoverableMultipartUploadFactory uploadFactory = new S3RecoverableMultipartUploadFactory( fs, - twoPhaseUploader, + s3AccessHelper, maxConcurrentUploadsPerStream, uploadThreadPool, tempFileCreator); diff --git a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/S3EntropyFsFactoryTest.java b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/S3EntropyFsFactoryTest.java index d3d25c3..5b15652 100644 --- a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/S3EntropyFsFactoryTest.java +++ b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/S3EntropyFsFactoryTest.java @@ -19,7 +19,7 @@ package org.apache.flink.fs.s3.common; import org.apache.flink.configuration.Configuration; -import org.apache.flink.fs.s3.common.writer.S3MultiPartUploader; +import org.apache.flink.fs.s3.common.writer.S3AccessHelper; import org.apache.flink.util.TestLogger; import org.apache.hadoop.fs.FileSystem; @@ -78,7 +78,7 @@ public class S3EntropyFsFactoryTest extends TestLogger { @Nullable @Override - protected S3MultiPartUploader getS3AccessHelper(FileSystem fs) { + protected S3AccessHelper getS3AccessHelper(FileSystem fs) { return null; } diff --git a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/writer/RecoverableMultiPartUploadImplTest.java b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/writer/RecoverableMultiPartUploadImplTest.java index 72554e1..4c2f147 100644 --- a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/writer/RecoverableMultiPartUploadImplTest.java +++ b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/writer/RecoverableMultiPartUploadImplTest.java @@ -339,10 +339,10 @@ public class RecoverableMultiPartUploadImplTest { } /** - * A {@link S3MultiPartUploader} that simulates uploading part files to S3 by + * A {@link S3AccessHelper} that simulates uploading part files to S3 by * simply putting complete and incomplete part files in lists for further validation. */ - private static class StubMultiPartUploader implements S3MultiPartUploader { + private static class StubMultiPartUploader implements S3AccessHelper { private final List<RecoverableMultiPartUploadImplTest.TestUploadPartResult> completePartsUploaded = new ArrayList<>(); private final List<RecoverableMultiPartUploadImplTest.TestPutObjectResult> incompletePartsUploaded = new ArrayList<>(); diff --git a/flink-filesystems/flink-s3-fs-hadoop/src/main/java/org/apache/flink/fs/s3hadoop/HadoopS3MultiPartUploader.java b/flink-filesystems/flink-s3-fs-hadoop/src/main/java/org/apache/flink/fs/s3hadoop/HadoopS3AccessHelper.java similarity index 77% rename from flink-filesystems/flink-s3-fs-hadoop/src/main/java/org/apache/flink/fs/s3hadoop/HadoopS3MultiPartUploader.java rename to flink-filesystems/flink-s3-fs-hadoop/src/main/java/org/apache/flink/fs/s3hadoop/HadoopS3AccessHelper.java index f446f70..f833471 100644 --- a/flink-filesystems/flink-s3-fs-hadoop/src/main/java/org/apache/flink/fs/s3hadoop/HadoopS3MultiPartUploader.java +++ b/flink-filesystems/flink-s3-fs-hadoop/src/main/java/org/apache/flink/fs/s3hadoop/HadoopS3AccessHelper.java @@ -18,7 +18,7 @@ package org.apache.flink.fs.s3hadoop; -import org.apache.flink.fs.s3.common.writer.S3MultiPartUploader; +import org.apache.flink.fs.s3.common.writer.S3AccessHelper; import org.apache.flink.util.MathUtils; import com.amazonaws.SdkBaseException; @@ -43,16 +43,16 @@ import java.util.concurrent.atomic.AtomicInteger; import static org.apache.flink.util.Preconditions.checkNotNull; /** - * An implementation of the {@link S3MultiPartUploader} for the Hadoop S3A filesystem. + * An implementation of the {@link S3AccessHelper} for the Hadoop S3A filesystem. */ -public class HadoopS3MultiPartUploader implements S3MultiPartUploader { +public class HadoopS3AccessHelper implements S3AccessHelper { private final S3AFileSystem s3a; - private final InternalWriteOperationHelper s3uploader; + private final InternalWriteOperationHelper s3accessHelper; - public HadoopS3MultiPartUploader(S3AFileSystem s3a, Configuration conf) { - this.s3uploader = new InternalWriteOperationHelper( + public HadoopS3AccessHelper(S3AFileSystem s3a, Configuration conf) { + this.s3accessHelper = new InternalWriteOperationHelper( checkNotNull(s3a), checkNotNull(conf) ); @@ -61,25 +61,25 @@ public class HadoopS3MultiPartUploader implements S3MultiPartUploader { @Override public String startMultiPartUpload(String key) throws IOException { - return s3uploader.initiateMultiPartUpload(key); + return s3accessHelper.initiateMultiPartUpload(key); } @Override public UploadPartResult uploadPart(String key, String uploadId, int partNumber, InputStream inputStream, long length) throws IOException { - final UploadPartRequest uploadRequest = s3uploader.newUploadPartRequest( + final UploadPartRequest uploadRequest = s3accessHelper.newUploadPartRequest( key, uploadId, partNumber, MathUtils.checkedDownCast(length), inputStream, null, 0L); - return s3uploader.uploadPart(uploadRequest); + return s3accessHelper.uploadPart(uploadRequest); } @Override public PutObjectResult uploadIncompletePart(String key, InputStream inputStream, long length) throws IOException { - final PutObjectRequest putRequest = s3uploader.createPutObjectRequest(key, inputStream, length); - return s3uploader.putObject(putRequest); + final PutObjectRequest putRequest = s3accessHelper.createPutObjectRequest(key, inputStream, length); + return s3accessHelper.putObject(putRequest); } @Override public CompleteMultipartUploadResult commitMultiPartUpload(String destKey, String uploadId, List<PartETag> partETags, long length, AtomicInteger errorCount) throws IOException { - return s3uploader.completeMPUwithRetries(destKey, uploadId, partETags, length, errorCount); + return s3accessHelper.completeMPUwithRetries(destKey, uploadId, partETags, length, errorCount); } @Override @@ -94,7 +94,7 @@ public class HadoopS3MultiPartUploader implements S3MultiPartUploader { /** * Internal {@link WriteOperationHelper} that is wrapped so that it only exposes - * the functionality we need for the {@link S3MultiPartUploader}. + * the functionality we need for the {@link S3AccessHelper}. */ private static final class InternalWriteOperationHelper extends WriteOperationHelper { diff --git a/flink-filesystems/flink-s3-fs-hadoop/src/main/java/org/apache/flink/fs/s3hadoop/S3FileSystemFactory.java b/flink-filesystems/flink-s3-fs-hadoop/src/main/java/org/apache/flink/fs/s3hadoop/S3FileSystemFactory.java index 897629f..2637e7b 100644 --- a/flink-filesystems/flink-s3-fs-hadoop/src/main/java/org/apache/flink/fs/s3hadoop/S3FileSystemFactory.java +++ b/flink-filesystems/flink-s3-fs-hadoop/src/main/java/org/apache/flink/fs/s3hadoop/S3FileSystemFactory.java @@ -21,7 +21,7 @@ package org.apache.flink.fs.s3hadoop; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory; import org.apache.flink.fs.s3.common.HadoopConfigLoader; -import org.apache.flink.fs.s3.common.writer.S3MultiPartUploader; +import org.apache.flink.fs.s3.common.writer.S3AccessHelper; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.s3a.S3AFileSystem; @@ -96,8 +96,8 @@ public class S3FileSystemFactory extends AbstractS3FileSystemFactory { @Nullable @Override - protected S3MultiPartUploader getS3AccessHelper(FileSystem fs) { + protected S3AccessHelper getS3AccessHelper(FileSystem fs) { final S3AFileSystem s3Afs = (S3AFileSystem) fs; - return new HadoopS3MultiPartUploader(s3Afs, s3Afs.getConf()); + return new HadoopS3AccessHelper(s3Afs, s3Afs.getConf()); } } diff --git a/flink-filesystems/flink-s3-fs-presto/src/main/java/org/apache/flink/fs/s3presto/S3FileSystemFactory.java b/flink-filesystems/flink-s3-fs-presto/src/main/java/org/apache/flink/fs/s3presto/S3FileSystemFactory.java index b579d6e..0fb2857 100644 --- a/flink-filesystems/flink-s3-fs-presto/src/main/java/org/apache/flink/fs/s3presto/S3FileSystemFactory.java +++ b/flink-filesystems/flink-s3-fs-presto/src/main/java/org/apache/flink/fs/s3presto/S3FileSystemFactory.java @@ -21,7 +21,7 @@ package org.apache.flink.fs.s3presto; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory; import org.apache.flink.fs.s3.common.HadoopConfigLoader; -import org.apache.flink.fs.s3.common.writer.S3MultiPartUploader; +import org.apache.flink.fs.s3.common.writer.S3AccessHelper; import org.apache.flink.util.FlinkRuntimeException; import com.facebook.presto.hive.PrestoS3FileSystem; @@ -92,7 +92,7 @@ public class S3FileSystemFactory extends AbstractS3FileSystemFactory { @Nullable @Override - protected S3MultiPartUploader getS3AccessHelper(FileSystem fs) { + protected S3AccessHelper getS3AccessHelper(FileSystem fs) { return null; }
