This is an automated email from the ASF dual-hosted git repository. pnowojski pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 4e323c33a9f25673587ee0e8f4f9786b21db666c Author: Roman Khachatryan <[email protected]> AuthorDate: Thu May 7 15:30:56 2020 +0200 [FLINK-17547][task][hotfix] Extract RefCountedFileWithStream from RefCountedFile Motivation: use RefCountedFile for reading as well. --- .../flink/fs/s3/common/FlinkS3FileSystem.java | 4 +- .../utils/RefCountedBufferingFileStream.java | 10 ++-- .../flink/fs/s3/common/utils/RefCountedFile.java | 59 ++----------------- ...ntedFile.java => RefCountedFileWithStream.java} | 68 ++++------------------ .../s3/common/utils/RefCountedTmpFileCreator.java | 10 ++-- .../writer/S3RecoverableFsDataOutputStream.java | 12 ++-- .../S3RecoverableMultipartUploadFactory.java | 8 +-- .../fs/s3/common/writer/S3RecoverableWriter.java | 8 +-- .../utils/RefCountedBufferingFileStreamTest.java | 4 +- .../fs/s3/common/utils/RefCountedFileTest.java | 59 ++----------------- ...Test.java => RefCountedFileWithStreamTest.java} | 60 +++---------------- .../writer/RecoverableMultiPartUploadImplTest.java | 4 +- .../S3RecoverableFsDataOutputStreamTest.java | 10 ++-- 13 files changed, 63 insertions(+), 253 deletions(-) diff --git a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/FlinkS3FileSystem.java b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/FlinkS3FileSystem.java index 5248e06..3514bbcb 100644 --- a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/FlinkS3FileSystem.java +++ b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/FlinkS3FileSystem.java @@ -21,7 +21,7 @@ package org.apache.flink.fs.s3.common; import org.apache.flink.core.fs.EntropyInjectingFileSystem; import org.apache.flink.core.fs.FileSystemKind; import org.apache.flink.core.fs.RecoverableWriter; -import org.apache.flink.fs.s3.common.utils.RefCountedFile; +import org.apache.flink.fs.s3.common.utils.RefCountedFileWithStream; import org.apache.flink.fs.s3.common.utils.RefCountedTmpFileCreator; import org.apache.flink.fs.s3.common.writer.S3AccessHelper; import org.apache.flink.fs.s3.common.writer.S3RecoverableWriter; @@ -57,7 +57,7 @@ public class FlinkS3FileSystem extends HadoopFileSystem implements EntropyInject private final String localTmpDir; - private final FunctionWithException<File, RefCountedFile, IOException> tmpFileCreator; + private final FunctionWithException<File, RefCountedFileWithStream, IOException> tmpFileCreator; @Nullable private final S3AccessHelper s3AccessHelper; diff --git a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/utils/RefCountedBufferingFileStream.java b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/utils/RefCountedBufferingFileStream.java index 29f2590..5f149df 100644 --- a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/utils/RefCountedBufferingFileStream.java +++ b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/utils/RefCountedBufferingFileStream.java @@ -29,7 +29,7 @@ import static org.apache.flink.util.Preconditions.checkArgument; import static org.apache.flink.util.Preconditions.checkNotNull; /** - * A {@link RefCountedFile} that also uses an in-memory buffer for buffering small writes. + * A {@link RefCountedFileWithStream} that also uses an in-memory buffer for buffering small writes. * This is done to avoid frequent 'flushes' of the file stream to disk. */ @Internal @@ -37,7 +37,7 @@ public class RefCountedBufferingFileStream extends RefCountedFSOutputStream { public static final int BUFFER_SIZE = 4096; - private final RefCountedFile currentTmpFile; + private final RefCountedFileWithStream currentTmpFile; /** The write buffer. */ private final byte[] buffer; @@ -49,7 +49,7 @@ public class RefCountedBufferingFileStream extends RefCountedFSOutputStream { @VisibleForTesting public RefCountedBufferingFileStream( - final RefCountedFile file, + final RefCountedFileWithStream file, final int bufferSize) { checkArgument(bufferSize > 0L); @@ -165,7 +165,7 @@ public class RefCountedBufferingFileStream extends RefCountedFSOutputStream { // ------------------------- Factory Methods ------------------------- public static RefCountedBufferingFileStream openNew( - final FunctionWithException<File, RefCountedFile, IOException> tmpFileProvider) throws IOException { + final FunctionWithException<File, RefCountedFileWithStream, IOException> tmpFileProvider) throws IOException { return new RefCountedBufferingFileStream( tmpFileProvider.apply(null), @@ -173,7 +173,7 @@ public class RefCountedBufferingFileStream extends RefCountedFSOutputStream { } public static RefCountedBufferingFileStream restore( - final FunctionWithException<File, RefCountedFile, IOException> tmpFileProvider, + final FunctionWithException<File, RefCountedFileWithStream, IOException> tmpFileProvider, final File initialTmpFile) throws IOException { return new RefCountedBufferingFileStream( diff --git a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/utils/RefCountedFile.java b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/utils/RefCountedFile.java index 1787636..9675f09 100644 --- a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/utils/RefCountedFile.java +++ b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/utils/RefCountedFile.java @@ -21,11 +21,10 @@ package org.apache.flink.fs.s3.common.utils; import org.apache.flink.annotation.Internal; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.util.ExceptionUtils; -import org.apache.flink.util.IOUtils; +import org.apache.flink.util.RefCounted; import java.io.File; import java.io.IOException; -import java.io.OutputStream; import java.nio.file.Files; import java.util.concurrent.atomic.AtomicInteger; @@ -40,21 +39,13 @@ public class RefCountedFile implements RefCounted { private final File file; - private final OffsetAwareOutputStream stream; - private final AtomicInteger references; - private boolean closed; + protected boolean closed; - private RefCountedFile( - final File file, - final OutputStream currentOut, - final long bytesInCurrentPart) { + protected RefCountedFile(final File file) { this.file = checkNotNull(file); this.references = new AtomicInteger(1); - this.stream = new OffsetAwareOutputStream( - currentOut, - bytesInCurrentPart); this.closed = false; } @@ -62,33 +53,6 @@ public class RefCountedFile implements RefCounted { return file; } - public OffsetAwareOutputStream getStream() { - return stream; - } - - public long getLength() { - return stream.getLength(); - } - - public void write(byte[] b, int off, int len) throws IOException { - requireOpened(); - if (len > 0) { - stream.write(b, off, len); - } - } - - public void flush() throws IOException { - requireOpened(); - stream.flush(); - } - - public void closeStream() { - if (!closed) { - IOUtils.closeQuietly(stream); - closed = true; - } - } - @Override public void retain() { references.incrementAndGet(); @@ -119,22 +83,7 @@ public class RefCountedFile implements RefCounted { } @VisibleForTesting - int getReferenceCounter() { + public int getReferenceCounter() { return references.get(); } - - // ------------------------------ Factory methods for initializing a temporary file ------------------------------ - - public static RefCountedFile newFile( - final File file, - final OutputStream currentOut) throws IOException { - return new RefCountedFile(file, currentOut, 0L); - } - - public static RefCountedFile restoredFile( - final File file, - final OutputStream currentOut, - final long bytesInCurrentPart) { - return new RefCountedFile(file, currentOut, bytesInCurrentPart); - } } diff --git a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/utils/RefCountedFile.java b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/utils/RefCountedFileWithStream.java similarity index 60% copy from flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/utils/RefCountedFile.java copy to flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/utils/RefCountedFileWithStream.java index 1787636..94b8527 100644 --- a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/utils/RefCountedFile.java +++ b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/utils/RefCountedFileWithStream.java @@ -19,47 +19,27 @@ package org.apache.flink.fs.s3.common.utils; import org.apache.flink.annotation.Internal; -import org.apache.flink.annotation.VisibleForTesting; -import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.IOUtils; import java.io.File; import java.io.IOException; import java.io.OutputStream; -import java.nio.file.Files; -import java.util.concurrent.atomic.AtomicInteger; - -import static org.apache.flink.util.Preconditions.checkNotNull; /** * A reference counted file which is deleted as soon as no caller * holds a reference to the wrapped {@link File}. */ @Internal -public class RefCountedFile implements RefCounted { - - private final File file; +public class RefCountedFileWithStream extends RefCountedFile { private final OffsetAwareOutputStream stream; - private final AtomicInteger references; - - private boolean closed; - - private RefCountedFile( + private RefCountedFileWithStream( final File file, final OutputStream currentOut, final long bytesInCurrentPart) { - this.file = checkNotNull(file); - this.references = new AtomicInteger(1); - this.stream = new OffsetAwareOutputStream( - currentOut, - bytesInCurrentPart); - this.closed = false; - } - - public File getFile() { - return file; + super(file); + this.stream = new OffsetAwareOutputStream(currentOut, bytesInCurrentPart); } public OffsetAwareOutputStream getStream() { @@ -77,64 +57,36 @@ public class RefCountedFile implements RefCounted { } } - public void flush() throws IOException { + void flush() throws IOException { requireOpened(); stream.flush(); } - public void closeStream() { + void closeStream() { if (!closed) { IOUtils.closeQuietly(stream); closed = true; } } - @Override - public void retain() { - references.incrementAndGet(); - } - - @Override - public boolean release() { - if (references.decrementAndGet() == 0) { - return tryClose(); - } - return false; - } - - private boolean tryClose() { - try { - Files.deleteIfExists(file.toPath()); - return true; - } catch (Throwable t) { - ExceptionUtils.rethrowIfFatalError(t); - } - return false; - } - private void requireOpened() throws IOException { if (closed) { throw new IOException("Stream closed."); } } - @VisibleForTesting - int getReferenceCounter() { - return references.get(); - } - // ------------------------------ Factory methods for initializing a temporary file ------------------------------ - public static RefCountedFile newFile( + public static RefCountedFileWithStream newFile( final File file, final OutputStream currentOut) throws IOException { - return new RefCountedFile(file, currentOut, 0L); + return new RefCountedFileWithStream(file, currentOut, 0L); } - public static RefCountedFile restoredFile( + public static RefCountedFileWithStream restoredFile( final File file, final OutputStream currentOut, final long bytesInCurrentPart) { - return new RefCountedFile(file, currentOut, bytesInCurrentPart); + return new RefCountedFileWithStream(file, currentOut, bytesInCurrentPart); } } diff --git a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/utils/RefCountedTmpFileCreator.java b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/utils/RefCountedTmpFileCreator.java index 7a928d0..51b417c 100644 --- a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/utils/RefCountedTmpFileCreator.java +++ b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/utils/RefCountedTmpFileCreator.java @@ -34,10 +34,10 @@ import java.util.concurrent.atomic.AtomicInteger; import static org.apache.flink.util.Preconditions.checkArgument; /** - * A utility class that creates local {@link RefCountedFile reference counted files} that serve as temporary files. + * A utility class that creates local {@link RefCountedFileWithStream reference counted files} that serve as temporary files. */ @Internal -public class RefCountedTmpFileCreator implements FunctionWithException<File, RefCountedFile, IOException> { +public class RefCountedTmpFileCreator implements FunctionWithException<File, RefCountedFileWithStream, IOException> { private final File[] tempDirectories; @@ -70,7 +70,7 @@ public class RefCountedTmpFileCreator implements FunctionWithException<File, Ref * @throws IOException Thrown, if the stream to the temp file could not be opened. */ @Override - public RefCountedFile apply(File file) throws IOException { + public RefCountedFileWithStream apply(File file) throws IOException { final File directory = tempDirectories[nextIndex()]; while (true) { @@ -78,10 +78,10 @@ public class RefCountedTmpFileCreator implements FunctionWithException<File, Ref if (file == null) { final File newFile = new File(directory, ".tmp_" + UUID.randomUUID()); final OutputStream out = Files.newOutputStream(newFile.toPath(), StandardOpenOption.CREATE_NEW); - return RefCountedFile.newFile(newFile, out); + return RefCountedFileWithStream.newFile(newFile, out); } else { final OutputStream out = Files.newOutputStream(file.toPath(), StandardOpenOption.APPEND); - return RefCountedFile.restoredFile(file, out, file.length()); + return RefCountedFileWithStream.restoredFile(file, out, file.length()); } } catch (FileAlreadyExistsException ignored) { // fall through the loop and retry diff --git a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableFsDataOutputStream.java b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableFsDataOutputStream.java index 220ddd5..5447026 100644 --- a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableFsDataOutputStream.java +++ b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableFsDataOutputStream.java @@ -23,7 +23,7 @@ import org.apache.flink.core.fs.RecoverableFsDataOutputStream; import org.apache.flink.core.fs.RecoverableWriter; import org.apache.flink.fs.s3.common.utils.RefCountedBufferingFileStream; import org.apache.flink.fs.s3.common.utils.RefCountedFSOutputStream; -import org.apache.flink.fs.s3.common.utils.RefCountedFile; +import org.apache.flink.fs.s3.common.utils.RefCountedFileWithStream; import org.apache.flink.util.function.FunctionWithException; import org.apache.commons.io.IOUtils; @@ -60,7 +60,7 @@ public final class S3RecoverableFsDataOutputStream extends RecoverableFsDataOutp private final RecoverableMultiPartUpload upload; - private final FunctionWithException<File, RefCountedFile, IOException> tmpFileProvider; + private final FunctionWithException<File, RefCountedFileWithStream, IOException> tmpFileProvider; /** * The number of bytes at which we start a new part of the multipart upload. @@ -80,7 +80,7 @@ public final class S3RecoverableFsDataOutputStream extends RecoverableFsDataOutp */ S3RecoverableFsDataOutputStream( RecoverableMultiPartUpload upload, - FunctionWithException<File, RefCountedFile, IOException> tempFileCreator, + FunctionWithException<File, RefCountedFileWithStream, IOException> tempFileCreator, RefCountedFSOutputStream initialTmpFile, long userDefinedMinPartSize, long bytesBeforeCurrentPart) { @@ -228,7 +228,7 @@ public final class S3RecoverableFsDataOutputStream extends RecoverableFsDataOutp public static S3RecoverableFsDataOutputStream newStream( final RecoverableMultiPartUpload upload, - final FunctionWithException<File, RefCountedFile, IOException> tmpFileCreator, + final FunctionWithException<File, RefCountedFileWithStream, IOException> tmpFileCreator, final long userDefinedMinPartSize) throws IOException { checkArgument(userDefinedMinPartSize >= S3_MULTIPART_MIN_PART_SIZE); @@ -245,7 +245,7 @@ public final class S3RecoverableFsDataOutputStream extends RecoverableFsDataOutp public static S3RecoverableFsDataOutputStream recoverStream( final RecoverableMultiPartUpload upload, - final FunctionWithException<File, RefCountedFile, IOException> tmpFileCreator, + final FunctionWithException<File, RefCountedFileWithStream, IOException> tmpFileCreator, final long userDefinedMinPartSize, final long bytesBeforeCurrentPart) throws IOException { @@ -264,7 +264,7 @@ public final class S3RecoverableFsDataOutputStream extends RecoverableFsDataOutp } private static RefCountedBufferingFileStream boundedBufferingFileStream( - final FunctionWithException<File, RefCountedFile, IOException> tmpFileCreator, + final FunctionWithException<File, RefCountedFileWithStream, IOException> tmpFileCreator, final Optional<File> incompletePart) throws IOException { if (!incompletePart.isPresent()) { diff --git a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableMultipartUploadFactory.java b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableMultipartUploadFactory.java index 3727e25..b7fb8fb 100644 --- a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableMultipartUploadFactory.java +++ b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableMultipartUploadFactory.java @@ -21,7 +21,7 @@ package org.apache.flink.fs.s3.common.writer; import org.apache.flink.annotation.Internal; import org.apache.flink.core.fs.Path; import org.apache.flink.fs.s3.common.utils.BackPressuringExecutor; -import org.apache.flink.fs.s3.common.utils.RefCountedFile; +import org.apache.flink.fs.s3.common.utils.RefCountedFileWithStream; import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem; import org.apache.flink.util.Preconditions; import org.apache.flink.util.function.FunctionWithException; @@ -43,7 +43,7 @@ final class S3RecoverableMultipartUploadFactory { private final S3AccessHelper s3AccessHelper; - private final FunctionWithException<File, RefCountedFile, IOException> tmpFileSupplier; + private final FunctionWithException<File, RefCountedFileWithStream, IOException> tmpFileSupplier; private final int maxConcurrentUploadsPerStream; @@ -54,7 +54,7 @@ final class S3RecoverableMultipartUploadFactory { final S3AccessHelper s3AccessHelper, final int maxConcurrentUploadsPerStream, final Executor executor, - final FunctionWithException<File, RefCountedFile, IOException> tmpFileSupplier) { + final FunctionWithException<File, RefCountedFileWithStream, IOException> tmpFileSupplier) { this.fs = Preconditions.checkNotNull(fs); this.maxConcurrentUploadsPerStream = maxConcurrentUploadsPerStream; @@ -92,7 +92,7 @@ final class S3RecoverableMultipartUploadFactory { } // download the file (simple way) - final RefCountedFile refCountedFile = tmpFileSupplier.apply(null); + final RefCountedFileWithStream refCountedFile = tmpFileSupplier.apply(null); final File file = refCountedFile.getFile(); final long numBytes = s3AccessHelper.getObject(objectKey, file); diff --git a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableWriter.java b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableWriter.java index ddb4443..a6b62cc 100644 --- a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableWriter.java +++ b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableWriter.java @@ -25,7 +25,7 @@ import org.apache.flink.core.fs.RecoverableFsDataOutputStream; import org.apache.flink.core.fs.RecoverableFsDataOutputStream.Committer; import org.apache.flink.core.fs.RecoverableWriter; import org.apache.flink.core.io.SimpleVersionedSerializer; -import org.apache.flink.fs.s3.common.utils.RefCountedFile; +import org.apache.flink.fs.s3.common.utils.RefCountedFileWithStream; import org.apache.flink.util.function.FunctionWithException; import org.apache.hadoop.fs.FileSystem; @@ -50,7 +50,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull; @PublicEvolving public class S3RecoverableWriter implements RecoverableWriter { - private final FunctionWithException<File, RefCountedFile, IOException> tempFileCreator; + private final FunctionWithException<File, RefCountedFileWithStream, IOException> tempFileCreator; private final long userDefinedMinPartSize; @@ -62,7 +62,7 @@ public class S3RecoverableWriter implements RecoverableWriter { S3RecoverableWriter( final S3AccessHelper s3AccessHelper, final S3RecoverableMultipartUploadFactory uploadFactory, - final FunctionWithException<File, RefCountedFile, IOException> tempFileCreator, + final FunctionWithException<File, RefCountedFileWithStream, IOException> tempFileCreator, final long userDefinedMinPartSize) { this.s3AccessHelper = checkNotNull(s3AccessHelper); @@ -144,7 +144,7 @@ public class S3RecoverableWriter implements RecoverableWriter { public static S3RecoverableWriter writer( final FileSystem fs, - final FunctionWithException<File, RefCountedFile, IOException> tempFileCreator, + final FunctionWithException<File, RefCountedFileWithStream, IOException> tempFileCreator, final S3AccessHelper s3AccessHelper, final Executor uploadThreadPool, final long userDefinedMinPartSize, diff --git a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/utils/RefCountedBufferingFileStreamTest.java b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/utils/RefCountedBufferingFileStreamTest.java index 50ea9bd..368c9cf 100644 --- a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/utils/RefCountedBufferingFileStreamTest.java +++ b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/utils/RefCountedBufferingFileStreamTest.java @@ -134,11 +134,11 @@ public class RefCountedBufferingFileStreamTest { return new RefCountedBufferingFileStream(getRefCountedFileWithContent(), BUFFER_SIZE); } - private RefCountedFile getRefCountedFileWithContent() throws IOException { + private RefCountedFileWithStream getRefCountedFileWithContent() throws IOException { final File newFile = new File(temporaryFolder.getRoot(), ".tmp_" + UUID.randomUUID()); final OutputStream out = Files.newOutputStream(newFile.toPath(), StandardOpenOption.CREATE_NEW); - return RefCountedFile.newFile(newFile, out); + return RefCountedFileWithStream.newFile(newFile, out); } private static byte[] bytesOf(String str) { diff --git a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/utils/RefCountedFileTest.java b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/utils/RefCountedFileTest.java index 2e03197..217f4e1 100644 --- a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/utils/RefCountedFileTest.java +++ b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/utils/RefCountedFileTest.java @@ -25,14 +25,14 @@ import org.junit.rules.TemporaryFolder; import java.io.File; import java.io.IOException; -import java.io.OutputStream; import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Path; -import java.nio.file.StandardOpenOption; import java.util.UUID; import java.util.stream.Stream; +import static org.apache.flink.util.Preconditions.checkState; + /** * Tests for the {@link RefCountedFile}. */ @@ -44,9 +44,9 @@ public class RefCountedFileTest { @Test public void releaseToZeroRefCounterShouldDeleteTheFile() throws IOException { final File newFile = new File(temporaryFolder.getRoot(), ".tmp_" + UUID.randomUUID()); - final OutputStream out = Files.newOutputStream(newFile.toPath(), StandardOpenOption.CREATE_NEW); + checkState(newFile.createNewFile()); - RefCountedFile fileUnderTest = RefCountedFile.newFile(newFile, out); + RefCountedFile fileUnderTest = new RefCountedFile(newFile); verifyTheFileIsStillThere(); fileUnderTest.release(); @@ -59,10 +59,10 @@ public class RefCountedFileTest { @Test public void retainsShouldRequirePlusOneReleasesToDeleteTheFile() throws IOException { final File newFile = new File(temporaryFolder.getRoot(), ".tmp_" + UUID.randomUUID()); - final OutputStream out = Files.newOutputStream(newFile.toPath(), StandardOpenOption.CREATE_NEW); + checkState(newFile.createNewFile()); // the reference counter always starts with 1 (not 0). This is why we need +1 releases - RefCountedFile fileUnderTest = RefCountedFile.newFile(newFile, out); + RefCountedFile fileUnderTest = new RefCountedFile(newFile); verifyTheFileIsStillThere(); fileUnderTest.retain(); @@ -85,59 +85,12 @@ public class RefCountedFileTest { } } - @Test - public void writeShouldSucceed() throws IOException { - byte[] content = bytesOf("hello world"); - - final RefCountedFile fileUnderTest = getClosedRefCountedFileWithContent(content); - long fileLength = fileUnderTest.getLength(); - - Assert.assertEquals(content.length, fileLength); - } - - @Test - public void closeShouldNotReleaseReference() throws IOException { - getClosedRefCountedFileWithContent("hello world"); - verifyTheFileIsStillThere(); - } - - @Test(expected = IOException.class) - public void writeAfterCloseShouldThrowException() throws IOException { - final RefCountedFile fileUnderTest = getClosedRefCountedFileWithContent("hello world"); - byte[] content = bytesOf("Hello Again"); - fileUnderTest.write(content, 0, content.length); - } - - @Test(expected = IOException.class) - public void flushAfterCloseShouldThrowException() throws IOException { - final RefCountedFile fileUnderTest = getClosedRefCountedFileWithContent("hello world"); - fileUnderTest.flush(); - } - - // ------------------------------------- Utilities ------------------------------------- - private void verifyTheFileIsStillThere() throws IOException { try (Stream<Path> files = Files.list(temporaryFolder.getRoot().toPath())) { Assert.assertEquals(1L, files.count()); } } - private RefCountedFile getClosedRefCountedFileWithContent(String content) throws IOException { - return getClosedRefCountedFileWithContent(bytesOf(content)); - } - - private RefCountedFile getClosedRefCountedFileWithContent(byte[] content) throws IOException { - final File newFile = new File(temporaryFolder.getRoot(), ".tmp_" + UUID.randomUUID()); - final OutputStream out = Files.newOutputStream(newFile.toPath(), StandardOpenOption.CREATE_NEW); - - final RefCountedFile fileUnderTest = RefCountedFile.newFile(newFile, out); - - fileUnderTest.write(content, 0, content.length); - - fileUnderTest.closeStream(); - return fileUnderTest; - } - private static byte[] bytesOf(String str) { return str.getBytes(StandardCharsets.UTF_8); } diff --git a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/utils/RefCountedFileTest.java b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/utils/RefCountedFileWithStreamTest.java similarity index 56% copy from flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/utils/RefCountedFileTest.java copy to flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/utils/RefCountedFileWithStreamTest.java index 2e03197..7aa7240 100644 --- a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/utils/RefCountedFileTest.java +++ b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/utils/RefCountedFileWithStreamTest.java @@ -34,62 +34,18 @@ import java.util.UUID; import java.util.stream.Stream; /** - * Tests for the {@link RefCountedFile}. + * Tests for the {@link RefCountedFileWithStream}. */ -public class RefCountedFileTest { +public class RefCountedFileWithStreamTest { @Rule public final TemporaryFolder temporaryFolder = new TemporaryFolder(); @Test - public void releaseToZeroRefCounterShouldDeleteTheFile() throws IOException { - final File newFile = new File(temporaryFolder.getRoot(), ".tmp_" + UUID.randomUUID()); - final OutputStream out = Files.newOutputStream(newFile.toPath(), StandardOpenOption.CREATE_NEW); - - RefCountedFile fileUnderTest = RefCountedFile.newFile(newFile, out); - verifyTheFileIsStillThere(); - - fileUnderTest.release(); - - try (Stream<Path> files = Files.list(temporaryFolder.getRoot().toPath())) { - Assert.assertEquals(0L, files.count()); - } - } - - @Test - public void retainsShouldRequirePlusOneReleasesToDeleteTheFile() throws IOException { - final File newFile = new File(temporaryFolder.getRoot(), ".tmp_" + UUID.randomUUID()); - final OutputStream out = Files.newOutputStream(newFile.toPath(), StandardOpenOption.CREATE_NEW); - - // the reference counter always starts with 1 (not 0). This is why we need +1 releases - RefCountedFile fileUnderTest = RefCountedFile.newFile(newFile, out); - verifyTheFileIsStillThere(); - - fileUnderTest.retain(); - fileUnderTest.retain(); - - Assert.assertEquals(3, fileUnderTest.getReferenceCounter()); - - fileUnderTest.release(); - Assert.assertEquals(2, fileUnderTest.getReferenceCounter()); - verifyTheFileIsStillThere(); - - fileUnderTest.release(); - Assert.assertEquals(1, fileUnderTest.getReferenceCounter()); - verifyTheFileIsStillThere(); - - fileUnderTest.release(); - // the file is deleted now - try (Stream<Path> files = Files.list(temporaryFolder.getRoot().toPath())) { - Assert.assertEquals(0L, files.count()); - } - } - - @Test public void writeShouldSucceed() throws IOException { byte[] content = bytesOf("hello world"); - final RefCountedFile fileUnderTest = getClosedRefCountedFileWithContent(content); + final RefCountedFileWithStream fileUnderTest = getClosedRefCountedFileWithContent(content); long fileLength = fileUnderTest.getLength(); Assert.assertEquals(content.length, fileLength); @@ -103,14 +59,14 @@ public class RefCountedFileTest { @Test(expected = IOException.class) public void writeAfterCloseShouldThrowException() throws IOException { - final RefCountedFile fileUnderTest = getClosedRefCountedFileWithContent("hello world"); + final RefCountedFileWithStream fileUnderTest = getClosedRefCountedFileWithContent("hello world"); byte[] content = bytesOf("Hello Again"); fileUnderTest.write(content, 0, content.length); } @Test(expected = IOException.class) public void flushAfterCloseShouldThrowException() throws IOException { - final RefCountedFile fileUnderTest = getClosedRefCountedFileWithContent("hello world"); + final RefCountedFileWithStream fileUnderTest = getClosedRefCountedFileWithContent("hello world"); fileUnderTest.flush(); } @@ -122,15 +78,15 @@ public class RefCountedFileTest { } } - private RefCountedFile getClosedRefCountedFileWithContent(String content) throws IOException { + private RefCountedFileWithStream getClosedRefCountedFileWithContent(String content) throws IOException { return getClosedRefCountedFileWithContent(bytesOf(content)); } - private RefCountedFile getClosedRefCountedFileWithContent(byte[] content) throws IOException { + private RefCountedFileWithStream getClosedRefCountedFileWithContent(byte[] content) throws IOException { final File newFile = new File(temporaryFolder.getRoot(), ".tmp_" + UUID.randomUUID()); final OutputStream out = Files.newOutputStream(newFile.toPath(), StandardOpenOption.CREATE_NEW); - final RefCountedFile fileUnderTest = RefCountedFile.newFile(newFile, out); + final RefCountedFileWithStream fileUnderTest = RefCountedFileWithStream.newFile(newFile, out); fileUnderTest.write(content, 0, content.length); diff --git a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/writer/RecoverableMultiPartUploadImplTest.java b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/writer/RecoverableMultiPartUploadImplTest.java index f01da88..e8c6e9e 100644 --- a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/writer/RecoverableMultiPartUploadImplTest.java +++ b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/writer/RecoverableMultiPartUploadImplTest.java @@ -19,7 +19,7 @@ package org.apache.flink.fs.s3.common.writer; import org.apache.flink.fs.s3.common.utils.RefCountedBufferingFileStream; -import org.apache.flink.fs.s3.common.utils.RefCountedFile; +import org.apache.flink.fs.s3.common.utils.RefCountedFileWithStream; import org.apache.flink.util.IOUtils; import org.apache.flink.util.MathUtils; @@ -320,7 +320,7 @@ public class RecoverableMultiPartUploadImplTest { final OutputStream out = Files.newOutputStream(newFile.toPath(), StandardOpenOption.CREATE_NEW); final RefCountedBufferingFileStream testStream = - new RefCountedBufferingFileStream(RefCountedFile.newFile(newFile, out), BUFFER_SIZE); + new RefCountedBufferingFileStream(RefCountedFileWithStream.newFile(newFile, out), BUFFER_SIZE); testStream.write(content, 0, content.length); return testStream; diff --git a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/writer/S3RecoverableFsDataOutputStreamTest.java b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/writer/S3RecoverableFsDataOutputStreamTest.java index 14ed2e2..b7c94c4 100644 --- a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/writer/S3RecoverableFsDataOutputStreamTest.java +++ b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/writer/S3RecoverableFsDataOutputStreamTest.java @@ -22,7 +22,7 @@ import org.apache.flink.core.fs.RecoverableFsDataOutputStream; import org.apache.flink.core.fs.RecoverableWriter; import org.apache.flink.fs.s3.common.utils.RefCountedBufferingFileStream; import org.apache.flink.fs.s3.common.utils.RefCountedFSOutputStream; -import org.apache.flink.fs.s3.common.utils.RefCountedFile; +import org.apache.flink.fs.s3.common.utils.RefCountedFileWithStream; import org.apache.flink.util.MathUtils; import org.apache.flink.util.Preconditions; import org.apache.flink.util.function.FunctionWithException; @@ -483,7 +483,7 @@ public class S3RecoverableFsDataOutputStreamTest { } } - private static class TestFileProvider implements FunctionWithException<File, RefCountedFile, IOException> { + private static class TestFileProvider implements FunctionWithException<File, RefCountedFileWithStream, IOException> { private final TemporaryFolder folder; @@ -492,16 +492,16 @@ public class S3RecoverableFsDataOutputStreamTest { } @Override - public RefCountedFile apply(@Nullable File file) throws IOException { + public RefCountedFileWithStream apply(@Nullable File file) throws IOException { while (true) { try { if (file == null) { final File newFile = new File(folder.getRoot(), ".tmp_" + UUID.randomUUID()); final OutputStream out = Files.newOutputStream(newFile.toPath(), StandardOpenOption.CREATE_NEW); - return RefCountedFile.newFile(newFile, out); + return RefCountedFileWithStream.newFile(newFile, out); } else { final OutputStream out = Files.newOutputStream(file.toPath(), StandardOpenOption.APPEND); - return RefCountedFile.restoredFile(file, out, file.length()); + return RefCountedFileWithStream.restoredFile(file, out, file.length()); } } catch (FileAlreadyExistsException e) { // fall through the loop and retry
