[ https://issues.apache.org/jira/browse/FLINK-8547?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16353265#comment-16353265 ]

ASF GitHub Bot commented on FLINK-8547:
---------------------------------------

Github user zhijiangW commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5400#discussion_r166174743
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CreditBasedBarrierBuffer.java ---
    @@ -0,0 +1,529 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.runtime.io;
    +
    +import org.apache.flink.annotation.Internal;
    +import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
    +import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
    +import org.apache.flink.runtime.checkpoint.decline.AlignmentLimitExceededException;
    +import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineException;
    +import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineOnCancellationBarrierException;
    +import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineSubsumedException;
    +import org.apache.flink.runtime.checkpoint.decline.InputEndOfStreamException;
    +import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
    +import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
    +import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
    +import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
    +import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
    +import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
    +import org.apache.flink.streaming.runtime.io.CreditBasedBufferBlocker.BufferOrEventSequence;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.IOException;
    +import java.util.ArrayDeque;
    +
    +import static org.apache.flink.util.Preconditions.checkArgument;
    +
    +/**
    + * The barrier buffer is a {@link CheckpointBarrierHandler} that blocks inputs with barriers until
    + * all inputs have received the barrier for a given checkpoint.
    + *
    + * <p>The BarrierBuffer continues receiving buffers from the blocked channels and buffers them
    + * internally until the blocks are released. Thanks to credit-based flow control, this will not
    + * cause deadlocks.
    + */
    +@Internal
    +public class CreditBasedBarrierBuffer implements CheckpointBarrierHandler {
    +
    +   private static final Logger LOG = LoggerFactory.getLogger(CreditBasedBarrierBuffer.class);
    +
    +   /** The gate that the buffer draws its input from. */
    +   private final InputGate inputGate;
    +
    +   /** Flags that indicate whether a channel is currently blocked/buffered. */
    +   private final boolean[] blockedChannels;
    +
    +   /** The total number of channels that this buffer handles data from. */
    +   private final int totalNumberOfInputChannels;
    +
    +   /** The utility to buffer blocked data in the memory queue. */
    +   private final CreditBasedBufferBlocker bufferBlocker;
    +
    +   /**
    +    * The pending blocked buffer/event sequences. Must be consumed before requesting further data
    +    * from the input gate.
    +    */
    +   private final ArrayDeque<BufferOrEventSequence> queuedBuffered;
    --- End diff --
    
    I think we cannot directly mix all the blocked buffers for different checkpoint ids into one `ArrayDeque`. We still need the `BufferOrEventSequence`, which groups the blocked buffers of a specific checkpoint id; otherwise we cannot know when the blocked buffers are exhausted after releasing a specific checkpoint id.
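    
    Roughly what I mean, as a sketch (the `rollOver()` call is only a placeholder for whatever API the blocker exposes, not an actual method name):
    
        // Keep one sealed BufferOrEventSequence per blocked checkpoint id, consumed in
        // FIFO order before requesting further data from the input gate.
        private final ArrayDeque<BufferOrEventSequence> queuedBuffered = new ArrayDeque<>();
    
        private void releaseBlocks() {
            // Seal the sequence that was filled while this checkpoint was pending; its
            // boundary tells us exactly when the blocked buffers are exhausted.
            BufferOrEventSequence sealed = bufferBlocker.rollOver(); // placeholder name
            if (sealed != null) {
                queuedBuffered.add(sealed);
            }
        }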
    
    If we want to use only one `ArrayDeque` for blocking all buffers, we may need to insert extra checkpoint-id hints into this queue to help decide when to stop reading blocked buffers from it (a rough sketch of this idea follows the example below).
    
    For example:
    channel1: [cp1,cp2,b1,cp3,b2,b3]
    channel2: [cp2]
    
    1. When cp1 is first read from channel1, [cp2,b1,cp3,b2,b3] are blocked as a separate sequence1.
    2. When cp2 is read from channel2, cp1 is released and we begin to read sequence1.
    3. When cp2 is read from seq1, the following buffers will be blocked in a new seq2.
    4. When cp3 is read from seq1, cp2 is released and seq2 only contains [b1].
    5. The buffers following cp3 will be blocked in a new seq3, which contains [b2,b3].
    
    So every sequence holds the blocked buffers belonging to a different checkpoint id, and it will be read first after that checkpoint id is released.
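    
    If we really go with a single queue instead, the hint idea could look roughly like this (only a sketch; `CheckpointHint` and the method names are made up for illustration, not existing classes):
    
        // Single queue mixing the blocked buffers of all checkpoints, separated by hint markers.
        private final ArrayDeque<Object> blockedQueue = new ArrayDeque<>();
    
        // When a barrier for a new checkpoint id gets blocked, record where its buffers start.
        private void onBlockedBarrier(long checkpointId) {
            blockedQueue.add(new CheckpointHint(checkpointId));
        }
    
        // After releasing a checkpoint id, read its blocked buffers until a hint for a later
        // checkpoint id is reached, which tells us the released buffers are exhausted.
        private BufferOrEvent pollBlocked(long releasedCheckpointId) {
            while (!blockedQueue.isEmpty()) {
                Object next = blockedQueue.peek();
                if (next instanceof CheckpointHint) {
                    if (((CheckpointHint) next).checkpointId > releasedCheckpointId) {
                        return null; // buffers of a later checkpoint start here
                    }
                    blockedQueue.poll(); // consume the hint of the released checkpoint id
                } else {
                    return (BufferOrEvent) blockedQueue.poll();
                }
            }
            return null;
        }
    
        private static final class CheckpointHint {
            final long checkpointId;
            CheckpointHint(long checkpointId) {
                this.checkpointId = checkpointId;
            }
        }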


> Implement CheckpointBarrierHandler not to spill data for exactly-once based 
> on credit-based flow control
> --------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-8547
>                 URL: https://issues.apache.org/jira/browse/FLINK-8547
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Network
>    Affects Versions: 1.5.0
>            Reporter: zhijiang
>            Assignee: zhijiang
>            Priority: Major
>
> Currently in exactly-once mode, the {{BarrierBuffer}} blocks inputs with 
> barriers until all inputs have received the barrier for a given checkpoint. 
> To avoid back-pressuring the input streams, which could cause distributed 
> deadlocks, the {{BarrierBuffer}} has to spill the data to disk files in order 
> to recycle the buffers of the blocked channels.
>  
> With credit-based flow control, every channel has exclusive buffers, so there 
> is no need to spill data to avoid deadlocks. Therefore we implement a new 
> {{CheckpointBarrierHandler}} that only buffers the data of blocked channels in 
> memory, for better performance.
>  
> This new {{CheckpointBarrierHandler}} can also be enabled or disabled via 
> configuration, so that the original mode can be rolled back to in case of 
> unexpected issues.
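
For context, the in-memory buffering described above can be sketched roughly as follows (illustrative only; the class and method names are assumptions, not the actual CreditBasedBufferBlocker implementation):

    import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;

    import java.util.ArrayDeque;

    // Sketch of an in-memory blocker: blocked buffers/events are queued on the heap
    // instead of being spilled to disk. Under credit-based flow control each channel
    // has exclusive buffers, so the blocked data per channel is bounded and can be
    // held in memory without risking a distributed deadlock.
    public class InMemoryBufferBlocker {

        private ArrayDeque<BufferOrEvent> currentSequence = new ArrayDeque<>();

        /** Adds a buffer or event from a blocked channel to the current sequence. */
        public void add(BufferOrEvent boe) {
            currentSequence.add(boe);
        }

        /** Seals the current sequence (e.g. when its checkpoint is released) and starts a new one. */
        public ArrayDeque<BufferOrEvent> rollOver() {
            ArrayDeque<BufferOrEvent> sealed = currentSequence;
            currentSequence = new ArrayDeque<>();
            return sealed;
        }
    }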



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
