[ https://issues.apache.org/jira/browse/FLINK-8547?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16360765#comment-16360765 ]
ASF GitHub Bot commented on FLINK-8547: --------------------------------------- Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/5400#discussion_r167535311 --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java --- @@ -18,1426 +18,40 @@ package org.apache.flink.streaming.runtime.io; -import org.apache.flink.core.memory.MemorySegment; -import org.apache.flink.core.memory.MemorySegmentFactory; -import org.apache.flink.runtime.checkpoint.CheckpointMetaData; -import org.apache.flink.runtime.checkpoint.CheckpointMetrics; -import org.apache.flink.runtime.checkpoint.CheckpointOptions; -import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineOnCancellationBarrierException; -import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineSubsumedException; import org.apache.flink.runtime.io.disk.iomanager.IOManager; import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync; -import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker; -import org.apache.flink.runtime.io.network.api.CheckpointBarrier; -import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent; -import org.apache.flink.runtime.io.network.buffer.Buffer; -import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler; -import org.apache.flink.runtime.io.network.buffer.NetworkBuffer; import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent; -import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; -import org.apache.flink.runtime.operators.testutils.DummyEnvironment; +import org.apache.flink.runtime.io.network.partition.consumer.InputGate; -import org.hamcrest.BaseMatcher; -import org.hamcrest.Description; +import org.junit.After; import org.junit.AfterClass; import org.junit.BeforeClass; -import org.junit.Test; import java.io.File; -import java.util.Arrays; -import java.util.Random; +import java.io.IOException; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyLong; -import static org.mockito.Matchers.argThat; -import static org.mockito.Matchers.eq; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; /** - * Tests for the behavior of the {@link BarrierBuffer}. + * Tests for the behavior of the {@link BarrierBuffer} with {@link BufferSpiller} --- End diff -- nit: Missing period in java doc (build failure). > Implement CheckpointBarrierHandler not to spill data for exactly-once based > on credit-based flow control > -------------------------------------------------------------------------------------------------------- > > Key: FLINK-8547 > URL: https://issues.apache.org/jira/browse/FLINK-8547 > Project: Flink > Issue Type: Sub-task > Components: Network > Affects Versions: 1.5.0 > Reporter: zhijiang > Assignee: zhijiang > Priority: Major > > Currently in exactly-once mode, the {{BarrierBuffer}} would block inputs with > barriers until all inputs have received the barrier for a given checkpoint. > To avoid back-pressuring the input streams which may cause distributed > deadlocks, the {{BarrierBuffer}} has to spill the data in disk files to > recycle the buffers for blocked channels. > > Based on credit-based flow control, every channel has exclusive buffers, so > it is no need to spill data for avoiding deadlock. Then we implement a new > {{CheckpointBarrierHandler}} for only buffering the data for blocked channels > for better performance. > > And this new {{CheckpointBarrierHandler}} can also be configured to use or > not in order to rollback the original mode for unexpected risks. -- This message was sent by Atlassian JIRA (v7.6.3#76005)