This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 4e323c33a9f25673587ee0e8f4f9786b21db666c
Author: Roman Khachatryan <[email protected]>
AuthorDate: Thu May 7 15:30:56 2020 +0200

    [FLINK-17547][task][hotfix] Extract RefCountedFileWithStream from 
RefCountedFile
    Motivation: use RefCountedFile for reading as well.
---
 .../flink/fs/s3/common/FlinkS3FileSystem.java      |  4 +-
 .../utils/RefCountedBufferingFileStream.java       | 10 ++--
 .../flink/fs/s3/common/utils/RefCountedFile.java   | 59 ++-----------------
 ...ntedFile.java => RefCountedFileWithStream.java} | 68 ++++------------------
 .../s3/common/utils/RefCountedTmpFileCreator.java  | 10 ++--
 .../writer/S3RecoverableFsDataOutputStream.java    | 12 ++--
 .../S3RecoverableMultipartUploadFactory.java       |  8 +--
 .../fs/s3/common/writer/S3RecoverableWriter.java   |  8 +--
 .../utils/RefCountedBufferingFileStreamTest.java   |  4 +-
 .../fs/s3/common/utils/RefCountedFileTest.java     | 59 ++-----------------
 ...Test.java => RefCountedFileWithStreamTest.java} | 60 +++----------------
 .../writer/RecoverableMultiPartUploadImplTest.java |  4 +-
 .../S3RecoverableFsDataOutputStreamTest.java       | 10 ++--
 13 files changed, 63 insertions(+), 253 deletions(-)

diff --git 
a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/FlinkS3FileSystem.java
 
b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/FlinkS3FileSystem.java
index 5248e06..3514bbcb 100644
--- 
a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/FlinkS3FileSystem.java
+++ 
b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/FlinkS3FileSystem.java
@@ -21,7 +21,7 @@ package org.apache.flink.fs.s3.common;
 import org.apache.flink.core.fs.EntropyInjectingFileSystem;
 import org.apache.flink.core.fs.FileSystemKind;
 import org.apache.flink.core.fs.RecoverableWriter;
-import org.apache.flink.fs.s3.common.utils.RefCountedFile;
+import org.apache.flink.fs.s3.common.utils.RefCountedFileWithStream;
 import org.apache.flink.fs.s3.common.utils.RefCountedTmpFileCreator;
 import org.apache.flink.fs.s3.common.writer.S3AccessHelper;
 import org.apache.flink.fs.s3.common.writer.S3RecoverableWriter;
@@ -57,7 +57,7 @@ public class FlinkS3FileSystem extends HadoopFileSystem 
implements EntropyInject
 
        private final String localTmpDir;
 
-       private final FunctionWithException<File, RefCountedFile, IOException> 
tmpFileCreator;
+       private final FunctionWithException<File, RefCountedFileWithStream, 
IOException> tmpFileCreator;
 
        @Nullable
        private final S3AccessHelper s3AccessHelper;
diff --git 
a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/utils/RefCountedBufferingFileStream.java
 
b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/utils/RefCountedBufferingFileStream.java
index 29f2590..5f149df 100644
--- 
a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/utils/RefCountedBufferingFileStream.java
+++ 
b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/utils/RefCountedBufferingFileStream.java
@@ -29,7 +29,7 @@ import static 
org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
- * A {@link RefCountedFile} that also uses an in-memory buffer for buffering 
small writes.
+ * A {@link RefCountedFileWithStream} that also uses an in-memory buffer for 
buffering small writes.
  * This is done to avoid frequent 'flushes' of the file stream to disk.
  */
 @Internal
@@ -37,7 +37,7 @@ public class RefCountedBufferingFileStream extends 
RefCountedFSOutputStream {
 
        public static final int BUFFER_SIZE = 4096;
 
-       private final RefCountedFile currentTmpFile;
+       private final RefCountedFileWithStream currentTmpFile;
 
        /** The write buffer. */
        private final byte[] buffer;
@@ -49,7 +49,7 @@ public class RefCountedBufferingFileStream extends 
RefCountedFSOutputStream {
 
        @VisibleForTesting
        public RefCountedBufferingFileStream(
-                       final RefCountedFile file,
+                       final RefCountedFileWithStream file,
                        final int bufferSize) {
 
                checkArgument(bufferSize > 0L);
@@ -165,7 +165,7 @@ public class RefCountedBufferingFileStream extends 
RefCountedFSOutputStream {
        // ------------------------- Factory Methods -------------------------
 
        public static RefCountedBufferingFileStream openNew(
-                       final FunctionWithException<File, RefCountedFile, 
IOException> tmpFileProvider) throws IOException {
+                       final FunctionWithException<File, 
RefCountedFileWithStream, IOException> tmpFileProvider) throws IOException {
 
                return new RefCountedBufferingFileStream(
                                tmpFileProvider.apply(null),
@@ -173,7 +173,7 @@ public class RefCountedBufferingFileStream extends 
RefCountedFSOutputStream {
        }
 
        public static RefCountedBufferingFileStream restore(
-                       final FunctionWithException<File, RefCountedFile, 
IOException> tmpFileProvider,
+                       final FunctionWithException<File, 
RefCountedFileWithStream, IOException> tmpFileProvider,
                        final File initialTmpFile) throws IOException {
 
                return new RefCountedBufferingFileStream(
diff --git 
a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/utils/RefCountedFile.java
 
b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/utils/RefCountedFile.java
index 1787636..9675f09 100644
--- 
a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/utils/RefCountedFile.java
+++ 
b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/utils/RefCountedFile.java
@@ -21,11 +21,10 @@ package org.apache.flink.fs.s3.common.utils;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.util.ExceptionUtils;
-import org.apache.flink.util.IOUtils;
+import org.apache.flink.util.RefCounted;
 
 import java.io.File;
 import java.io.IOException;
-import java.io.OutputStream;
 import java.nio.file.Files;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -40,21 +39,13 @@ public class RefCountedFile implements RefCounted {
 
        private final File file;
 
-       private final OffsetAwareOutputStream stream;
-
        private final AtomicInteger references;
 
-       private boolean closed;
+       protected boolean closed;
 
-       private RefCountedFile(
-                       final File file,
-                       final OutputStream currentOut,
-                       final long bytesInCurrentPart) {
+       protected RefCountedFile(final File file) {
                this.file = checkNotNull(file);
                this.references = new AtomicInteger(1);
-               this.stream = new OffsetAwareOutputStream(
-                               currentOut,
-                               bytesInCurrentPart);
                this.closed = false;
        }
 
@@ -62,33 +53,6 @@ public class RefCountedFile implements RefCounted {
                return file;
        }
 
-       public OffsetAwareOutputStream getStream() {
-               return stream;
-       }
-
-       public long getLength() {
-               return stream.getLength();
-       }
-
-       public void write(byte[] b, int off, int len) throws IOException {
-               requireOpened();
-               if (len > 0) {
-                       stream.write(b, off, len);
-               }
-       }
-
-       public void flush() throws IOException {
-               requireOpened();
-               stream.flush();
-       }
-
-       public void closeStream() {
-               if (!closed) {
-                       IOUtils.closeQuietly(stream);
-                       closed = true;
-               }
-       }
-
        @Override
        public void retain() {
                references.incrementAndGet();
@@ -119,22 +83,7 @@ public class RefCountedFile implements RefCounted {
        }
 
        @VisibleForTesting
-       int getReferenceCounter() {
+       public int getReferenceCounter() {
                return references.get();
        }
-
-       // ------------------------------ Factory methods for initializing a 
temporary file ------------------------------
-
-       public static RefCountedFile newFile(
-                       final File file,
-                       final OutputStream currentOut) throws IOException {
-               return new RefCountedFile(file, currentOut, 0L);
-       }
-
-       public static RefCountedFile restoredFile(
-                       final File file,
-                       final OutputStream currentOut,
-                       final long bytesInCurrentPart) {
-               return new RefCountedFile(file, currentOut, bytesInCurrentPart);
-       }
 }
diff --git 
a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/utils/RefCountedFile.java
 
b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/utils/RefCountedFileWithStream.java
similarity index 60%
copy from 
flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/utils/RefCountedFile.java
copy to 
flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/utils/RefCountedFileWithStream.java
index 1787636..94b8527 100644
--- 
a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/utils/RefCountedFile.java
+++ 
b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/utils/RefCountedFileWithStream.java
@@ -19,47 +19,27 @@
 package org.apache.flink.fs.s3.common.utils;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.IOUtils;
 
 import java.io.File;
 import java.io.IOException;
 import java.io.OutputStream;
-import java.nio.file.Files;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
  * A reference counted file which is deleted as soon as no caller
  * holds a reference to the wrapped {@link File}.
  */
 @Internal
-public class RefCountedFile implements RefCounted {
-
-       private final File file;
+public class RefCountedFileWithStream extends RefCountedFile {
 
        private final OffsetAwareOutputStream stream;
 
-       private final AtomicInteger references;
-
-       private boolean closed;
-
-       private RefCountedFile(
+       private RefCountedFileWithStream(
                        final File file,
                        final OutputStream currentOut,
                        final long bytesInCurrentPart) {
-               this.file = checkNotNull(file);
-               this.references = new AtomicInteger(1);
-               this.stream = new OffsetAwareOutputStream(
-                               currentOut,
-                               bytesInCurrentPart);
-               this.closed = false;
-       }
-
-       public File getFile() {
-               return file;
+               super(file);
+               this.stream = new OffsetAwareOutputStream(currentOut, 
bytesInCurrentPart);
        }
 
        public OffsetAwareOutputStream getStream() {
@@ -77,64 +57,36 @@ public class RefCountedFile implements RefCounted {
                }
        }
 
-       public void flush() throws IOException {
+       void flush() throws IOException {
                requireOpened();
                stream.flush();
        }
 
-       public void closeStream() {
+       void closeStream() {
                if (!closed) {
                        IOUtils.closeQuietly(stream);
                        closed = true;
                }
        }
 
-       @Override
-       public void retain() {
-               references.incrementAndGet();
-       }
-
-       @Override
-       public boolean release() {
-               if (references.decrementAndGet() == 0) {
-                       return tryClose();
-               }
-               return false;
-       }
-
-       private boolean tryClose() {
-               try {
-                       Files.deleteIfExists(file.toPath());
-                       return true;
-               } catch (Throwable t) {
-                       ExceptionUtils.rethrowIfFatalError(t);
-               }
-               return false;
-       }
-
        private void requireOpened() throws IOException {
                if (closed) {
                        throw new IOException("Stream closed.");
                }
        }
 
-       @VisibleForTesting
-       int getReferenceCounter() {
-               return references.get();
-       }
-
        // ------------------------------ Factory methods for initializing a 
temporary file ------------------------------
 
-       public static RefCountedFile newFile(
+       public static RefCountedFileWithStream newFile(
                        final File file,
                        final OutputStream currentOut) throws IOException {
-               return new RefCountedFile(file, currentOut, 0L);
+               return new RefCountedFileWithStream(file, currentOut, 0L);
        }
 
-       public static RefCountedFile restoredFile(
+       public static RefCountedFileWithStream restoredFile(
                        final File file,
                        final OutputStream currentOut,
                        final long bytesInCurrentPart) {
-               return new RefCountedFile(file, currentOut, bytesInCurrentPart);
+               return new RefCountedFileWithStream(file, currentOut, 
bytesInCurrentPart);
        }
 }
diff --git 
a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/utils/RefCountedTmpFileCreator.java
 
b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/utils/RefCountedTmpFileCreator.java
index 7a928d0..51b417c 100644
--- 
a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/utils/RefCountedTmpFileCreator.java
+++ 
b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/utils/RefCountedTmpFileCreator.java
@@ -34,10 +34,10 @@ import java.util.concurrent.atomic.AtomicInteger;
 import static org.apache.flink.util.Preconditions.checkArgument;
 
 /**
- * A utility class that creates local {@link RefCountedFile reference counted 
files} that serve as temporary files.
+ * A utility class that creates local {@link RefCountedFileWithStream 
reference counted files} that serve as temporary files.
  */
 @Internal
-public class RefCountedTmpFileCreator implements FunctionWithException<File, 
RefCountedFile, IOException> {
+public class RefCountedTmpFileCreator implements FunctionWithException<File, 
RefCountedFileWithStream, IOException> {
 
        private final File[] tempDirectories;
 
@@ -70,7 +70,7 @@ public class RefCountedTmpFileCreator implements 
FunctionWithException<File, Ref
         * @throws IOException Thrown, if the stream to the temp file could not 
be opened.
         */
        @Override
-       public RefCountedFile apply(File file) throws IOException {
+       public RefCountedFileWithStream apply(File file) throws IOException {
                final File directory = tempDirectories[nextIndex()];
 
                while (true) {
@@ -78,10 +78,10 @@ public class RefCountedTmpFileCreator implements 
FunctionWithException<File, Ref
                                if (file == null) {
                                        final File newFile = new 
File(directory, ".tmp_" + UUID.randomUUID());
                                        final OutputStream out = 
Files.newOutputStream(newFile.toPath(), StandardOpenOption.CREATE_NEW);
-                                       return RefCountedFile.newFile(newFile, 
out);
+                                       return 
RefCountedFileWithStream.newFile(newFile, out);
                                } else {
                                        final OutputStream out = 
Files.newOutputStream(file.toPath(), StandardOpenOption.APPEND);
-                                       return 
RefCountedFile.restoredFile(file, out, file.length());
+                                       return 
RefCountedFileWithStream.restoredFile(file, out, file.length());
                                }
                        } catch (FileAlreadyExistsException ignored) {
                                // fall through the loop and retry
diff --git 
a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableFsDataOutputStream.java
 
b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableFsDataOutputStream.java
index 220ddd5..5447026 100644
--- 
a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableFsDataOutputStream.java
+++ 
b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableFsDataOutputStream.java
@@ -23,7 +23,7 @@ import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
 import org.apache.flink.core.fs.RecoverableWriter;
 import org.apache.flink.fs.s3.common.utils.RefCountedBufferingFileStream;
 import org.apache.flink.fs.s3.common.utils.RefCountedFSOutputStream;
-import org.apache.flink.fs.s3.common.utils.RefCountedFile;
+import org.apache.flink.fs.s3.common.utils.RefCountedFileWithStream;
 import org.apache.flink.util.function.FunctionWithException;
 
 import org.apache.commons.io.IOUtils;
@@ -60,7 +60,7 @@ public final class S3RecoverableFsDataOutputStream extends 
RecoverableFsDataOutp
 
        private final RecoverableMultiPartUpload upload;
 
-       private final FunctionWithException<File, RefCountedFile, IOException> 
tmpFileProvider;
+       private final FunctionWithException<File, RefCountedFileWithStream, 
IOException> tmpFileProvider;
 
        /**
         * The number of bytes at which we start a new part of the multipart 
upload.
@@ -80,7 +80,7 @@ public final class S3RecoverableFsDataOutputStream extends 
RecoverableFsDataOutp
         */
        S3RecoverableFsDataOutputStream(
                        RecoverableMultiPartUpload upload,
-                       FunctionWithException<File, RefCountedFile, 
IOException> tempFileCreator,
+                       FunctionWithException<File, RefCountedFileWithStream, 
IOException> tempFileCreator,
                        RefCountedFSOutputStream initialTmpFile,
                        long userDefinedMinPartSize,
                        long bytesBeforeCurrentPart) {
@@ -228,7 +228,7 @@ public final class S3RecoverableFsDataOutputStream extends 
RecoverableFsDataOutp
 
        public static S3RecoverableFsDataOutputStream newStream(
                        final RecoverableMultiPartUpload upload,
-                       final FunctionWithException<File, RefCountedFile, 
IOException> tmpFileCreator,
+                       final FunctionWithException<File, 
RefCountedFileWithStream, IOException> tmpFileCreator,
                        final long userDefinedMinPartSize) throws IOException {
 
                checkArgument(userDefinedMinPartSize >= 
S3_MULTIPART_MIN_PART_SIZE);
@@ -245,7 +245,7 @@ public final class S3RecoverableFsDataOutputStream extends 
RecoverableFsDataOutp
 
        public static S3RecoverableFsDataOutputStream recoverStream(
                        final RecoverableMultiPartUpload upload,
-                       final FunctionWithException<File, RefCountedFile, 
IOException> tmpFileCreator,
+                       final FunctionWithException<File, 
RefCountedFileWithStream, IOException> tmpFileCreator,
                        final long userDefinedMinPartSize,
                        final long bytesBeforeCurrentPart) throws IOException {
 
@@ -264,7 +264,7 @@ public final class S3RecoverableFsDataOutputStream extends 
RecoverableFsDataOutp
        }
 
        private static RefCountedBufferingFileStream boundedBufferingFileStream(
-                       final FunctionWithException<File, RefCountedFile, 
IOException> tmpFileCreator,
+                       final FunctionWithException<File, 
RefCountedFileWithStream, IOException> tmpFileCreator,
                        final Optional<File> incompletePart) throws IOException 
{
 
                if (!incompletePart.isPresent()) {
diff --git 
a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableMultipartUploadFactory.java
 
b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableMultipartUploadFactory.java
index 3727e25..b7fb8fb 100644
--- 
a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableMultipartUploadFactory.java
+++ 
b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableMultipartUploadFactory.java
@@ -21,7 +21,7 @@ package org.apache.flink.fs.s3.common.writer;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.fs.s3.common.utils.BackPressuringExecutor;
-import org.apache.flink.fs.s3.common.utils.RefCountedFile;
+import org.apache.flink.fs.s3.common.utils.RefCountedFileWithStream;
 import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.function.FunctionWithException;
@@ -43,7 +43,7 @@ final class S3RecoverableMultipartUploadFactory {
 
        private final S3AccessHelper s3AccessHelper;
 
-       private final FunctionWithException<File, RefCountedFile, IOException> 
tmpFileSupplier;
+       private final FunctionWithException<File, RefCountedFileWithStream, 
IOException> tmpFileSupplier;
 
        private final int maxConcurrentUploadsPerStream;
 
@@ -54,7 +54,7 @@ final class S3RecoverableMultipartUploadFactory {
                        final S3AccessHelper s3AccessHelper,
                        final int maxConcurrentUploadsPerStream,
                        final Executor executor,
-                       final FunctionWithException<File, RefCountedFile, 
IOException> tmpFileSupplier) {
+                       final FunctionWithException<File, 
RefCountedFileWithStream, IOException> tmpFileSupplier) {
 
                this.fs = Preconditions.checkNotNull(fs);
                this.maxConcurrentUploadsPerStream = 
maxConcurrentUploadsPerStream;
@@ -92,7 +92,7 @@ final class S3RecoverableMultipartUploadFactory {
                }
 
                // download the file (simple way)
-               final RefCountedFile refCountedFile = 
tmpFileSupplier.apply(null);
+               final RefCountedFileWithStream refCountedFile = 
tmpFileSupplier.apply(null);
                final File file = refCountedFile.getFile();
                final long numBytes = s3AccessHelper.getObject(objectKey, file);
 
diff --git 
a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableWriter.java
 
b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableWriter.java
index ddb4443..a6b62cc 100644
--- 
a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableWriter.java
+++ 
b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableWriter.java
@@ -25,7 +25,7 @@ import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
 import org.apache.flink.core.fs.RecoverableFsDataOutputStream.Committer;
 import org.apache.flink.core.fs.RecoverableWriter;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
-import org.apache.flink.fs.s3.common.utils.RefCountedFile;
+import org.apache.flink.fs.s3.common.utils.RefCountedFileWithStream;
 import org.apache.flink.util.function.FunctionWithException;
 
 import org.apache.hadoop.fs.FileSystem;
@@ -50,7 +50,7 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
 @PublicEvolving
 public class S3RecoverableWriter implements RecoverableWriter {
 
-       private final FunctionWithException<File, RefCountedFile, IOException> 
tempFileCreator;
+       private final FunctionWithException<File, RefCountedFileWithStream, 
IOException> tempFileCreator;
 
        private final long userDefinedMinPartSize;
 
@@ -62,7 +62,7 @@ public class S3RecoverableWriter implements RecoverableWriter 
{
        S3RecoverableWriter(
                        final S3AccessHelper s3AccessHelper,
                        final S3RecoverableMultipartUploadFactory uploadFactory,
-                       final FunctionWithException<File, RefCountedFile, 
IOException> tempFileCreator,
+                       final FunctionWithException<File, 
RefCountedFileWithStream, IOException> tempFileCreator,
                        final long userDefinedMinPartSize) {
 
                this.s3AccessHelper = checkNotNull(s3AccessHelper);
@@ -144,7 +144,7 @@ public class S3RecoverableWriter implements 
RecoverableWriter {
 
        public static S3RecoverableWriter writer(
                        final FileSystem fs,
-                       final FunctionWithException<File, RefCountedFile, 
IOException> tempFileCreator,
+                       final FunctionWithException<File, 
RefCountedFileWithStream, IOException> tempFileCreator,
                        final S3AccessHelper s3AccessHelper,
                        final Executor uploadThreadPool,
                        final long userDefinedMinPartSize,
diff --git 
a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/utils/RefCountedBufferingFileStreamTest.java
 
b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/utils/RefCountedBufferingFileStreamTest.java
index 50ea9bd..368c9cf 100644
--- 
a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/utils/RefCountedBufferingFileStreamTest.java
+++ 
b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/utils/RefCountedBufferingFileStreamTest.java
@@ -134,11 +134,11 @@ public class RefCountedBufferingFileStreamTest {
                return new 
RefCountedBufferingFileStream(getRefCountedFileWithContent(), BUFFER_SIZE);
        }
 
-       private RefCountedFile getRefCountedFileWithContent() throws 
IOException {
+       private RefCountedFileWithStream getRefCountedFileWithContent() throws 
IOException {
                final File newFile = new File(temporaryFolder.getRoot(), 
".tmp_" + UUID.randomUUID());
                final OutputStream out = 
Files.newOutputStream(newFile.toPath(), StandardOpenOption.CREATE_NEW);
 
-               return RefCountedFile.newFile(newFile, out);
+               return RefCountedFileWithStream.newFile(newFile, out);
        }
 
        private static byte[] bytesOf(String str) {
diff --git 
a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/utils/RefCountedFileTest.java
 
b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/utils/RefCountedFileTest.java
index 2e03197..217f4e1 100644
--- 
a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/utils/RefCountedFileTest.java
+++ 
b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/utils/RefCountedFileTest.java
@@ -25,14 +25,14 @@ import org.junit.rules.TemporaryFolder;
 
 import java.io.File;
 import java.io.IOException;
-import java.io.OutputStream;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.nio.file.Path;
-import java.nio.file.StandardOpenOption;
 import java.util.UUID;
 import java.util.stream.Stream;
 
+import static org.apache.flink.util.Preconditions.checkState;
+
 /**
  * Tests for the {@link RefCountedFile}.
  */
@@ -44,9 +44,9 @@ public class RefCountedFileTest {
        @Test
        public void releaseToZeroRefCounterShouldDeleteTheFile() throws 
IOException {
                final File newFile = new File(temporaryFolder.getRoot(), 
".tmp_" + UUID.randomUUID());
-               final OutputStream out = 
Files.newOutputStream(newFile.toPath(), StandardOpenOption.CREATE_NEW);
+               checkState(newFile.createNewFile());
 
-               RefCountedFile fileUnderTest = RefCountedFile.newFile(newFile, 
out);
+               RefCountedFile fileUnderTest = new RefCountedFile(newFile);
                verifyTheFileIsStillThere();
 
                fileUnderTest.release();
@@ -59,10 +59,10 @@ public class RefCountedFileTest {
        @Test
        public void retainsShouldRequirePlusOneReleasesToDeleteTheFile() throws 
IOException {
                final File newFile = new File(temporaryFolder.getRoot(), 
".tmp_" + UUID.randomUUID());
-               final OutputStream out = 
Files.newOutputStream(newFile.toPath(), StandardOpenOption.CREATE_NEW);
+               checkState(newFile.createNewFile());
 
                // the reference counter always starts with 1 (not 0). This is 
why we need +1 releases
-               RefCountedFile fileUnderTest = RefCountedFile.newFile(newFile, 
out);
+               RefCountedFile fileUnderTest = new RefCountedFile(newFile);
                verifyTheFileIsStillThere();
 
                fileUnderTest.retain();
@@ -85,59 +85,12 @@ public class RefCountedFileTest {
                }
        }
 
-       @Test
-       public void writeShouldSucceed() throws IOException {
-               byte[] content = bytesOf("hello world");
-
-               final RefCountedFile fileUnderTest = 
getClosedRefCountedFileWithContent(content);
-               long fileLength = fileUnderTest.getLength();
-
-               Assert.assertEquals(content.length, fileLength);
-       }
-
-       @Test
-       public void closeShouldNotReleaseReference() throws IOException {
-               getClosedRefCountedFileWithContent("hello world");
-               verifyTheFileIsStillThere();
-       }
-
-       @Test(expected = IOException.class)
-       public void writeAfterCloseShouldThrowException() throws IOException {
-               final RefCountedFile fileUnderTest = 
getClosedRefCountedFileWithContent("hello world");
-               byte[] content = bytesOf("Hello Again");
-               fileUnderTest.write(content, 0, content.length);
-       }
-
-       @Test(expected = IOException.class)
-       public void flushAfterCloseShouldThrowException() throws IOException {
-               final RefCountedFile fileUnderTest = 
getClosedRefCountedFileWithContent("hello world");
-               fileUnderTest.flush();
-       }
-
-       // ------------------------------------- Utilities 
-------------------------------------
-
        private void verifyTheFileIsStillThere() throws IOException {
                try (Stream<Path> files = 
Files.list(temporaryFolder.getRoot().toPath())) {
                        Assert.assertEquals(1L, files.count());
                }
        }
 
-       private RefCountedFile getClosedRefCountedFileWithContent(String 
content) throws IOException {
-               return getClosedRefCountedFileWithContent(bytesOf(content));
-       }
-
-       private RefCountedFile getClosedRefCountedFileWithContent(byte[] 
content) throws IOException {
-               final File newFile = new File(temporaryFolder.getRoot(), 
".tmp_" + UUID.randomUUID());
-               final OutputStream out = 
Files.newOutputStream(newFile.toPath(), StandardOpenOption.CREATE_NEW);
-
-               final RefCountedFile fileUnderTest = 
RefCountedFile.newFile(newFile, out);
-
-               fileUnderTest.write(content, 0, content.length);
-
-               fileUnderTest.closeStream();
-               return fileUnderTest;
-       }
-
        private static byte[] bytesOf(String str) {
                return str.getBytes(StandardCharsets.UTF_8);
        }
diff --git 
a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/utils/RefCountedFileTest.java
 
b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/utils/RefCountedFileWithStreamTest.java
similarity index 56%
copy from 
flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/utils/RefCountedFileTest.java
copy to 
flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/utils/RefCountedFileWithStreamTest.java
index 2e03197..7aa7240 100644
--- 
a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/utils/RefCountedFileTest.java
+++ 
b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/utils/RefCountedFileWithStreamTest.java
@@ -34,62 +34,18 @@ import java.util.UUID;
 import java.util.stream.Stream;
 
 /**
- * Tests for the {@link RefCountedFile}.
+ * Tests for the {@link RefCountedFileWithStream}.
  */
-public class RefCountedFileTest {
+public class RefCountedFileWithStreamTest {
 
        @Rule
        public final TemporaryFolder temporaryFolder = new TemporaryFolder();
 
        @Test
-       public void releaseToZeroRefCounterShouldDeleteTheFile() throws 
IOException {
-               final File newFile = new File(temporaryFolder.getRoot(), 
".tmp_" + UUID.randomUUID());
-               final OutputStream out = 
Files.newOutputStream(newFile.toPath(), StandardOpenOption.CREATE_NEW);
-
-               RefCountedFile fileUnderTest = RefCountedFile.newFile(newFile, 
out);
-               verifyTheFileIsStillThere();
-
-               fileUnderTest.release();
-
-               try (Stream<Path> files = 
Files.list(temporaryFolder.getRoot().toPath())) {
-                       Assert.assertEquals(0L, files.count());
-               }
-       }
-
-       @Test
-       public void retainsShouldRequirePlusOneReleasesToDeleteTheFile() throws 
IOException {
-               final File newFile = new File(temporaryFolder.getRoot(), 
".tmp_" + UUID.randomUUID());
-               final OutputStream out = 
Files.newOutputStream(newFile.toPath(), StandardOpenOption.CREATE_NEW);
-
-               // the reference counter always starts with 1 (not 0). This is 
why we need +1 releases
-               RefCountedFile fileUnderTest = RefCountedFile.newFile(newFile, 
out);
-               verifyTheFileIsStillThere();
-
-               fileUnderTest.retain();
-               fileUnderTest.retain();
-
-               Assert.assertEquals(3, fileUnderTest.getReferenceCounter());
-
-               fileUnderTest.release();
-               Assert.assertEquals(2, fileUnderTest.getReferenceCounter());
-               verifyTheFileIsStillThere();
-
-               fileUnderTest.release();
-               Assert.assertEquals(1, fileUnderTest.getReferenceCounter());
-               verifyTheFileIsStillThere();
-
-               fileUnderTest.release();
-               // the file is deleted now
-               try (Stream<Path> files = 
Files.list(temporaryFolder.getRoot().toPath())) {
-                       Assert.assertEquals(0L, files.count());
-               }
-       }
-
-       @Test
        public void writeShouldSucceed() throws IOException {
                byte[] content = bytesOf("hello world");
 
-               final RefCountedFile fileUnderTest = 
getClosedRefCountedFileWithContent(content);
+               final RefCountedFileWithStream fileUnderTest = 
getClosedRefCountedFileWithContent(content);
                long fileLength = fileUnderTest.getLength();
 
                Assert.assertEquals(content.length, fileLength);
@@ -103,14 +59,14 @@ public class RefCountedFileTest {
 
        @Test(expected = IOException.class)
        public void writeAfterCloseShouldThrowException() throws IOException {
-               final RefCountedFile fileUnderTest = 
getClosedRefCountedFileWithContent("hello world");
+               final RefCountedFileWithStream fileUnderTest = 
getClosedRefCountedFileWithContent("hello world");
                byte[] content = bytesOf("Hello Again");
                fileUnderTest.write(content, 0, content.length);
        }
 
        @Test(expected = IOException.class)
        public void flushAfterCloseShouldThrowException() throws IOException {
-               final RefCountedFile fileUnderTest = 
getClosedRefCountedFileWithContent("hello world");
+               final RefCountedFileWithStream fileUnderTest = 
getClosedRefCountedFileWithContent("hello world");
                fileUnderTest.flush();
        }
 
@@ -122,15 +78,15 @@ public class RefCountedFileTest {
                }
        }
 
-       private RefCountedFile getClosedRefCountedFileWithContent(String 
content) throws IOException {
+       private RefCountedFileWithStream 
getClosedRefCountedFileWithContent(String content) throws IOException {
                return getClosedRefCountedFileWithContent(bytesOf(content));
        }
 
-       private RefCountedFile getClosedRefCountedFileWithContent(byte[] 
content) throws IOException {
+       private RefCountedFileWithStream 
getClosedRefCountedFileWithContent(byte[] content) throws IOException {
                final File newFile = new File(temporaryFolder.getRoot(), 
".tmp_" + UUID.randomUUID());
                final OutputStream out = 
Files.newOutputStream(newFile.toPath(), StandardOpenOption.CREATE_NEW);
 
-               final RefCountedFile fileUnderTest = 
RefCountedFile.newFile(newFile, out);
+               final RefCountedFileWithStream fileUnderTest = 
RefCountedFileWithStream.newFile(newFile, out);
 
                fileUnderTest.write(content, 0, content.length);
 
diff --git 
a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/writer/RecoverableMultiPartUploadImplTest.java
 
b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/writer/RecoverableMultiPartUploadImplTest.java
index f01da88..e8c6e9e 100644
--- 
a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/writer/RecoverableMultiPartUploadImplTest.java
+++ 
b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/writer/RecoverableMultiPartUploadImplTest.java
@@ -19,7 +19,7 @@
 package org.apache.flink.fs.s3.common.writer;
 
 import org.apache.flink.fs.s3.common.utils.RefCountedBufferingFileStream;
-import org.apache.flink.fs.s3.common.utils.RefCountedFile;
+import org.apache.flink.fs.s3.common.utils.RefCountedFileWithStream;
 import org.apache.flink.util.IOUtils;
 import org.apache.flink.util.MathUtils;
 
@@ -320,7 +320,7 @@ public class RecoverableMultiPartUploadImplTest {
                final OutputStream out = 
Files.newOutputStream(newFile.toPath(), StandardOpenOption.CREATE_NEW);
 
                final RefCountedBufferingFileStream testStream =
-                               new 
RefCountedBufferingFileStream(RefCountedFile.newFile(newFile, out), 
BUFFER_SIZE);
+                               new 
RefCountedBufferingFileStream(RefCountedFileWithStream.newFile(newFile, out), 
BUFFER_SIZE);
 
                testStream.write(content, 0, content.length);
                return testStream;
diff --git 
a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/writer/S3RecoverableFsDataOutputStreamTest.java
 
b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/writer/S3RecoverableFsDataOutputStreamTest.java
index 14ed2e2..b7c94c4 100644
--- 
a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/writer/S3RecoverableFsDataOutputStreamTest.java
+++ 
b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/writer/S3RecoverableFsDataOutputStreamTest.java
@@ -22,7 +22,7 @@ import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
 import org.apache.flink.core.fs.RecoverableWriter;
 import org.apache.flink.fs.s3.common.utils.RefCountedBufferingFileStream;
 import org.apache.flink.fs.s3.common.utils.RefCountedFSOutputStream;
-import org.apache.flink.fs.s3.common.utils.RefCountedFile;
+import org.apache.flink.fs.s3.common.utils.RefCountedFileWithStream;
 import org.apache.flink.util.MathUtils;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.function.FunctionWithException;
@@ -483,7 +483,7 @@ public class S3RecoverableFsDataOutputStreamTest {
                }
        }
 
-       private static class TestFileProvider implements 
FunctionWithException<File, RefCountedFile, IOException> {
+       private static class TestFileProvider implements 
FunctionWithException<File, RefCountedFileWithStream, IOException> {
 
                private final TemporaryFolder folder;
 
@@ -492,16 +492,16 @@ public class S3RecoverableFsDataOutputStreamTest {
                }
 
                @Override
-               public RefCountedFile apply(@Nullable File file) throws 
IOException {
+               public RefCountedFileWithStream apply(@Nullable File file) 
throws IOException {
                        while (true) {
                                try {
                                        if (file == null) {
                                                final File newFile = new 
File(folder.getRoot(), ".tmp_" + UUID.randomUUID());
                                                final OutputStream out = 
Files.newOutputStream(newFile.toPath(), StandardOpenOption.CREATE_NEW);
-                                               return 
RefCountedFile.newFile(newFile, out);
+                                               return 
RefCountedFileWithStream.newFile(newFile, out);
                                        } else {
                                                final OutputStream out = 
Files.newOutputStream(file.toPath(), StandardOpenOption.APPEND);
-                                               return 
RefCountedFile.restoredFile(file, out, file.length());
+                                               return 
RefCountedFileWithStream.restoredFile(file, out, file.length());
                                        }
                                } catch (FileAlreadyExistsException e) {
                                        // fall through the loop and retry

Reply via email to