[GitHub] [flink] guoweiM commented on a change in pull request #14734: [FLINK-21066][runtime][checkpoint] Refactor CheckpointCoordinator to compute tasks to trigger/ack/commit dynamically

2021-01-31 Thread GitBox


guoweiM commented on a change in pull request #14734:
URL: https://github.com/apache/flink/pull/14734#discussion_r567541421



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##
@@ -563,7 +554,7 @@ private void startTriggeringCheckpoint(CheckpointTriggerRequest request) {
     createPendingCheckpoint(
             timestamp,
             request.props,
-            ackTasks,
+            checkpointBrief,

Review comment:
   As I understand it, the `CheckpointBrief` describes how to "get" a
`CompletedCheckpoint`. Several methods are involved in getting a
`CompletedCheckpoint`, such as `createPendingCheckpoint`,
`reportToStateTracker`, `sendAbortedMessage` and `sendAcknowledgeMessages`.
   
   However, these methods sometimes use the `CheckpointBrief` and sometimes use
`tasksTowaitFor/tasksToTriggerFor`, which are internal state of the
`CheckpointCoordinator`. After the refactoring, maybe all of these methods could
take only the `CheckpointBrief` as their parameter. (I think the checkpoint ID
and timestamp should also be in the `CheckpointBrief`.)
   
   WDYT?
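
   A minimal sketch of this suggestion, not the PR's code: `BriefSketch` and
`CoordinatorSketch` are hypothetical names, plain task-name strings stand in for
Flink's `Execution`/`ExecutionVertex` types, and the method bodies are invented
for illustration only. The point is that the checkpoint ID and timestamp travel
inside the brief, so each method needs a single parameter.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical stand-in for CheckpointBrief; task names replace Flink's execution types. */
final class BriefSketch {
    final long checkpointId;
    final long timestamp;
    final List<String> tasksToTrigger;
    final List<String> tasksToAck;
    final List<String> tasksToCommitTo;

    BriefSketch(
            long checkpointId,
            long timestamp,
            List<String> tasksToTrigger,
            List<String> tasksToAck,
            List<String> tasksToCommitTo) {
        this.checkpointId = checkpointId;
        this.timestamp = timestamp;
        // Defensive copies wrapped read-only, so the brief cannot change after creation.
        this.tasksToTrigger = Collections.unmodifiableList(new ArrayList<>(tasksToTrigger));
        this.tasksToAck = Collections.unmodifiableList(new ArrayList<>(tasksToAck));
        this.tasksToCommitTo = Collections.unmodifiableList(new ArrayList<>(tasksToCommitTo));
    }
}

/** Hypothetical coordinator methods that read everything they need from the brief. */
final class CoordinatorSketch {
    static String createPendingCheckpoint(BriefSketch brief) {
        return "pending#" + brief.checkpointId + "@" + brief.timestamp
                + " waiting for " + brief.tasksToAck;
    }

    static List<String> sendAbortedMessages(BriefSketch brief) {
        // One message per task that would otherwise have been notified of the commit.
        List<String> messages = new ArrayList<>();
        for (String task : brief.tasksToCommitTo) {
            messages.add("abort checkpoint " + brief.checkpointId + " -> " + task);
        }
        return messages;
    }
}
```

   With this shape, no method has to reach back into the coordinator's own
task lists for a checkpoint that is already in flight.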





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] guoweiM commented on a change in pull request #14734: [FLINK-21066][runtime][checkpoint] Refactor CheckpointCoordinator to compute tasks to trigger/ack/commit dynamically

2021-01-31 Thread GitBox


guoweiM commented on a change in pull request #14734:
URL: https://github.com/apache/flink/pull/14734#discussion_r567528797



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointBriefCalculator.java
##
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/** Computes the tasks to trigger, wait or commit for each checkpoint. */
+public class CheckpointBriefCalculator {
+    private static final Logger LOG = LoggerFactory.getLogger(CheckpointBriefCalculator.class);
+
+    private final JobID jobId;
+
+    private final List tasksToTrigger;
+
+    private final List tasksToWait;
+
+    private final List tasksToCommitTo;
+
+    public CheckpointBriefCalculator(
+            JobID jobId,
+            List tasksToTrigger,
+            List tasksToWait,
+            List tasksToCommitTo) {
+
+        this.jobId = jobId;
+        this.tasksToTrigger = Collections.unmodifiableList(tasksToTrigger);
+        this.tasksToWait = Collections.unmodifiableList(tasksToWait);
+        this.tasksToCommitTo = Collections.unmodifiableList(tasksToCommitTo);
+    }
+
+    public CheckpointBrief calculateCheckpointBrief() throws CheckpointException {
+        List tasksToTrigger = getTriggerExecutions();
+        Map tasksToWait = getAckTasks();
+
+        return new CheckpointBrief(
+                Collections.unmodifiableList(tasksToTrigger), tasksToWait, tasksToCommitTo);

Review comment:
   Maybe `tasksToWait` could also be wrapped with `Collections.unmodifiableMap`.
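
   A small sketch of what the wrapping buys, assuming a plain
`Map<String, String>` in place of the real map type (which is not visible in
this hunk); `UnmodifiableAckTasksSketch` and its methods are hypothetical names.
`Collections.unmodifiableMap` returns a read-only view, so writes through the
view throw `UnsupportedOperationException`, while changes made to the backing
map remain visible through it.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

final class UnmodifiableAckTasksSketch {

    /** Returns a read-only view so that callers cannot alter the tasks to wait for. */
    static Map<String, String> readOnly(Map<String, String> tasksToWait) {
        return Collections.unmodifiableMap(tasksToWait);
    }

    /** Reports whether a write through the given map is rejected. */
    static boolean rejectsWrites(Map<String, String> map) {
        try {
            map.put("attempt-x", "vertex-x");
            return false;
        } catch (UnsupportedOperationException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        Map<String, String> tasksToWait = new HashMap<>();
        tasksToWait.put("attempt-1", "vertex-1");
        Map<String, String> view = readOnly(tasksToWait);
        System.out.println("writes rejected: " + rejectsWrites(view));
    }
}
```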

##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##
@@ -2122,7 +2065,7 @@ public boolean isForce() {
 
     private Optional getVertex(ExecutionAttemptID id) throws CheckpointException {

Review comment:
   This method does not throw `CheckpointException` any more, so the `throws` clause could be removed.

##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##
@@ -1843,6 +1843,7 @@ int getNumQueuedRequests() {
 
     public void reportStats(long id, ExecutionAttemptID attemptId, CheckpointMetrics metrics)
             throws CheckpointException {
+

Review comment:
   Maybe we could remove this added blank line.






[GitHub] [flink] guoweiM commented on a change in pull request #14734: [FLINK-21066][runtime][checkpoint] Refactor CheckpointCoordinator to compute tasks to trigger/ack/commit dynamically

2021-01-27 Thread GitBox


guoweiM commented on a change in pull request #14734:
URL: https://github.com/apache/flink/pull/14734#discussion_r565321454



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##
@@ -1924,7 +1918,8 @@ private void abortPendingCheckpoint(
                 }
             } finally {
                 sendAbortedMessages(
-                        pendingCheckpoint.getCheckpointId(),
+                        pendingCheckpoint.getRunningTasks(),
+                        pendingCheckpoint.getCheckpointID(),

Review comment:
   Maybe we should use the same method consistently within one call:
`getCheckpointID` -> `getCheckpointId`. :)










[GitHub] [flink] guoweiM commented on a change in pull request #14734: [FLINK-21066][runtime][checkpoint] Refactor CheckpointCoordinator to compute tasks to trigger/ack/commit dynamically

2021-01-27 Thread GitBox


guoweiM commented on a change in pull request #14734:
URL: https://github.com/apache/flink/pull/14734#discussion_r565315896



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointBrief.java
##
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * The brief of one checkpoint, indicating which tasks to trigger, waiting for acknowledge or commit
+ * for one specific checkpoint.
+ */
+public class CheckpointBrief {
+
+    /** Tasks who need to be sent a message when a checkpoint is started. */
+    private final List tasksToTrigger;
+
+    /** Tasks who need to acknowledge a checkpoint before it succeeds. */
+    private final Map tasksToAck;
+
+    /**
+     * Tasks that are still running when taking the checkpoint, these need to be sent a message when
+     * the checkpoint is confirmed.
+     */
+    private final List runningTasks;

Review comment:
   Maybe we should rename `runningTasks` to `tasksToCommitTo`. This PR is a
refactoring, so maybe we should keep the concepts consistent first. What do you
think?
   










[GitHub] [flink] guoweiM commented on a change in pull request #14734: [FLINK-21066][runtime][checkpoint] Refactor CheckpointCoordinator to compute tasks to trigger/ack/commit dynamically

2021-01-27 Thread GitBox


guoweiM commented on a change in pull request #14734:
URL: https://github.com/apache/flink/pull/14734#discussion_r565277280



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointBriefComputer.java
##
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/** Computes the tasks to trigger, wait or commit for each checkpoint. */
+public class CheckpointBriefComputer {
+    private static final Logger LOG = LoggerFactory.getLogger(CheckpointBriefComputer.class);
+
+    private final JobID jobId;
+
+    private final List tasksToTrigger;
+
+    private final List tasksToWait;
+
+    private final List tasksToCommit;
+
+    public CheckpointBriefComputer(
+            JobID jobId,
+            List tasksToTrigger,
+            List tasksToWait,
+            List tasksToCommit) {
+
+        this.jobId = jobId;
+        this.tasksToTrigger = Collections.unmodifiableList(tasksToTrigger);
+        this.tasksToWait = Collections.unmodifiableList(tasksToWait);
+        this.tasksToCommit = Collections.unmodifiableList(tasksToCommit);
+    }
+
+    public CheckpointBrief computeCheckpointBrief() throws CheckpointException {
+        List tasksToTrigger = getTriggerExecutions();
+        Map ackTasks = getAckTasks();
+
+        return new CheckpointBrief(tasksToTrigger, ackTasks, tasksToCommit);
+    }
+
+    private List getTriggerExecutions() throws CheckpointException {

Review comment:
   The two methods (`getTriggerExecutions` and `getAckTasks`) come from the
`CheckpointCoordinator`. So maybe `getTriggerExecutions` could get comments just
like `getAckTasks` has. :)
   













[GitHub] [flink] guoweiM commented on a change in pull request #14734: [FLINK-21066][runtime][checkpoint] Refactor CheckpointCoordinator to compute tasks to trigger/ack/commit dynamically

2021-01-27 Thread GitBox


guoweiM commented on a change in pull request #14734:
URL: https://github.com/apache/flink/pull/14734#discussion_r565270255



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
##
@@ -448,14 +441,24 @@ public void failJobDueToTaskFailure(
                         new DispatcherThreadFactory(
                                 Thread.currentThread().getThreadGroup(), "Checkpoint Timer"));
 
+        List sourceVertices = new ArrayList<>();

Review comment:
   Maybe we could move the following initialization logic into the
`CheckpointBriefComputer`, which would keep all the related logic together.
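
   One way this could look, as a rough sketch under stated assumptions rather
than the PR's actual code: `VertexSketch`, `BriefComputerSketch` and
`fromVertices` are hypothetical names, and the rule "trigger the sources, wait
for and commit to all vertices" is assumed here only because the hunk starts
collecting `sourceVertices`. The caller passes in the vertices and the computer
derives its own task lists.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical stand-in for an execution vertex. */
final class VertexSketch {
    final String name;
    final boolean isSource;

    VertexSketch(String name, boolean isSource) {
        this.name = name;
        this.isSource = isSource;
    }
}

/** Hypothetical computer that owns the initialization of its task lists. */
final class BriefComputerSketch {
    final List<VertexSketch> tasksToTrigger;
    final List<VertexSketch> tasksToWait;
    final List<VertexSketch> tasksToCommit;

    private BriefComputerSketch(
            List<VertexSketch> tasksToTrigger,
            List<VertexSketch> tasksToWait,
            List<VertexSketch> tasksToCommit) {
        this.tasksToTrigger = Collections.unmodifiableList(tasksToTrigger);
        this.tasksToWait = Collections.unmodifiableList(tasksToWait);
        this.tasksToCommit = Collections.unmodifiableList(tasksToCommit);
    }

    /** Assumed rule: trigger only source vertices, wait for and commit to all of them. */
    static BriefComputerSketch fromVertices(List<VertexSketch> allVertices) {
        List<VertexSketch> sourceVertices = new ArrayList<>();
        for (VertexSketch vertex : allVertices) {
            if (vertex.isSource) {
                sourceVertices.add(vertex);
            }
        }
        List<VertexSketch> all = new ArrayList<>(allVertices);
        return new BriefComputerSketch(sourceVertices, all, all);
    }
}
```

   The graph-building code would then only call `fromVertices(...)` and would
not need to know which vertices end up in which list.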













[GitHub] [flink] guoweiM commented on a change in pull request #14734: [FLINK-21066][runtime][checkpoint] Refactor CheckpointCoordinator to compute tasks to trigger/ack/commit dynamically

2021-01-27 Thread GitBox


guoweiM commented on a change in pull request #14734:
URL: https://github.com/apache/flink/pull/14734#discussion_r565260718



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
##
@@ -197,22 +197,13 @@ public static ExecutionGraph buildGraph(
         // configure the state checkpointing
         if (isCheckpointingEnabled(jobGraph)) {
             JobCheckpointingSettings snapshotSettings = jobGraph.getCheckpointingSettings();
-            List triggerVertices =
-                    idToVertex(snapshotSettings.getVerticesToTrigger(), executionGraph);
-
-            List ackVertices =
-                    idToVertex(snapshotSettings.getVerticesToAcknowledge(), executionGraph);
-
-            List confirmVertices =
-                    idToVertex(snapshotSettings.getVerticesToConfirm(), executionGraph);
 

Review comment:
   `idToVertex` could be removed.




