AHeise commented on a change in pull request #11507: [FLINK-16587] Add basic 
CheckpointBarrierHandler for unaligned checkpoint
URL: https://github.com/apache/flink/pull/11507#discussion_r402911412
 
 

 ##########
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnaligner.java
 ##########
 @@ -0,0 +1,338 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.checkpoint.CheckpointException;
+import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
+import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
+import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
+import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
+import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.concurrent.CompletableFuture;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * {@link CheckpointBarrierUnaligner} is used for triggering checkpoint while 
reading the first barrier
+ * and keeping track of the number of received barriers and consumed barriers.
+ */
+@Internal
+public class CheckpointBarrierUnaligner extends CheckpointBarrierHandler {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(CheckpointBarrierUnaligner.class);
+       public static final CompletableFuture<Void> DONE = 
CompletableFuture.completedFuture(null);
+
+       private final String taskName;
+
+       /**
+        * Tag the state of which input channel has read the barrier. If one 
channel has read the barrier by task,
+        * the respective in-flight input buffers should be empty when 
triggering unaligned checkpoint.
+        */
+       private final boolean[] barrierConsumedChannels;
+
+       /**
+        * Tag the state of which input channel has received the barrier. Until 
the barrier of a specific channel is
+        * received, all new buffers need to be persisted after unaligned 
checkpointing has been started.
+        */
+       private final boolean[] barrierReceivedChannels;
+
+       /**
+        * Contains the offsets of the channel indices for each gate when 
flattening the channels of all gates.
+        *
+        * <p>For example, consider 3 gates with 4 channels, {@code 
gateChannelOffsets = [0, 4, 8]}.
+        */
+       private final int[] gateChannelOffsets;
+
+       /**
+        * The number of input channels which has read the barrier by task.
+        */
+       private int numBarriersReceived;
+
+       /**
+        * The checkpoint id to guarantee that we would trigger only one 
checkpoint when reading the same barrier from
+        * different channels.
+        */
+       private long currentCheckpointId = -1L;
+
+       /** The number of opened channels. */
+       private int numOpenChannels;
+
+       /** A future indicating that all barriers of the a given checkpoint 
have been read. */
+       private CompletableFuture<Void> allBarriersReceivedFuture = DONE;
+
+       private final ChannelStateWriter channelStateWriter;
+
+       CheckpointBarrierUnaligner(
+                       int[] numberOfInputChannelsPerGate,
+                       ChannelStateWriter channelStateWriter,
+                       String taskName,
+                       AbstractInvokable toNotifyOnCheckpoint) {
+               super(toNotifyOnCheckpoint);
+
+               this.taskName = taskName;
+               this.channelStateWriter = checkNotNull(channelStateWriter);
+
+               final int numGates = numberOfInputChannelsPerGate.length;
+
+               gateChannelOffsets = new int[numGates];
+               for (int index = 1; index < numGates; index++) {
+                       gateChannelOffsets[index] = gateChannelOffsets[index - 
1] + numberOfInputChannelsPerGate[index - 1];
+               }
+
+               final int totalNumChannels = gateChannelOffsets[numGates - 1] + 
numberOfInputChannelsPerGate[numGates - 1];
+               barrierConsumedChannels = new boolean[totalNumChannels];
+               Arrays.fill(barrierConsumedChannels, true);
+               barrierReceivedChannels = new boolean[totalNumChannels];
+               Arrays.fill(barrierReceivedChannels, true);
+               numOpenChannels = totalNumChannels;
+       }
+
+       @Override
+       public void releaseBlocksAndResetBarriers() {
+               // make sure no additional data is persisted
+               Arrays.fill(barrierConsumedChannels, true);
 
 Review comment:
   I thought about this some more, since your point is valid. Both fields are 
probably too technical, as currently named, to meet my needs.
   They should be renamed: `barrierReceivedChannels` -> `storeNewBuffers` 
(inverting the boolean) and `barrierConsumedChannels` -> `hasInflightBuffers` 
(inverting the boolean).
   What do you think?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to