This is an automated email from the ASF dual-hosted git repository.

zhijiang pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.10 by this push:
     new 52a2f03  [FLINK-15166][runtime] Fix the bug of wrongly recycling uncompressed buffer
52a2f03 is described below

commit 52a2f03eff658c5ec70b223a2c1551a96b4809dd
Author: caoyingjie <kevin....@alibaba-inc.com>
AuthorDate: Tue Dec 10 18:08:46 2019 +0800

    [FLINK-15166][runtime] Fix the bug of wrongly recycling uncompressed buffer

    For blocking shuffle data compression, the compressed intermediate buffer
    is recycled after it has been written out. However, when the data cannot be
    compressed, the buffer returned by the compressor is the original buffer,
    which must not be recycled. This commit fixes the wrong recycling by
    comparing the returned buffer with the original one and recycling it only
    if the two differ.

    (cherry picked from commit 4d693c4fbc5e6f3ff34ccb3cb3a1d9f35d6bbd76)
---
 .../partition/BoundedBlockingSubpartition.java     |  4 ++-
 .../test/runtime/ShuffleCompressionITCase.java     | 33 ++++++++++++++++++----
 2 files changed, 30 insertions(+), 7 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartition.java
index 3a352ba..626ba3b 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartition.java
@@ -152,7 +152,9 @@ final class BoundedBlockingSubpartition extends ResultSubpartition {
 		if (canBeCompressed(buffer)) {
 			final Buffer compressedBuffer = parent.bufferCompressor.compressToIntermediateBuffer(buffer);
 			data.writeBuffer(compressedBuffer);
-			compressedBuffer.recycleBuffer();
+			if (compressedBuffer != buffer) {
+				compressedBuffer.recycleBuffer();
+			}
 		} else {
 			data.writeBuffer(buffer);
 		}
diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/ShuffleCompressionITCase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/ShuffleCompressionITCase.java
index 128267c..b4889f7 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/runtime/ShuffleCompressionITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/ShuffleCompressionITCase.java
@@ -41,6 +41,7 @@ import org.apache.flink.runtime.jobmaster.JobResult;
 import org.apache.flink.runtime.minicluster.MiniCluster;
 import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
 import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner;
 import org.apache.flink.types.LongValue;
 
 import org.junit.Test;
@@ -56,10 +57,21 @@ import static org.junit.Assert.assertFalse;
  */
 public class ShuffleCompressionITCase {
 
-	private static final int NUM_RECORDS_TO_SEND = 10 * 1024 * 1024;
+	private static final int NUM_BUFFERS_TO_SEND = 1000;
+
+	private static final int BUFFER_SIZE = 32 * 1024;
+
+	private static final int BYTES_PER_RECORD = 12;
+
+	/** Adding 1 guarantees that the last buffer contains no more than one record and cannot be compressed. */
+	private static final int NUM_RECORDS_TO_SEND = NUM_BUFFERS_TO_SEND * BUFFER_SIZE / BYTES_PER_RECORD + 1;
 
 	private static final int PARALLELISM = 2;
 
+	private static final LongValue RECORD_TO_SEND = new LongValue(4387942071694473832L);
+
+	private static boolean useBroadcastPartitioner = false;
+
 	@Test
 	public void testDataCompressionForPipelineShuffle() throws Exception {
 		executeTest(createJobGraph(ScheduleMode.EAGER, ResultPartitionType.PIPELINED, ExecutionMode.PIPELINED));
@@ -70,6 +82,12 @@ public class ShuffleCompressionITCase {
 		executeTest(createJobGraph(ScheduleMode.LAZY_FROM_SOURCES, ResultPartitionType.BLOCKING, ExecutionMode.BATCH));
 	}
 
+	@Test
+	public void testDataCompressionForBlockingShuffleWithBroadcastPartitioner() throws Exception {
+		useBroadcastPartitioner = true;
+		executeTest(createJobGraph(ScheduleMode.LAZY_FROM_SOURCES, ResultPartitionType.BLOCKING, ExecutionMode.BATCH));
+	}
+
 	private void executeTest(JobGraph jobGraph) throws Exception {
 		Configuration configuration = new Configuration();
 		configuration.setString(TaskManagerOptions.TOTAL_FLINK_MEMORY, "1g");
@@ -110,7 +128,7 @@ public class ShuffleCompressionITCase {
 		sink.setParallelism(PARALLELISM);
 		sink.setSlotSharingGroup(slotSharingGroup);
 
-		sink.connectNewDataSetAsInput(source, DistributionPattern.POINTWISE, resultPartitionType);
+		sink.connectNewDataSetAsInput(source, DistributionPattern.ALL_TO_ALL, resultPartitionType);
 
 		JobGraph jobGraph = new JobGraph(source, sink);
 		jobGraph.setScheduleMode(scheduleMode);
@@ -138,13 +156,16 @@ public class ShuffleCompressionITCase {
 				// enable output flush for pipeline mode
 				recordWriterBuilder.setTimeout(100);
 			}
+			if (useBroadcastPartitioner) {
+				recordWriterBuilder.setChannelSelector(new BroadcastPartitioner());
+			}
 
 			RecordWriter<LongValue> writer = recordWriterBuilder.build(resultPartitionWriter);
 			for (int i = 0; i < NUM_RECORDS_TO_SEND; ++i) {
-				LongValue value = new LongValue(i);
-				writer.broadcastEmit(value);
+				writer.broadcastEmit(RECORD_TO_SEND);
 			}
 			writer.flushAll();
+			writer.clearBuffers();
 		}
 	}
 
@@ -164,9 +185,9 @@ public class ShuffleCompressionITCase {
 				new String[]{EnvironmentInformation.getTemporaryFileDirectory()});
 
 			LongValue value = new LongValue();
-			for (int i = 0; i < NUM_RECORDS_TO_SEND; ++i) {
+			for (int i = 0; i < PARALLELISM * NUM_RECORDS_TO_SEND; ++i) {
 				reader.next(value);
-				assertEquals(i, value.getValue());
+				assertEquals(RECORD_TO_SEND.getValue(), value.getValue());
 			}
 		}
 	}
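For context on the one-line fix above: compressToIntermediateBuffer may hand back the
caller's original buffer when the data does not shrink, so the returned buffer may only
be recycled when it is genuinely the compressor's intermediate one. Below is a minimal,
self-contained sketch of that contract; the Buffer and Compressor types are invented
stand-ins for illustration, not Flink's actual classes.

    // Toy sketch of the compressor contract that motivates the fix.
    // Buffer and Compressor are invented stand-ins, NOT Flink's classes.
    public class RecycleContractSketch {

        static final class Buffer {
            final byte[] bytes;
            boolean recycled;

            Buffer(byte[] bytes) {
                this.bytes = bytes;
            }

            void recycleBuffer() {
                recycled = true; // hands the memory back to the pool
            }
        }

        static final class Compressor {
            private final Buffer intermediate = new Buffer(new byte[1024]);

            /** Mirrors compressToIntermediateBuffer: may return the ORIGINAL buffer. */
            Buffer compressToIntermediateBuffer(Buffer original) {
                byte[] compressed = toyCompress(original.bytes);
                if (compressed.length >= original.bytes.length) {
                    return original; // incompressible: caller's own buffer comes back
                }
                System.arraycopy(compressed, 0, intermediate.bytes, 0, compressed.length);
                return intermediate;
            }

            /** Toy scheme: only a run of identical bytes compresses. */
            private static byte[] toyCompress(byte[] input) {
                for (byte b : input) {
                    if (b != input[0]) {
                        return input;
                    }
                }
                return new byte[] {input[0], (byte) input.length};
            }
        }

        public static void main(String[] args) {
            Compressor compressor = new Compressor();
            Buffer buffer = new Buffer(new byte[] {1, 2, 3, 4}); // incompressible

            Buffer returned = compressor.compressToIntermediateBuffer(buffer);
            // The fixed pattern: recycle only if we received the intermediate buffer.
            if (returned != buffer) {
                returned.recycleBuffer();
            }
            System.out.println("original wrongly recycled? " + buffer.recycled); // false
        }
    }

Before the fix, recycleBuffer() ran unconditionally on the returned buffer, so an
incompressible buffer (the caller's own, still-referenced memory) was handed back to
the pool while in use; the new test forces this case by making the last buffer hold a
single record.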