This is an automated email from the ASF dual-hosted git repository. pnowojski pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 3dacffe35709f9747923dad4c7028baec27e2651 Author: Roman Khachatryan <[email protected]> AuthorDate: Thu May 7 15:59:26 2020 +0200 [FLINK-17547][task] Use RefCountedFile in SpanningWrapper (todo: merge with next?) --- .../io/network/api/serialization/SpanningWrapper.java | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningWrapper.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningWrapper.java index 18ea6cc..9cffde3 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningWrapper.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningWrapper.java @@ -17,6 +17,7 @@ package org.apache.flink.runtime.io.network.api.serialization; +import org.apache.flink.core.fs.RefCountedFile; import org.apache.flink.core.memory.DataInputDeserializer; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataInputViewStreamWrapper; @@ -74,7 +75,7 @@ final class SpanningWrapper { private int leftOverLimit; - private File spillFile; + private RefCountedFile spillFile; private DataInputViewStreamWrapper spillFileReader; @@ -136,7 +137,7 @@ final class SpanningWrapper { accumulatedRecordBytes += length; if (hasFullRecord()) { spillingChannel.close(); - spillFileReader = new DataInputViewStreamWrapper(new BufferedInputStream(new FileInputStream(spillFile), FILE_BUFFER_SIZE)); + spillFileReader = new DataInputViewStreamWrapper(new BufferedInputStream(new FileInputStream(spillFile.getFile()), FILE_BUFFER_SIZE)); } } @@ -220,7 +221,6 @@ final class SpanningWrapper { return accumulatedRecordBytes + (recordLength >= 0 ? LENGTH_BYTES : lengthBuffer.position()); } - @SuppressWarnings("ResultOfMethodCallIgnored") public void clear() { buffer = initialBuffer; serializationReadBuffer.releaseArrays(); @@ -232,7 +232,7 @@ final class SpanningWrapper { leftOverLimit = 0; accumulatedRecordBytes = 0; - closeAllQuietly(spillingChannel, spillFileReader, () -> spillFile.delete()); + closeAllQuietly(spillingChannel, spillFileReader, () -> spillFile.release()); spillingChannel = null; spillFileReader = null; spillFile = null; @@ -260,9 +260,10 @@ final class SpanningWrapper { int maxAttempts = 10; for (int attempt = 0; attempt < maxAttempts; attempt++) { String directory = tempDirs[rnd.nextInt(tempDirs.length)]; - spillFile = new File(directory, randomString(rnd) + ".inputchannel"); - if (spillFile.createNewFile()) { - return new RandomAccessFile(spillFile, "rw").getChannel(); + File file = new File(directory, randomString(rnd) + ".inputchannel"); + if (file.createNewFile()) { + spillFile = new RefCountedFile(file); + return new RandomAccessFile(file, "rw").getChannel(); } }
