This is an automated email from the ASF dual-hosted git repository. nkruber pushed a commit to branch release-1.6 in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.6 by this push: new f266975 [FLINK-10469][core] make sure to always write the whole buffer to FileChannel f266975 is described below commit f266975a38523f7fc5ab09e8d7a1fe1dd41ef54a Author: Nico Kruber <n...@data-artisans.com> AuthorDate: Tue Oct 2 21:59:18 2018 +0200 [FLINK-10469][core] make sure to always write the whole buffer to FileChannel --- .../src/main/java/org/apache/flink/util/FileUtils.java | 10 ++++++++++ .../io/disk/iomanager/AsynchronousFileIOChannel.java | 7 ++++--- .../SpillingAdaptiveSpanningRecordDeserializer.java | 5 +++-- .../apache/flink/streaming/runtime/io/BufferSpiller.java | 10 +++------- .../runtime/io/SpilledBufferOrEventSequenceTest.java | 13 +++++++------ 5 files changed, 27 insertions(+), 18 deletions(-) diff --git a/flink-core/src/main/java/org/apache/flink/util/FileUtils.java b/flink-core/src/main/java/org/apache/flink/util/FileUtils.java index 23af2e8..8f32262 100644 --- a/flink-core/src/main/java/org/apache/flink/util/FileUtils.java +++ b/flink-core/src/main/java/org/apache/flink/util/FileUtils.java @@ -28,6 +28,8 @@ import org.apache.flink.util.function.ThrowingConsumer; import java.io.File; import java.io.FileNotFoundException; import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.WritableByteChannel; import java.nio.file.AccessDeniedException; import java.nio.file.Files; import java.nio.file.StandardOpenOption; @@ -56,6 +58,14 @@ public final class FileUtils { // ------------------------------------------------------------------------ + public static void writeCompletely(WritableByteChannel channel, ByteBuffer src) throws IOException { + while (src.hasRemaining()) { + channel.write(src); + } + } + + // ------------------------------------------------------------------------ + /** * Constructs a random filename with the given prefix and * a random part generated from hex characters. diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousFileIOChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousFileIOChannel.java index 0e575d3..ddb0c4e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousFileIOChannel.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousFileIOChannel.java @@ -21,6 +21,7 @@ package org.apache.flink.runtime.io.disk.iomanager; import org.apache.flink.core.memory.MemorySegment; import org.apache.flink.runtime.io.network.buffer.Buffer; import org.apache.flink.runtime.util.event.NotificationListener; +import org.apache.flink.util.FileUtils; import java.io.IOException; import java.nio.ByteBuffer; @@ -341,7 +342,7 @@ final class SegmentWriteRequest implements WriteRequest { @Override public void write() throws IOException { try { - this.channel.fileChannel.write(this.segment.wrap(0, this.segment.size())); + FileUtils.writeCompletely(this.channel.fileChannel, this.segment.wrap(0, this.segment.size())); } catch (NullPointerException npex) { throw new IOException("Memory segment has been released."); @@ -375,8 +376,8 @@ final class BufferWriteRequest implements WriteRequest { header.putInt(nioBufferReadable.remaining()); header.flip(); - channel.fileChannel.write(header); - channel.fileChannel.write(nioBufferReadable); + FileUtils.writeCompletely(channel.fileChannel, header); + FileUtils.writeCompletely(channel.fileChannel, nioBufferReadable); } @Override diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java index 8630ace..a78cb4d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java @@ -24,6 +24,7 @@ import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataInputViewStreamWrapper; import org.apache.flink.core.memory.MemorySegment; import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.util.FileUtils; import org.apache.flink.util.StringUtils; import java.io.BufferedInputStream; @@ -481,7 +482,7 @@ public class SpillingAdaptiveSpanningRecordDeserializer<T extends IOReadableWrit this.spillingChannel = createSpillingChannel(); ByteBuffer toWrite = partial.segment.wrap(partial.position, numBytesChunk); - this.spillingChannel.write(toWrite); + FileUtils.writeCompletely(this.spillingChannel, toWrite); } else { // collect in memory @@ -528,7 +529,7 @@ public class SpillingAdaptiveSpanningRecordDeserializer<T extends IOReadableWrit if (spillingChannel != null) { // spill to file ByteBuffer toWrite = segment.wrap(segmentPosition, toCopy); - this.spillingChannel.write(toWrite); + FileUtils.writeCompletely(this.spillingChannel, toWrite); } else { segment.get(segmentPosition, buffer, this.accumulatedRecordBytes, toCopy); } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java index 7a0be33..eb191a8 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java @@ -27,6 +27,7 @@ import org.apache.flink.runtime.io.network.buffer.Buffer; import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler; import org.apache.flink.runtime.io.network.buffer.NetworkBuffer; import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent; +import org.apache.flink.util.FileUtils; import org.apache.flink.util.StringUtils; import java.io.File; @@ -75,9 +76,6 @@ public class BufferSpiller implements BufferBlocker { /** The buffer that encodes the spilled header. */ private final ByteBuffer headBuffer; - /** The reusable array that holds header and contents buffers. */ - private final ByteBuffer[] sources; - /** The file that we currently spill to. */ private File currentSpillFile; @@ -109,8 +107,6 @@ public class BufferSpiller implements BufferBlocker { this.headBuffer = ByteBuffer.allocateDirect(16); this.headBuffer.order(ByteOrder.LITTLE_ENDIAN); - this.sources = new ByteBuffer[] { this.headBuffer, null }; - File[] tempDirs = ioManager.getSpillingDirectories(); this.tempDir = tempDirs[DIRECTORY_INDEX.getAndIncrement() % tempDirs.length]; @@ -148,8 +144,8 @@ public class BufferSpiller implements BufferBlocker { bytesWritten += (headBuffer.remaining() + contents.remaining()); - sources[1] = contents; - currentChannel.write(sources); + FileUtils.writeCompletely(currentChannel, headBuffer); + FileUtils.writeCompletely(currentChannel, contents); } finally { if (boe.isBuffer()) { diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/SpilledBufferOrEventSequenceTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/SpilledBufferOrEventSequenceTest.java index adbe240..c1ff79f 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/SpilledBufferOrEventSequenceTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/SpilledBufferOrEventSequenceTest.java @@ -23,6 +23,7 @@ import org.apache.flink.runtime.io.network.api.serialization.EventSerializer; import org.apache.flink.runtime.io.network.buffer.Buffer; import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent; import org.apache.flink.streaming.runtime.io.BufferSpiller.SpilledBufferOrEventSequence; +import org.apache.flink.util.FileUtils; import org.junit.After; import org.junit.Before; @@ -107,7 +108,7 @@ public class SpilledBufferOrEventSequenceTest { ByteBuffer buf = ByteBuffer.allocate(7); buf.order(ByteOrder.LITTLE_ENDIAN); - fileChannel.write(buf); + FileUtils.writeCompletely(fileChannel, buf); fileChannel.position(0); SpilledBufferOrEventSequence seq = new SpilledBufferOrEventSequence(tempFile, fileChannel, buffer, pageSize); @@ -175,7 +176,7 @@ public class SpilledBufferOrEventSequenceTest { data.put((byte) 0); data.position(0); data.limit(312); - fileChannel.write(data); + FileUtils.writeCompletely(fileChannel, data); fileChannel.position(0L); SpilledBufferOrEventSequence seq = new SpilledBufferOrEventSequence(tempFile, fileChannel, buffer, pageSize); @@ -414,7 +415,7 @@ public class SpilledBufferOrEventSequenceTest { ByteBuffer data = ByteBuffer.allocate(157); data.order(ByteOrder.LITTLE_ENDIAN); - fileChannel.write(data); + FileUtils.writeCompletely(fileChannel, data); fileChannel.position(54); SpilledBufferOrEventSequence seq = new SpilledBufferOrEventSequence(tempFile, fileChannel, buffer, pageSize); @@ -451,8 +452,8 @@ public class SpilledBufferOrEventSequenceTest { header.put((byte) 1); header.flip(); - fileChannel.write(header); - fileChannel.write(serializedEvent); + FileUtils.writeCompletely(fileChannel, header); + FileUtils.writeCompletely(fileChannel, serializedEvent); return new BufferOrEvent(evt, channelIndex); } @@ -467,7 +468,7 @@ public class SpilledBufferOrEventSequenceTest { data.put((byte) i); } data.flip(); - fileChannel.write(data); + FileUtils.writeCompletely(fileChannel, data); } private static void validateBuffer(BufferOrEvent boe, int expectedSize, int expectedChannelIndex) {