This is an automated email from the ASF dual-hosted git repository.

zhijiang pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.10 by this push:
     new 52a2f03  [FLINK-15166][runtime] Fix the bug of wrongly recycling 
uncompressed buffer
52a2f03 is described below

commit 52a2f03eff658c5ec70b223a2c1551a96b4809dd
Author: caoyingjie <kevin....@alibaba-inc.com>
AuthorDate: Tue Dec 10 18:08:46 2019 +0800

    [FLINK-15166][runtime] Fix the bug of wrongly recycling uncompressed buffer
    
    For blocking shuffle data compression, the compressed intermediate buffer 
is recycled after it is written out. However when the data can not be 
compressed, the returned buffer is the original buffer which should not be 
recycled.
    
    This commit fixes the bug of wrongly recycling uncompressed buffer by 
comparing the returned buffer with the original buffer.
    
    (cherry picked from commit 4d693c4fbc5e6f3ff34ccb3cb3a1d9f35d6bbd76)
---
 .../partition/BoundedBlockingSubpartition.java     |  4 ++-
 .../test/runtime/ShuffleCompressionITCase.java     | 33 ++++++++++++++++++----
 2 files changed, 30 insertions(+), 7 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartition.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartition.java
index 3a352ba..626ba3b 100755
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartition.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartition.java
@@ -152,7 +152,9 @@ final class BoundedBlockingSubpartition extends 
ResultSubpartition {
                                if (canBeCompressed(buffer)) {
                                        final Buffer compressedBuffer = 
parent.bufferCompressor.compressToIntermediateBuffer(buffer);
                                        data.writeBuffer(compressedBuffer);
-                                       compressedBuffer.recycleBuffer();
+                                       if (compressedBuffer != buffer) {
+                                               
compressedBuffer.recycleBuffer();
+                                       }
                                } else {
                                        data.writeBuffer(buffer);
                                }
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/runtime/ShuffleCompressionITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/runtime/ShuffleCompressionITCase.java
index 128267c..b4889f7 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/runtime/ShuffleCompressionITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/runtime/ShuffleCompressionITCase.java
@@ -41,6 +41,7 @@ import org.apache.flink.runtime.jobmaster.JobResult;
 import org.apache.flink.runtime.minicluster.MiniCluster;
 import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
 import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner;
 import org.apache.flink.types.LongValue;
 
 import org.junit.Test;
@@ -56,10 +57,21 @@ import static org.junit.Assert.assertFalse;
  */
 public class ShuffleCompressionITCase {
 
-       private static final int NUM_RECORDS_TO_SEND = 10 * 1024 * 1024;
+       private static final int NUM_BUFFERS_TO_SEND = 1000;
+
+       private static final int BUFFER_SIZE = 32 * 1024;
+
+       private static final int BYTES_PER_RECORD = 12;
+
+       /** We plus 1 to guarantee that the last buffer contains no more than 
one record and can not be compressed. */
+       private static final int NUM_RECORDS_TO_SEND = NUM_BUFFERS_TO_SEND * 
BUFFER_SIZE / BYTES_PER_RECORD + 1;
 
        private static final int PARALLELISM = 2;
 
+       private static final LongValue RECORD_TO_SEND = new 
LongValue(4387942071694473832L);
+
+       private static boolean useBroadcastPartitioner = false;
+
        @Test
        public void testDataCompressionForPipelineShuffle() throws Exception {
                executeTest(createJobGraph(ScheduleMode.EAGER, 
ResultPartitionType.PIPELINED, ExecutionMode.PIPELINED));
@@ -70,6 +82,12 @@ public class ShuffleCompressionITCase {
                executeTest(createJobGraph(ScheduleMode.LAZY_FROM_SOURCES, 
ResultPartitionType.BLOCKING, ExecutionMode.BATCH));
        }
 
+       @Test
+       public void 
testDataCompressionForBlockingShuffleWithBroadcastPartitioner() throws 
Exception {
+               useBroadcastPartitioner = true;
+               executeTest(createJobGraph(ScheduleMode.LAZY_FROM_SOURCES, 
ResultPartitionType.BLOCKING, ExecutionMode.BATCH));
+       }
+
        private void executeTest(JobGraph jobGraph) throws Exception {
                Configuration configuration = new Configuration();
                configuration.setString(TaskManagerOptions.TOTAL_FLINK_MEMORY, 
"1g");
@@ -110,7 +128,7 @@ public class ShuffleCompressionITCase {
                sink.setParallelism(PARALLELISM);
                sink.setSlotSharingGroup(slotSharingGroup);
 
-               sink.connectNewDataSetAsInput(source, 
DistributionPattern.POINTWISE, resultPartitionType);
+               sink.connectNewDataSetAsInput(source, 
DistributionPattern.ALL_TO_ALL, resultPartitionType);
                JobGraph jobGraph = new JobGraph(source, sink);
                jobGraph.setScheduleMode(scheduleMode);
 
@@ -138,13 +156,16 @@ public class ShuffleCompressionITCase {
                                // enable output flush for pipeline mode
                                recordWriterBuilder.setTimeout(100);
                        }
+                       if (useBroadcastPartitioner) {
+                               recordWriterBuilder.setChannelSelector(new 
BroadcastPartitioner());
+                       }
                        RecordWriter<LongValue> writer = 
recordWriterBuilder.build(resultPartitionWriter);
 
                        for (int i = 0; i < NUM_RECORDS_TO_SEND; ++i) {
-                               LongValue value = new LongValue(i);
-                               writer.broadcastEmit(value);
+                               writer.broadcastEmit(RECORD_TO_SEND);
                        }
                        writer.flushAll();
+                       writer.clearBuffers();
                }
        }
 
@@ -164,9 +185,9 @@ public class ShuffleCompressionITCase {
                                new 
String[]{EnvironmentInformation.getTemporaryFileDirectory()});
 
                        LongValue value = new LongValue();
-                       for (int i = 0; i < NUM_RECORDS_TO_SEND; ++i) {
+                       for (int i = 0; i < PARALLELISM * NUM_RECORDS_TO_SEND; 
++i) {
                                reader.next(value);
-                               assertEquals(i, value.getValue());
+                               assertEquals(RECORD_TO_SEND.getValue(), 
value.getValue());
                        }
                }
        }

Reply via email to