[jira] [Commented] (FLINK-8547) Implement CheckpointBarrierHandler not to spill data for exactly-once based on credit-based flow control
[ https://issues.apache.org/jira/browse/FLINK-8547?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16372804#comment-16372804 ]

ASF GitHub Bot commented on FLINK-8547:
---

Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/5400

> Implement CheckpointBarrierHandler not to spill data for exactly-once based
> on credit-based flow control
>
>                 Key: FLINK-8547
>                 URL: https://issues.apache.org/jira/browse/FLINK-8547
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Network
>    Affects Versions: 1.5.0
>            Reporter: zhijiang
>            Assignee: zhijiang
>            Priority: Major
>             Fix For: 1.5.0
>
> Currently in exactly-once mode, the {{BarrierBuffer}} blocks inputs with
> barriers until all inputs have received the barrier for a given checkpoint.
> To avoid back-pressuring the input streams, which may cause distributed
> deadlocks, the {{BarrierBuffer}} has to spill the data to disk files in
> order to recycle the buffers of the blocked channels.
>
> With credit-based flow control, every channel has exclusive buffers, so
> there is no need to spill data to avoid deadlocks. We therefore implement a
> new {{CheckpointBarrierHandler}} that only buffers the data of blocked
> channels in memory, for better performance.
>
> This new {{CheckpointBarrierHandler}} can also be enabled or disabled via
> configuration, so that the original spilling mode can be restored in case
> of unexpected problems.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
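The idea in the issue description can be illustrated with a minimal, hypothetical sketch. The class and method names below are illustrative, not Flink's actual API: once a channel has delivered its barrier, further records from it are cached in an in-memory queue (instead of being spilled to disk) and replayed when all barriers have arrived. A real handler would also re-check the replayed data for further barriers; that is omitted here for brevity.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Queue;
import java.util.Set;

/** Minimal sketch of barrier alignment that caches blocked data in memory. */
class CachedBarrierAlignment {

    private final int numChannels;
    private final Set<Integer> blockedChannels = new HashSet<>();
    private final Queue<String> cachedRecords = new ArrayDeque<>();
    private final List<String> output = new ArrayList<>();

    CachedBarrierAlignment(int numChannels) {
        this.numChannels = numChannels;
    }

    /** A data record arrives on a channel. */
    void onRecord(int channel, String record) {
        if (blockedChannels.contains(channel)) {
            // Channel already delivered its barrier: cache the record in memory
            // (the spilling variant would write it to a disk file instead).
            cachedRecords.add(record);
        } else {
            output.add(record);
        }
    }

    /** A checkpoint barrier arrives on a channel. */
    void onBarrier(int channel) {
        blockedChannels.add(channel);
        if (blockedChannels.size() == numChannels) {
            // All barriers received: alignment is complete. Unblock the
            // channels and replay the cached records in arrival order.
            blockedChannels.clear();
            while (!cachedRecords.isEmpty()) {
                output.add(cachedRecords.poll());
            }
        }
    }

    List<String> output() {
        return output;
    }
}
```

For example, with two channels, a record arriving on channel 0 after channel 0's barrier is held back until channel 1's barrier completes the alignment.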
[jira] [Commented] (FLINK-8547) Implement CheckpointBarrierHandler not to spill data for exactly-once based on credit-based flow control
[ https://issues.apache.org/jira/browse/FLINK-8547?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16368912#comment-16368912 ]

ASF GitHub Bot commented on FLINK-8547:
---

Github user zhijiangW commented on the issue:

    https://github.com/apache/flink/pull/5400

    @pnowojski , I have changed the `EXACTLY_ONCE_BLOCKING_DATA_ENABLED` default to true and squashed the commits.
[jira] [Commented] (FLINK-8547) Implement CheckpointBarrierHandler not to spill data for exactly-once based on credit-based flow control
[ https://issues.apache.org/jira/browse/FLINK-8547?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16362335#comment-16362335 ]

ASF GitHub Bot commented on FLINK-8547:
---

Github user zhijiangW commented on the issue:

    https://github.com/apache/flink/pull/5400

    Thanks for rebasing to resolve the conflicts. Yes, the default value can be changed to true after the credit-based mode is fully merged. If any changes are needed on my side, please let me know. :)
[jira] [Commented] (FLINK-8547) Implement CheckpointBarrierHandler not to spill data for exactly-once based on credit-based flow control
[ https://issues.apache.org/jira/browse/FLINK-8547?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16360864#comment-16360864 ]

ASF GitHub Bot commented on FLINK-8547:
---

Github user zhijiangW commented on the issue:

    https://github.com/apache/flink/pull/5400

    @pnowojski , I have submitted the updates for the above comments.
[jira] [Commented] (FLINK-8547) Implement CheckpointBarrierHandler not to spill data for exactly-once based on credit-based flow control
[ https://issues.apache.org/jira/browse/FLINK-8547?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16360850#comment-16360850 ]

ASF GitHub Bot commented on FLINK-8547:
---

Github user zhijiangW commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5400#discussion_r167582971

    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferBlocker.java ---
    @@ -0,0 +1,61 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.runtime.io;
    +
    +import org.apache.flink.annotation.Internal;
    +import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
    +
    +import java.io.IOException;
    +
    +/**
    + * The buffer blocker takes the buffers and events from a data stream and adds them in a sequence.
    + * After a number of elements have been added, the blocker can "roll over": It presents the added
    + * elements as a readable sequence, and creates a new sequence.
    + */
    +@Internal
    +public interface BufferBlocker {
    +
    +	/**
    +	 * Adds a buffer or event to the blocker.
    +	 *
    +	 * @param boe The buffer or event to be added into the blocker.
    +	 */
    +	void add(BufferOrEvent boe) throws IOException;
    +
    +	/**
    +	 * Starts a new sequence of buffers and event and returns the current sequence of buffers for reading.
    +	 * This method returns {@code null}, if nothing was added since the creation, or the last call to this method.
    +	 *
    +	 * @param newBuffer only works for {@link BufferSpiller} implements currently.
    --- End diff --

    sure
[jira] [Commented] (FLINK-8547) Implement CheckpointBarrierHandler not to spill data for exactly-once based on credit-based flow control
[ https://issues.apache.org/jira/browse/FLINK-8547?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16360848#comment-16360848 ]

ASF GitHub Bot commented on FLINK-8547:
---

Github user zhijiangW commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5400#discussion_r167582763

    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferBlocker.java ---
    @@ -0,0 +1,61 @@
    ...
    +	/**
    +	 * Starts a new sequence of buffers and event and returns the current sequence of buffers for reading.
    +	 * This method returns {@code null}, if nothing was added since the creation, or the last call to this method.
    +	 *
    +	 * @param newBuffer only works for {@link BufferSpiller} implements currently.
    +	 * @return The readable sequence of buffers and events, or 'null', if nothing was added.
    +	 */
    +	BufferOrEventSequence rollOver(boolean newBuffer) throws IOException;
    --- End diff --

    sure
[jira] [Commented] (FLINK-8547) Implement CheckpointBarrierHandler not to spill data for exactly-once based on credit-based flow control
[ https://issues.apache.org/jira/browse/FLINK-8547?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16360849#comment-16360849 ]

ASF GitHub Bot commented on FLINK-8547:
---

Github user zhijiangW commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5400#discussion_r167582882

    --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java ---
    @@ -18,1426 +18,40 @@
    ...
     /**
    - * Tests for the behavior of the {@link BarrierBuffer}.
    + * Tests for the behavior of the {@link BarrierBuffer} with {@link BufferSpiller}
      */
    -public class BarrierBufferTest {
    -
    -	private static final Random RND = new Random();
    -
    -	private static final int PAGE_SIZE = 512;
    -
    -	private static int sizeCounter = 0;
    +public class BarrierBufferTest extends BarrierBufferTestBase {
    --- End diff --

    make sense
[jira] [Commented] (FLINK-8547) Implement CheckpointBarrierHandler not to spill data for exactly-once based on credit-based flow control
[ https://issues.apache.org/jira/browse/FLINK-8547?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16360763#comment-16360763 ]

ASF GitHub Bot commented on FLINK-8547:
---

Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5400#discussion_r167537474

    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferBlocker.java ---
    @@ -0,0 +1,61 @@
    ...
    +	 * @param newBuffer only works for {@link BufferSpiller} implements currently.
    --- End diff --

    The java doc in this interface shouldn't mention implementation-specific details. On the other hand, this java doc doesn't explain what `newBuffer` does, and for that information one must check `BufferSpiller`'s java doc itself. Can you add appropriate java doc here, or better, add java doc for the two methods proposed in the comment below: `rollOverWithoutReusingResources()` and `rollOverReusingResources()`? The comment on `CachedBufferBlocker.java#rollOverReusingResources` should state that it never reuses resources and defaults to `CachedBufferBlocker.java#rollOverWithoutReusingResources`.
[jira] [Commented] (FLINK-8547) Implement CheckpointBarrierHandler not to spill data for exactly-once based on credit-based flow control
[ https://issues.apache.org/jira/browse/FLINK-8547?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16360765#comment-16360765 ]

ASF GitHub Bot commented on FLINK-8547:
---

Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5400#discussion_r167535311

    --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java ---
    @@ -18,1426 +18,40 @@
    ...
     /**
    - * Tests for the behavior of the {@link BarrierBuffer}.
    + * Tests for the behavior of the {@link BarrierBuffer} with {@link BufferSpiller}
    --- End diff --

    nit: Missing period in java doc (build failure).
[jira] [Commented] (FLINK-8547) Implement CheckpointBarrierHandler not to spill data for exactly-once based on credit-based flow control
[ https://issues.apache.org/jira/browse/FLINK-8547?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16360762#comment-16360762 ]

ASF GitHub Bot commented on FLINK-8547:
---

Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5400#discussion_r167557594

    --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java ---
    @@ -18,1426 +18,40 @@
    ...
    -public class BarrierBufferTest {
    +public class BarrierBufferTest extends BarrierBufferTestBase {
    --- End diff --

    Rename the test class to `SpillingBarrierBufferTest`?
[jira] [Commented] (FLINK-8547) Implement CheckpointBarrierHandler not to spill data for exactly-once based on credit-based flow control
[ https://issues.apache.org/jira/browse/FLINK-8547?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16360761#comment-16360761 ]

ASF GitHub Bot commented on FLINK-8547:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/5400#discussion_r167537059

--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferBlocker.java ---
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
+
+import java.io.IOException;
+
+/**
+ * The buffer blocker takes the buffers and events from a data stream and adds them to a sequence.
+ * After a number of elements have been added, the blocker can "roll over": it presents the added
+ * elements as a readable sequence and creates a new sequence.
+ */
+@Internal
+public interface BufferBlocker {
+
+    /**
+     * Adds a buffer or event to the blocker.
+     *
+     * @param boe The buffer or event to be added to the blocker.
+     */
+    void add(BufferOrEvent boe) throws IOException;
+
+    /**
+     * Starts a new sequence of buffers and events and returns the current sequence of buffers for reading.
+     * This method returns {@code null} if nothing was added since creation or since the last call to this method.
+     *
+     * @param newBuffer only applies to the {@link BufferSpiller} implementation currently.
+     * @return The readable sequence of buffers and events, or {@code null} if nothing was added.
+     */
+    BufferOrEventSequence rollOver(boolean newBuffer) throws IOException;
--- End diff --

Could we stick with two methods in the interface? I think more descriptive names would be better than a parameter here: `rollOverWithoutReusingResources()` and `rollOverReusingResources()`, where `rollOverWithoutReusingResources()` == `rollOver(true)`. Especially if one implementation doesn't support one of those calls.
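For illustration, the two-method split proposed in this review could look roughly like the sketch below. All types here are simplified, hypothetical stand-ins for Flink's `BufferOrEvent`, `BufferOrEventSequence`, and `BufferBlocker`, and `InMemoryBufferBlocker` is a toy implementation, not Flink code.

```java
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Queue;

// Simplified stand-in for Flink's BufferOrEvent (illustration only).
class BufferOrEvent {
    final int channelIndex;
    BufferOrEvent(int channelIndex) { this.channelIndex = channelIndex; }
}

// A readable sequence of previously added elements; getNext() returns null when exhausted.
interface BufferOrEventSequence {
    BufferOrEvent getNext();
}

// The shape suggested in the review: rollOverWithoutReusingResources()
// corresponds to rollOver(true), rollOverReusingResources() to rollOver(false).
interface BufferBlocker {
    void add(BufferOrEvent boe) throws IOException;
    BufferOrEventSequence rollOverWithoutReusingResources() throws IOException;
    BufferOrEventSequence rollOverReusingResources() throws IOException;
}

// Hypothetical in-memory implementation; an in-memory blocker has no real
// backing resources, so both roll-over variants behave the same here.
class InMemoryBufferBlocker implements BufferBlocker {
    private ArrayDeque<BufferOrEvent> current = new ArrayDeque<>();

    @Override
    public void add(BufferOrEvent boe) {
        current.add(boe);
    }

    @Override
    public BufferOrEventSequence rollOverWithoutReusingResources() {
        return rollOverReusingResources(); // nothing to reuse in memory
    }

    @Override
    public BufferOrEventSequence rollOverReusingResources() {
        if (current.isEmpty()) {
            return null; // nothing was added since the last roll-over
        }
        Queue<BufferOrEvent> finished = current;
        current = new ArrayDeque<>();
        return finished::poll;
    }
}
```

With explicit names, an implementation that cannot support one variant (for example, a spilling blocker) can throw `UnsupportedOperationException` from that method, which a boolean parameter would obscure.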
[jira] [Commented] (FLINK-8547) Implement CheckpointBarrierHandler not to spill data for exactly-once based on credit-based flow control
[ https://issues.apache.org/jira/browse/FLINK-8547?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16360764#comment-16360764 ]

ASF GitHub Bot commented on FLINK-8547:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/5400#discussion_r167535452

--- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BufferBlockerTestBase.java ---
@@ -0,0 +1,325 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
+import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
+import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
+
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Random;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for {@link BufferBlocker}.
+ */
+public abstract class BufferBlockerTestBase {
+
+    protected static final int PAGE_SIZE = 4096;
+
+    abstract BufferBlocker createBufferBlocker();
+
+    @Test
+    public void testRollOverEmptySequences() throws IOException {
+        BufferBlocker bufferBlocker = createBufferBlocker();
+        assertNull(bufferBlocker.rollOver(false));
+        assertNull(bufferBlocker.rollOver(false));
+        assertNull(bufferBlocker.rollOver(false));
+    }
+
+    @Test
+    public void testSpillAndRollOverSimple() throws IOException {
+        final Random rnd = new Random();
+        final Random bufferRnd = new Random();
+
+        final int maxNumEventsAndBuffers = 3000;
+        final int maxNumChannels = 1656;
+
+        BufferBlocker bufferBlocker = createBufferBlocker();
+
+        // do multiple spilling / rolling over rounds
+        for (int round = 0; round < 5; round++) {
+
+            final long bufferSeed = rnd.nextLong();
+            bufferRnd.setSeed(bufferSeed);
+
+            final int numEventsAndBuffers = rnd.nextInt(maxNumEventsAndBuffers) + 1;
+            final int numChannels = rnd.nextInt(maxNumChannels) + 1;
+
+            final ArrayList<BufferOrEvent> events = new ArrayList<>(128);
+
+            // generate sequence
+            for (int i = 0; i < numEventsAndBuffers; i++) {
+                boolean isEvent = rnd.nextDouble() < 0.05d;
+                BufferOrEvent evt;
+                if (isEvent) {
+                    evt = generateRandomEvent(rnd, numChannels);
+                    events.add(evt);
+                } else {
+                    evt = generateRandomBuffer(bufferRnd.nextInt(PAGE_SIZE) + 1, bufferRnd.nextInt(numChannels));
+                }
+                bufferBlocker.add(evt);
+            }
+
+            // reset and create reader
+            bufferRnd.setSeed(bufferSeed);
+
+            BufferOrEventSequence seq = bufferBlocker.rollOver(false);
+            seq.open();
+
+            // read and validate the sequence
+
+            int numEvent = 0;
+            for (int i = 0; i < numEventsAndBuffers; i++) {
+                BufferOrEvent next = s
[jira] [Commented] (FLINK-8547) Implement CheckpointBarrierHandler not to spill data for exactly-once based on credit-based flow control
[ https://issues.apache.org/jira/browse/FLINK-8547?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16358151#comment-16358151 ]

ASF GitHub Bot commented on FLINK-8547:
---

Github user zhijiangW commented on the issue:

https://github.com/apache/flink/pull/5400

@pnowojski, thanks for the suggestions; I totally agree with them. That abstraction indeed makes the code simpler. I will update the code ASAP.
[jira] [Commented] (FLINK-8547) Implement CheckpointBarrierHandler not to spill data for exactly-once based on credit-based flow control
[ https://issues.apache.org/jira/browse/FLINK-8547?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16358128#comment-16358128 ]

ASF GitHub Bot commented on FLINK-8547:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/5400#discussion_r167163798

--- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTestBase.java ---
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
+import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
+import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
+import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
+import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
+
+import org.hamcrest.BaseMatcher;
+import org.hamcrest.Description;
+
+import java.util.Random;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Utility class containing common methods for testing
+ * {@link BufferSpillerTest} and {@link CreditBasedBufferBlockerTest}.
+ */
+public class BarrierBufferTestBase {
+
+    private static final Random RND = new Random();
+
+    private static int sizeCounter = 1;
+
+    public static BufferOrEvent createBarrier(long checkpointId, int channel) {
+        return new BufferOrEvent(new CheckpointBarrier(
+            checkpointId, System.currentTimeMillis(), CheckpointOptions.forCheckpointWithDefaultLocation()), channel);
+    }
+
+    public static BufferOrEvent createCancellationBarrier(long checkpointId, int channel) {
--- End diff --

Instead of using static methods, please use inheritance - make `BarrierBufferTest` and `CreditBasedBarrierBufferTest` extend `BarrierBufferTestBase`. Especially since the name `*Base` already suggests that.
[jira] [Commented] (FLINK-8547) Implement CheckpointBarrierHandler not to spill data for exactly-once based on credit-based flow control
[ https://issues.apache.org/jira/browse/FLINK-8547?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16358129#comment-16358129 ]

ASF GitHub Bot commented on FLINK-8547:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/5400#discussion_r167162743

--- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTestBase.java ---
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
+import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
+import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
+import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
+import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
+
+import org.hamcrest.BaseMatcher;
+import org.hamcrest.Description;
+
+import java.util.Random;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Utility class containing common methods for testing
+ * {@link BufferSpillerTest} and {@link CreditBasedBufferBlockerTest}.
+ */
+public class BarrierBufferTestBase {
--- End diff --

This is not exactly what I had in mind by deduplication of `BarrierBufferTest` and `CreditBasedBarrierBufferTest`. Both of those tests are still pretty much copies of one another, and those static methods are only a fraction of the duplication. Look, for example, at `testSingleChannelNoBarriers()`: they are 99% identical. All of its code could be moved to `BarrierBufferTestBase`. `BarrierBufferTestBase` would only need to define an abstract method `CheckpointBarrierHandler createBarrierHandler()`, which would be defined differently in `BarrierBufferTest` and `CreditBasedBarrierBufferTest`.

One minor thing is that `BarrierBufferTest` would need `checkNoTempFilesRemain()` added as an `@After` test hook. The same applies to all of the other tests.
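The inheritance pattern being suggested can be sketched as below. The types are heavily simplified, hypothetical stand-ins (no JUnit, no real handlers); only the abstract `createBarrierHandler()` factory method reflects the review's actual proposal.

```java
// Much-simplified stand-in for CheckpointBarrierHandler (illustration only).
interface CheckpointBarrierHandler {
    boolean isEmpty();
}

// The review's idea: shared test logic lives in the base class, and each
// concrete test only supplies the handler via an abstract factory method.
abstract class BarrierHandlerTestBase {

    abstract CheckpointBarrierHandler createBarrierHandler();

    // In the real tests this would be a JUnit @Test method shared by both subclasses.
    boolean singleChannelNoBarriersCheck() {
        CheckpointBarrierHandler handler = createBarrierHandler();
        return handler.isEmpty(); // a freshly created handler has buffered nothing
    }
}

// Stand-in for a test of the spilling-based handler.
class SpillingHandlerTest extends BarrierHandlerTestBase {
    @Override
    CheckpointBarrierHandler createBarrierHandler() {
        return () -> true; // would construct a BarrierBuffer in the real test
    }
}

// Stand-in for a test of the credit-based handler.
class CreditBasedHandlerTest extends BarrierHandlerTestBase {
    @Override
    CheckpointBarrierHandler createBarrierHandler() {
        return () -> true; // would construct the credit-based buffer in the real test
    }
}
```

Every shared test method then runs once per subclass, against whichever implementation the factory returns.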
[jira] [Commented] (FLINK-8547) Implement CheckpointBarrierHandler not to spill data for exactly-once based on credit-based flow control
[ https://issues.apache.org/jira/browse/FLINK-8547?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16355191#comment-16355191 ]

ASF GitHub Bot commented on FLINK-8547:
---

Github user zhijiangW commented on the issue:

https://github.com/apache/flink/pull/5400

@pnowojski, I have submitted a separate commit to address the above comments.
[jira] [Commented] (FLINK-8547) Implement CheckpointBarrierHandler not to spill data for exactly-once based on credit-based flow control
[ https://issues.apache.org/jira/browse/FLINK-8547?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16353271#comment-16353271 ]

ASF GitHub Bot commented on FLINK-8547:
---

Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/5400#discussion_r166175837

--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java ---
@@ -131,10 +131,14 @@ public StreamInputProcessor(
 		long maxAlign = taskManagerConfig.getLong(TaskManagerOptions.TASK_CHECKPOINT_ALIGNMENT_BYTES_LIMIT);
 		if (!(maxAlign == -1 || maxAlign > 0)) {
 			throw new IllegalConfigurationException(
-				TaskManagerOptions.TASK_CHECKPOINT_ALIGNMENT_BYTES_LIMIT.key()
-				+ " must be positive or -1 (infinite)");
+				TaskManagerOptions.TASK_CHECKPOINT_ALIGNMENT_BYTES_LIMIT.key()
--- End diff --

I think we can change the current `CheckpointBarrierHandler` interface into an abstract class and then add a `createBarrierHandler` method to extract the common parts of `StreamInputProcessor` and `StreamTwoInputProcessor`. Or we could define a new class for the common method. I prefer the first way. What do you think?
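As a rough, hypothetical sketch of the first option (the enum, flag, and class bodies below are illustrative stand-ins, not Flink's actual API), the shared factory both input processors would call might look like:

```java
// Simplified stand-in for Flink's checkpointing mode setting.
enum CheckpointingMode { EXACTLY_ONCE, AT_LEAST_ONCE }

abstract class CheckpointBarrierHandler {

    // Shared factory extracted from StreamInputProcessor / StreamTwoInputProcessor.
    // The creditBasedBlocking flag stands in for the config switch that can
    // roll back to the original spilling behavior if needed.
    static CheckpointBarrierHandler createBarrierHandler(
            CheckpointingMode mode, boolean creditBasedBlocking) {
        if (mode == CheckpointingMode.EXACTLY_ONCE) {
            return creditBasedBlocking
                ? new CreditBasedBarrierBuffer()
                : new BarrierBuffer();
        }
        return new BarrierTracker();
    }
}

// Empty stand-ins for the three handler implementations.
class BarrierBuffer extends CheckpointBarrierHandler {}
class CreditBasedBarrierBuffer extends CheckpointBarrierHandler {}
class BarrierTracker extends CheckpointBarrierHandler {}
```

The selection logic then lives in exactly one place instead of being duplicated in each processor's constructor.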
[jira] [Commented] (FLINK-8547) Implement CheckpointBarrierHandler not to spill data for exactly-once based on credit-based flow control
[ https://issues.apache.org/jira/browse/FLINK-8547?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16353265#comment-16353265 ]

ASF GitHub Bot commented on FLINK-8547:
---

Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/5400#discussion_r166174743

--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CreditBasedBarrierBuffer.java ---
@@ -0,0 +1,529 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.decline.AlignmentLimitExceededException;
+import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineException;
+import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineOnCancellationBarrierException;
+import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineSubsumedException;
+import org.apache.flink.runtime.checkpoint.decline.InputEndOfStreamException;
+import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
+import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
+import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
+import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
+import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.streaming.runtime.io.CreditBasedBufferBlocker.BufferOrEventSequence;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayDeque;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * The barrier buffer is a {@link CheckpointBarrierHandler} that blocks inputs with barriers until
+ * all inputs have received the barrier for a given checkpoint.
+ *
+ * <p>The barrier buffer continues receiving buffers from the blocked channels and buffers them
+ * internally until the blocks are released. Based on credit-based flow control, it will not cause
+ * deadlocks.
+ */
+@Internal
+public class CreditBasedBarrierBuffer implements CheckpointBarrierHandler {
+
+    private static final Logger LOG = LoggerFactory.getLogger(CreditBasedBarrierBuffer.class);
+
+    /** The gate that the buffer draws its input from. */
+    private final InputGate inputGate;
+
+    /** Flags that indicate whether a channel is currently blocked/buffered. */
+    private final boolean[] blockedChannels;
+
+    /** The total number of channels that this buffer handles data from. */
+    private final int totalNumberOfInputChannels;
+
+    /** The utility to buffer blocked data in the memory queue. */
+    private final CreditBasedBufferBlocker bufferBlocker;
+
+    /**
+     * The pending blocked buffer/event sequences. Must be consumed before requesting further data
+     * from the input gate.
+     */
+    private final ArrayDeque<BufferOrEventSequence> queuedBuffered;
--- End diff --

I think we cannot directly mix all the blocked buffers for different checkpoint IDs into one `ArrayDeque`. We also need the `BufferOrEventSequence`, which indicates the blocked buffers for a specific checkpoint ID; otherwise we cannot know when the blocked buffers are exhausted after resetting a specific checkpoint ID. If we want to use only one `ArrayDeque` for blocking all buffers, we may need to insert extra checkpoint-ID hints into this queue to indicate when to stop reading blocked buffers from it. For example:

channel1: [cp1, cp2, b1, cp3, b2, b3]
channel2: [cp2]

1. When reading cp1 first from channel1, [cp2, b1, cp3, b2, b3] are blocked as sep
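The per-checkpoint grouping described above can be modeled with a toy sketch (illustrative only, not Flink code): each roll-over seals the data blocked during one checkpoint alignment as its own sequence, so the consumer always knows where one checkpoint's blocked data ends without extra markers in a flat queue.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;

// Toy model of "one sequence per alignment" (elements shown as Strings).
class BlockedSequences {

    // Pending sequences, oldest alignment first (mirrors queuedBuffered above).
    private final ArrayDeque<List<String>> queuedBuffered = new ArrayDeque<>();

    // Elements blocked during the alignment currently in progress.
    private List<String> current = new ArrayList<>();

    // Buffer an element that arrived on a currently blocked channel.
    void block(String bufferOrEvent) {
        current.add(bufferOrEvent);
    }

    // Alignment for one checkpoint finished (or was cancelled): seal its
    // blocked data as one readable sequence and start collecting for the next.
    void rollOver() {
        if (!current.isEmpty()) {
            queuedBuffered.add(current);
            current = new ArrayList<>();
        }
    }

    // Sequences must be drained in order before requesting new gate data;
    // null means no pending blocked data remains.
    List<String> pollNextSequence() {
        return queuedBuffered.poll();
    }
}
```

Merging everything into one flat queue would lose the boundary that `rollOver()` records here, which is the point being made in the comment.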
[jira] [Commented] (FLINK-8547) Implement CheckpointBarrierHandler not to spill data for exactly-once based on credit-based flow control
[ https://issues.apache.org/jira/browse/FLINK-8547?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16352520#comment-16352520 ]

ASF GitHub Bot commented on FLINK-8547:
---

Github user zhijiangW commented on the issue:

https://github.com/apache/flink/pull/5400

@pnowojski, thanks for the review! I understand your concerns, and I should deduplicate the common utilities in these tests. I will do that tomorrow together with the other comments!
[jira] [Commented] (FLINK-8547) Implement CheckpointBarrierHandler not to spill data for exactly-once based on credit-based flow control
[ https://issues.apache.org/jira/browse/FLINK-8547?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16352509#comment-16352509 ]

ASF GitHub Bot commented on FLINK-8547:
---

Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/5400#discussion_r165998584

--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java ---
@@ -131,10 +131,14 @@ public StreamInputProcessor(
 		long maxAlign = taskManagerConfig.getLong(TaskManagerOptions.TASK_CHECKPOINT_ALIGNMENT_BYTES_LIMIT);
 		if (!(maxAlign == -1 || maxAlign > 0)) {
 			throw new IllegalConfigurationException(
-				TaskManagerOptions.TASK_CHECKPOINT_ALIGNMENT_BYTES_LIMIT.key()
-				+ " must be positive or -1 (infinite)");
+				TaskManagerOptions.TASK_CHECKPOINT_ALIGNMENT_BYTES_LIMIT.key()
--- End diff --

Yes, I will consider a proper way.
[jira] [Commented] (FLINK-8547) Implement CheckpointBarrierHandler not to spill data for exactly-once based on credit-based flow control
[ https://issues.apache.org/jira/browse/FLINK-8547?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16352505#comment-16352505 ] ASF GitHub Bot commented on FLINK-8547: --- Github user zhijiangW commented on a diff in the pull request: https://github.com/apache/flink/pull/5400#discussion_r165997853 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CreditBasedBarrierBuffer.java --- @@ -0,0 +1,529 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
    + */
    +
    +package org.apache.flink.streaming.runtime.io;
    +
    +import org.apache.flink.annotation.Internal;
    +import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
    +import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
    +import org.apache.flink.runtime.checkpoint.decline.AlignmentLimitExceededException;
    +import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineException;
    +import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineOnCancellationBarrierException;
    +import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineSubsumedException;
    +import org.apache.flink.runtime.checkpoint.decline.InputEndOfStreamException;
    +import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
    +import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
    +import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
    +import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
    +import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
    +import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
    +import org.apache.flink.streaming.runtime.io.CreditBasedBufferBlocker.BufferOrEventSequence;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.IOException;
    +import java.util.ArrayDeque;
    +
    +import static org.apache.flink.util.Preconditions.checkArgument;
    +
    +/**
    + * The barrier buffer is {@link CheckpointBarrierHandler} that blocks inputs with barriers until
    + * all inputs have received the barrier for a given checkpoint.
    + *
    + * The BarrierBuffer continues receiving buffers from the blocked channels and buffered them
    + * internally until the blocks are released. It will not cause deadlocks based on credit-based
    + * flow control.
    + */
    +@Internal
    +public class CreditBasedBarrierBuffer implements CheckpointBarrierHandler {
    +
    +	private static final Logger LOG = LoggerFactory.getLogger(CreditBasedBarrierBuffer.class);
    +
    +	/** The gate that the buffer draws its input from. */
    +	private final InputGate inputGate;
    +
    +	/** Flags that indicate whether a channel is currently blocked/buffered. */
    +	private final boolean[] blockedChannels;
    +
    +	/** The total number of channels that this buffer handles data from. */
    +	private final int totalNumberOfInputChannels;
    +
    +	/** The utility to buffer blocked data in the memory queue. */
    +	private final CreditBasedBufferBlocker bufferBlocker;
    +
    +	/**
    +	 * The pending blocked buffer/event sequences. Must be consumed before requesting further data
    +	 * from the input gate.
    +	 */
    +	private final ArrayDeque<BufferOrEventSequence> queuedBuffered;
    --- End diff --

    The current implementation keeps the same logic as `BarrierBuffer`. I am wondering whether it would make sense to keep only one `ArrayDeque` holding all the blocked buffers for the different checkpoint ids, especially for the uncommon case mentioned on line 496 in `BarrierBuffer`. I will double-check that logic and reply to you later.
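The single-deque alternative floated above can be sketched as a toy, self-contained illustration. All names here (`SingleDequeBlocker`, `TaggedElement`) are hypothetical, not Flink API, and a `String` stands in for Flink's `BufferOrEvent`: each blocked element is tagged with the checkpoint id under which it was blocked, so one `ArrayDeque` can hold blocked data from several checkpoints at once.

```java
import java.util.ArrayDeque;
import java.util.Iterator;

// Toy sketch of the "one ArrayDeque for all checkpoints" idea under discussion.
// SingleDequeBlocker and TaggedElement are illustrative names; String stands in
// for Flink's BufferOrEvent.
public class SingleDequeBlocker {

    static final class TaggedElement {
        final long checkpointId; // checkpoint under which the element was blocked
        final String element;    // stand-in for a BufferOrEvent

        TaggedElement(long checkpointId, String element) {
            this.checkpointId = checkpointId;
            this.element = element;
        }
    }

    private final ArrayDeque<TaggedElement> blocked = new ArrayDeque<>();

    /** Buffers an element that arrived on a blocked channel. */
    public void block(long checkpointId, String element) {
        blocked.addLast(new TaggedElement(checkpointId, element));
    }

    /**
     * Releases the next element blocked under the given checkpoint, in FIFO
     * order, or returns null if none remains for that checkpoint.
     */
    public String pollBlockedFor(long checkpointId) {
        Iterator<TaggedElement> it = blocked.iterator();
        while (it.hasNext()) {
            TaggedElement e = it.next();
            if (e.checkpointId == checkpointId) {
                it.remove();
                return e.element;
            }
        }
        return null;
    }
}
```

This keeps a single buffering structure while still releasing per-checkpoint sequences in order, at the cost of a linear scan when checkpoints interleave.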
[jira] [Commented] (FLINK-8547) Implement CheckpointBarrierHandler not to spill data for exactly-once based on credit-based flow control
[ https://issues.apache.org/jira/browse/FLINK-8547?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16352441#comment-16352441 ]

ASF GitHub Bot commented on FLINK-8547:
---------------------------------------

Github user zhijiangW commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5400#discussion_r165983714

    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CreditBasedBarrierBuffer.java ---
    @@ -0,0 +1,529 @@
    + * The BarrierBuffer continues receiving buffers from the blocked channels and buffered them
    + * internally until the blocks are released. It will not cause deadlocks based on credit-based
    + * flow control.
    --- End diff --

    Sure.

> Implement CheckpointBarrierHandler not to spill data for exactly-once based
> on credit-based flow control
> ---------------------------------------------------------------------------
>
>                 Key: FLINK-8547
>                 URL: https://issues.apache.org/jira/browse/FLINK-8547
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Network
>    Affects Versions: 1.5.0
>            Reporter: zhijiang
>            Assignee: zhijiang
>            Priority: Major
>
> Currently in exactly-once mode, the {{BarrierBuffer}} blocks inputs with
> barriers until all inputs have received the barrier for a given checkpoint.
> To avoid back-pressuring the input streams, which may cause distributed
> deadlocks, the {{BarrierBuffer}} has to spill data to disk files to recycle
> the buffers for the blocked channels.
>
> With credit-based flow control, every channel has exclusive buffers, so
> there is no need to spill data to avoid deadlock. We therefore implement a
> new {{CheckpointBarrierHandler}} that only buffers the data from blocked
> channels in memory, for better performance.
>
> This new {{CheckpointBarrierHandler}} can also be enabled or disabled via
> configuration in order to fall back to the original mode in case of
> unexpected risks.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Commented] (FLINK-8547) Implement CheckpointBarrierHandler not to spill data for exactly-once based on credit-based flow control
[ https://issues.apache.org/jira/browse/FLINK-8547?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16352438#comment-16352438 ]

ASF GitHub Bot commented on FLINK-8547:
---------------------------------------

Github user zhijiangW commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5400#discussion_r165983607

    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CreditBasedBarrierBuffer.java ---
    @@ -0,0 +1,529 @@
    +import org.apache.flink.annotation.Internal;
    --- End diff --

    The checkstyle failures are fixed.
[jira] [Commented] (FLINK-8547) Implement CheckpointBarrierHandler not to spill data for exactly-once based on credit-based flow control
[ https://issues.apache.org/jira/browse/FLINK-8547?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16352436#comment-16352436 ]

ASF GitHub Bot commented on FLINK-8547:
---------------------------------------

Github user zhijiangW commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5400#discussion_r165983496

    --- Diff: flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java ---
    @@ -184,6 +184,18 @@
     			key("taskmanager.network.detailed-metrics")
     			.defaultValue(false);
     
    +	/**
    +	 * Config parameter defining whether to spill data for channels with barrier or not in exactly-once
    +	 * mode based on credit-based flow control.
    +	 *
    +	 * @deprecated Will be removed for Flink 1.6 when the old code will be dropped in favour of
    +	 * credit-based flow control.
    +	 */
    +	@Deprecated
    +	public static final ConfigOption<Boolean> EXACTLY_ONCE_BLOCKING_DATA_ENABLED =
    +		key("taskmanager.exactly-once.blocking.data.enabled")
    +			.defaultValue(false);
    --- End diff --

    Yes, the default value should be true, but I think it should be changed after `FLINK-7456` is merged, so that the credit-based mode actually works.
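The change discussed here, flipping the default to `true` so spilling becomes the opt-in fallback, could look roughly like the following fragment against Flink's `TaskManagerOptions`. This is a sketch of the suggestion, not necessarily the final merged form; the option name and `@Deprecated` annotation are taken from the diff above:

```java
import static org.apache.flink.configuration.ConfigOptions.key;

// Safety-net switch: blocking in memory is enabled by default; the option is
// kept only to allow falling back to the spilling-based BarrierBuffer in case
// of bugs/problems, and is slated for removal in Flink 1.6.
@Deprecated
public static final ConfigOption<Boolean> EXACTLY_ONCE_BLOCKING_DATA_ENABLED =
	key("taskmanager.exactly-once.blocking.data.enabled")
		.defaultValue(true);
```

Note that the later comment on this thread confirms the default was indeed changed to `true` before the PR was merged.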
[jira] [Commented] (FLINK-8547) Implement CheckpointBarrierHandler not to spill data for exactly-once based on credit-based flow control
[ https://issues.apache.org/jira/browse/FLINK-8547?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16352284#comment-16352284 ]

ASF GitHub Bot commented on FLINK-8547:
---------------------------------------

Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5400#discussion_r165930338

    --- Diff: flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java ---
    @@ -184,6 +184,18 @@
    +	public static final ConfigOption<Boolean> EXACTLY_ONCE_BLOCKING_DATA_ENABLED =
    +		key("taskmanager.exactly-once.blocking.data.enabled")
    +			.defaultValue(false);
    --- End diff --

    I think we would like to enable it by default and leave this config option just as a safety net in case of bugs/problems.

    By the way, shouldn't this be tightly coupled with the credit-based flow control switch?
[jira] [Commented] (FLINK-8547) Implement CheckpointBarrierHandler not to spill data for exactly-once based on credit-based flow control
[ https://issues.apache.org/jira/browse/FLINK-8547?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16352280#comment-16352280 ]

ASF GitHub Bot commented on FLINK-8547:
---------------------------------------

Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5400#discussion_r165942614

    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CreditBasedBarrierBuffer.java ---
    @@ -0,0 +1,529 @@
    +	/** The utility to buffer blocked data in the memory queue. */
    +	private final CreditBasedBufferBlocker bufferBlocker;
    +
    +	/**
    +	 * The pending blocked buffer/event sequences. Must be consumed before requesting further data
    +	 * from the input gate.
    +	 */
    +	private final ArrayDeque<BufferOrEventSequence> queuedBuffered;
    --- End diff --

    Do we need the `queuedBuffered` and `currentBuffered` fields together with `CreditBasedBufferBlocker`? Why can't we just use the `ArrayDeque currentBuffers` field from `CreditBasedBufferBlocker` for this? Why do we need this triple level of buffering here?

    In the original code it made sense, since instead of `CreditBasedBufferBlocker` there was a `BufferSpiller`. Getting rid of those three fields would vastly simplify this class.
[jira] [Commented] (FLINK-8547) Implement CheckpointBarrierHandler not to spill data for exactly-once based on credit-based flow control
[ https://issues.apache.org/jira/browse/FLINK-8547?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16352283#comment-16352283 ]

ASF GitHub Bot commented on FLINK-8547:
---------------------------------------

Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5400#discussion_r165943880

    --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BufferBlockerTest.java ---
    @@ -0,0 +1,322 @@
    +/**
    + * Tests for {@link CreditBasedBufferBlocker}.
    + */
    +public class BufferBlockerTest {
    --- End diff --

    Rename the class to `CreditBasedBufferBlockerTest`.
[jira] [Commented] (FLINK-8547) Implement CheckpointBarrierHandler not to spill data for exactly-once based on credit-based flow control
[ https://issues.apache.org/jira/browse/FLINK-8547?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16352282#comment-16352282 ]

ASF GitHub Bot commented on FLINK-8547:
---------------------------------------

Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5400#discussion_r165945174

    --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BufferBlockerTest.java ---
    @@ -0,0 +1,322 @@
    +public class BufferBlockerTest {
    +
    +	private static final int PAGE_SIZE = 4096;
    +
    +	private CreditBasedBufferBlocker bufferBlocker;
    +
    +	// ------------------------------------------------------------------------
    +	//  Setup / Cleanup
    +	// ------------------------------------------------------------------------
    +
    +	@Before
    +	public void createSpiller() {
    +		bufferBlocker = new CreditBasedBufferBlocker(PAGE_SIZE);
    +	}
    +
    +	@After
    +	public void cleanupSpiller() {
    +		if (bufferBlocker != null) {
    +			bufferBlocker.close();
    +		}
    +	}
    +
    +	// ------------------------------------------------------------------------
    +	//  Tests
    +	// ------------------------------------------------------------------------
    +
    +	@Test
    +	public void testRollOverEmptySequences() {
    +		assertNull(bufferBlocker.rollOver());
    +		assertNull(bufferBlocker.rollOver());
    +		assertNull(bufferBlocker.rollOver());
    +	}
    +
    +	@Test
    +	public void testSpillAndRollOverSimple() {
    +		final Random rnd = new Random();
    +		final Random bufferRnd = new Random();
    +
    +		final int maxNumEventsAndBuffers = 3000;
    +		final int maxNumChannels = 1656;
    +
    +		// do multiple blocking / rolling over rounds
    +		for (int round = 0; round < 5; round++) {
    --- End diff --

    Can you deduplicate the code of these two unit tests (`testSpillAndRollOverSimple` and `testSpillWhileReading`)? It seems like this one is just one sequence of the next one.
[jira] [Commented] (FLINK-8547) Implement CheckpointBarrierHandler not to spill data for exactly-once based on credit-based flow control
[ https://issues.apache.org/jira/browse/FLINK-8547?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16352279#comment-16352279 ]

ASF GitHub Bot commented on FLINK-8547:
---------------------------------------

Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5400#discussion_r165933824

    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CreditBasedBarrierBuffer.java ---
    @@ -0,0 +1,529 @@
    + * The BarrierBuffer continues receiving buffers from the blocked channels and buffered them
    + * internally until the blocks are released. It will not cause deadlocks based on credit-based
    + * flow control.
    --- End diff --

    Please explain the `It will not cause deadlocks based on credit-based flow control` part of the comment in a little more detail.
[jira] [Commented] (FLINK-8547) Implement CheckpointBarrierHandler not to spill data for exactly-once based on credit-based flow control
[ https://issues.apache.org/jira/browse/FLINK-8547?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16352281#comment-16352281 ]

ASF GitHub Bot commented on FLINK-8547:
---------------------------------------

Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5400#discussion_r165943080

    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java ---
    @@ -131,10 +131,14 @@ public StreamInputProcessor(
     		long maxAlign = taskManagerConfig.getLong(TaskManagerOptions.TASK_CHECKPOINT_ALIGNMENT_BYTES_LIMIT);
     		if (!(maxAlign == -1 || maxAlign > 0)) {
     			throw new IllegalConfigurationException(
    -				TaskManagerOptions.TASK_CHECKPOINT_ALIGNMENT_BYTES_LIMIT.key()
    -				+ " must be positive or -1 (infinite)");
    +				TaskManagerOptions.TASK_CHECKPOINT_ALIGNMENT_BYTES_LIMIT.key()
    --- End diff --

    Please extract this and the same code from `StreamTwoInputProcessor.java` into a common method. I think all of the lines up to `this.lock = checkNotNull(lock);` could be unified, maybe into some base class.
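The extraction suggested here could look roughly like the following self-contained sketch. The helper name `validateAlignmentLimit` is hypothetical (not the method Flink ended up with), and `IllegalArgumentException` stands in for Flink's `IllegalConfigurationException` to keep the sketch runnable without Flink on the classpath; it captures just the shared check that the alignment byte limit must be positive or -1:

```java
// Hypothetical helper for the validation duplicated between
// StreamInputProcessor and StreamTwoInputProcessor: the checkpoint alignment
// byte limit must be positive or -1 (meaning "infinite").
final class CheckpointAlignmentLimits {

    private CheckpointAlignmentLimits() {}

    /** Returns the limit unchanged if valid, otherwise throws. */
    static long validateAlignmentLimit(long maxAlign) {
        if (!(maxAlign == -1 || maxAlign > 0)) {
            // stand-in for Flink's IllegalConfigurationException
            throw new IllegalArgumentException(
                "checkpoint alignment byte limit must be positive or -1 (infinite), was: " + maxAlign);
        }
        return maxAlign;
    }
}
```

Both processors would then call the shared helper instead of repeating the `if` block, which is the kind of unification the review asks for.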
[jira] [Commented] (FLINK-8547) Implement CheckpointBarrierHandler not to spill data for exactly-once based on credit-based flow control
[ https://issues.apache.org/jira/browse/FLINK-8547?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16352278#comment-16352278 ]

ASF GitHub Bot commented on FLINK-8547:
---
Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5400#discussion_r165930413

    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CreditBasedBarrierBuffer.java ---
    @@ -0,0 +1,529 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.runtime.io;
    +
    +import org.apache.flink.annotation.Internal;

    --- End diff --

    nit: There were some checkstyle failures
[jira] [Commented] (FLINK-8547) Implement CheckpointBarrierHandler not to spill data for exactly-once based on credit-based flow control
[ https://issues.apache.org/jira/browse/FLINK-8547?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16349921#comment-16349921 ]

ASF GitHub Bot commented on FLINK-8547:
---
GitHub user zhijiangW opened a pull request:

    https://github.com/apache/flink/pull/5400

    [FLINK-8547][network] Implement CheckpointBarrierHandler not to spill data for exactly-once

    ## What is the purpose of the change

    Currently in exactly-once mode, the BarrierBuffer blocks inputs with barriers until all inputs have received the barrier for a given checkpoint. To avoid back-pressuring the input streams, which may cause distributed deadlocks, the BarrierBuffer has to spill data to disk files in order to recycle the buffers of blocked channels.

    With credit-based flow control, every channel has exclusive buffers, so there is no need to spill data to avoid deadlock. We therefore implement a new CheckpointBarrierHandler that only buffers the data of blocked channels, for better performance.

    The new CheckpointBarrierHandler can also be enabled or disabled via configuration, so that the original mode can be restored in case of unexpected problems.

    ## Brief change log

    - Implement the new `CreditBasedBarrierBuffer` and `CreditBasedBufferBlocker` for buffering data of blocked channels in exactly-once mode.
    - Define the parameter `taskmanager.exactly-once.blocking.data.enabled` for enabling the new handler or not.

    ## Verifying this change

    This change added tests and can be verified as follows:

    - Added tests for the logic of `CreditBasedBarrierBuffer`
    - Added tests for the logic of `CreditBasedBufferBlocker`

    ## Does this pull request potentially affect one of the following parts:

    - Dependencies (does it add or upgrade a dependency): no
    - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
    - The serializers: no
    - The runtime per-record code paths (performance sensitive): yes
    - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no
    - The S3 file system connector: no

    ## Documentation

    - Does this pull request introduce a new feature? no
    - If yes, how is the feature documented? not applicable

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/zhijiangW/flink FLINK-8547

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/5400.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #5400

commit 4d08e5d58c732e8f835016b48edc4494f8cb26fe
Author: Zhijiang
Date: 2018-02-02T07:45:49Z

    [FLINK-8547][network] Implement CheckpointBarrierHandler not to spill data for exactly-once
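The core idea of the pull request — once a channel delivers its barrier, subsequent data from that channel is held in memory (possible because credit-based flow control gives each channel exclusive buffers) rather than spilled to disk, and is replayed after all channels have aligned — can be sketched as follows. This is a toy model with illustrative names; it is not the actual `CreditBasedBarrierBuffer` API, and it uses strings in place of network buffers.

```java
import java.util.ArrayDeque;
import java.util.HashSet;
import java.util.Set;

// Toy sketch of barrier alignment with in-memory buffering instead of
// disk spilling. Names are illustrative, not the real Flink classes.
final class AlignmentSketch {
    private final int numChannels;
    private final Set<Integer> blocked = new HashSet<>();   // channels past their barrier
    private final ArrayDeque<String> queued = new ArrayDeque<>(); // held-back "records"

    AlignmentSketch(int numChannels) {
        this.numChannels = numChannels;
    }

    /** A record arrived: queue it if its channel is blocked, else deliver it. */
    String onRecord(int channel, String record) {
        if (blocked.contains(channel)) {
            queued.add(record); // kept in memory thanks to exclusive buffers
            return null;        // nothing delivered downstream yet
        }
        return record;          // pass through immediately
    }

    /** A barrier arrived: block the channel; report whether alignment completed. */
    boolean onBarrier(int channel) {
        blocked.add(channel);
        return blocked.size() == numChannels;
    }

    /** After alignment: unblock all channels and drain the queued records. */
    ArrayDeque<String> releaseAndDrain() {
        blocked.clear();
        ArrayDeque<String> out = new ArrayDeque<>(queued);
        queued.clear();
        return out;
    }
}
```

With two channels, a barrier on channel 0 causes its later records to be queued while channel 1 keeps flowing; the barrier on channel 1 completes alignment, after which `releaseAndDrain()` replays the held-back records in arrival order. The original `BarrierBuffer` achieves the same blocking semantics but must spill to disk because, without exclusive per-channel buffers, holding data in memory could exhaust the shared buffer pool and deadlock.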