This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 3dacffe35709f9747923dad4c7028baec27e2651
Author: Roman Khachatryan <[email protected]>
AuthorDate: Thu May 7 15:59:26 2020 +0200

    [FLINK-17547][task] Use RefCountedFile in SpanningWrapper (todo: merge
    with next?)
---
 .../io/network/api/serialization/SpanningWrapper.java     | 15 ++++++++-------
 1 file changed, 8 insertions(+), 7 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningWrapper.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningWrapper.java
index 18ea6cc..9cffde3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningWrapper.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningWrapper.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.runtime.io.network.api.serialization;
 
+import org.apache.flink.core.fs.RefCountedFile;
 import org.apache.flink.core.memory.DataInputDeserializer;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
@@ -74,7 +75,7 @@ final class SpanningWrapper {
 
        private int leftOverLimit;
 
-       private File spillFile;
+       private RefCountedFile spillFile;
 
        private DataInputViewStreamWrapper spillFileReader;
 
@@ -136,7 +137,7 @@ final class SpanningWrapper {
                accumulatedRecordBytes += length;
                if (hasFullRecord()) {
                        spillingChannel.close();
-                       spillFileReader = new DataInputViewStreamWrapper(new BufferedInputStream(new FileInputStream(spillFile), FILE_BUFFER_SIZE));
+                       spillFileReader = new DataInputViewStreamWrapper(new BufferedInputStream(new FileInputStream(spillFile.getFile()), FILE_BUFFER_SIZE));
                }
        }
 
@@ -220,7 +221,6 @@ final class SpanningWrapper {
                return accumulatedRecordBytes + (recordLength >= 0 ? LENGTH_BYTES : lengthBuffer.position());
        }
 
-       @SuppressWarnings("ResultOfMethodCallIgnored")
        public void clear() {
                buffer = initialBuffer;
                serializationReadBuffer.releaseArrays();
@@ -232,7 +232,7 @@ final class SpanningWrapper {
                leftOverLimit = 0;
                accumulatedRecordBytes = 0;
 
-               closeAllQuietly(spillingChannel, spillFileReader, () -> spillFile.delete());
+               closeAllQuietly(spillingChannel, spillFileReader, () -> spillFile.release());
                spillingChannel = null;
                spillFileReader = null;
                spillFile = null;
@@ -260,9 +260,10 @@ final class SpanningWrapper {
                int maxAttempts = 10;
                for (int attempt = 0; attempt < maxAttempts; attempt++) {
                        String directory = tempDirs[rnd.nextInt(tempDirs.length)];
-                       spillFile = new File(directory, randomString(rnd) + ".inputchannel");
-                       if (spillFile.createNewFile()) {
-                               return new RandomAccessFile(spillFile, "rw").getChannel();
+                       File file = new File(directory, randomString(rnd) + ".inputchannel");
+                       if (file.createNewFile()) {
+                               spillFile = new RefCountedFile(file);
+                               return new RandomAccessFile(file, "rw").getChannel();
                        }
                }
 

Reply via email to