This is an automated email from the ASF dual-hosted git repository.

roman pushed a commit to branch release-1.12
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 7e6fb13d8bded83fa5d3c4b19d5b388cd2418010
Author: Roman Khachatryan <khachatryan.ro...@gmail.com>
AuthorDate: Thu Apr 15 12:58:14 2021 +0200

    [hotfix][runtime] Log checkpoint processing delay if above threshold
---
 .../runtime/checkpoint/CheckpointMetaData.java      | 21 +++++++++++++++++++--
 .../org/apache/flink/runtime/taskmanager/Task.java  |  3 ++-
 .../runtime/io/CheckpointBarrierHandler.java        |  5 ++++-
 .../runtime/tasks/SourceOperatorStreamTask.java     |  2 +-
 .../streaming/runtime/tasks/SourceStreamTask.java   |  2 +-
 .../tasks/SubtaskCheckpointCoordinatorImpl.java     | 13 +++++++++++++
 6 files changed, 40 insertions(+), 6 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointMetaData.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointMetaData.java
index 25540ba..1d8e15e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointMetaData.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointMetaData.java
@@ -28,12 +28,20 @@ public class CheckpointMetaData implements Serializable {
     /** The ID of the checkpoint. */
     private final long checkpointId;
 
-    /** The timestamp of the checkpoint. */
+    /** The timestamp of the checkpoint triggering. */
     private final long timestamp;
 
+    /** The timestamp of the checkpoint receiving by this subtask. */
+    private final long receiveTimestamp;
+
     public CheckpointMetaData(long checkpointId, long timestamp) {
+        this(checkpointId, timestamp, System.currentTimeMillis());
+    }
+
+    public CheckpointMetaData(long checkpointId, long timestamp, long receiveTimestamp) {
         this.checkpointId = checkpointId;
         this.timestamp = timestamp;
+        this.receiveTimestamp = receiveTimestamp;
     }
 
     public long getCheckpointId() {
@@ -44,6 +52,10 @@ public class CheckpointMetaData implements Serializable {
         return timestamp;
     }
 
+    public long getReceiveTimestamp() {
+        return receiveTimestamp;
+    }
+
     @Override
     public boolean equals(Object o) {
         if (this == o) {
@@ -55,13 +67,16 @@ public class CheckpointMetaData implements Serializable {
 
         CheckpointMetaData that = (CheckpointMetaData) o;
 
-        return (checkpointId == that.checkpointId) && (timestamp == that.timestamp);
+        return (checkpointId == that.checkpointId)
+                && (timestamp == that.timestamp)
+                && (receiveTimestamp == that.receiveTimestamp);
     }
 
     @Override
     public int hashCode() {
         int result = (int) (checkpointId ^ (checkpointId >>> 32));
         result = 31 * result + (int) (timestamp ^ (timestamp >>> 32));
+        result = 31 * result + (int) (receiveTimestamp ^ (receiveTimestamp >>> 32));
         return result;
     }
 
@@ -70,6 +85,8 @@ public class CheckpointMetaData implements Serializable {
         return "CheckpointMetaData{"
                 + "checkpointId="
                 + checkpointId
+                + ", receiveTimestamp="
+                + receiveTimestamp
                 + ", timestamp="
                 + timestamp
                 + '}';
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index 288dd4a..3001018 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -1258,7 +1258,8 @@ public class Task
 
         final AbstractInvokable invokable = this.invokable;
         final CheckpointMetaData checkpointMetaData =
-                new CheckpointMetaData(checkpointID, checkpointTimestamp);
+                new CheckpointMetaData(
+                        checkpointID, checkpointTimestamp, System.currentTimeMillis());
 
         if (executionState == ExecutionState.RUNNING && invokable != null) {
             try {
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierHandler.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierHandler.java
index 132fd78..72cef441 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierHandler.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierHandler.java
@@ -104,7 +104,10 @@ public abstract class CheckpointBarrierHandler implements Closeable {
 
     protected void notifyCheckpoint(CheckpointBarrier checkpointBarrier) throws IOException {
         CheckpointMetaData checkpointMetaData =
-                new CheckpointMetaData(checkpointBarrier.getId(), checkpointBarrier.getTimestamp());
+                new CheckpointMetaData(
+                        checkpointBarrier.getId(),
+                        checkpointBarrier.getTimestamp(),
+                        System.currentTimeMillis());
 
         CheckpointMetricsBuilder checkpointMetrics =
                 new CheckpointMetricsBuilder()
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java
index cd7e25a..aea97e8 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java
@@ -144,7 +144,7 @@ public class SourceOperatorStreamTask<T> extends StreamTask<T, SourceOperator<T,
         final long timestamp = System.currentTimeMillis();
 
         final CheckpointMetaData checkpointMetaData =
-                new CheckpointMetaData(checkpointId, timestamp);
+                new CheckpointMetaData(checkpointId, timestamp, timestamp);
 
         super.triggerCheckpointAsync(checkpointMetaData, checkpointOptions);
     }
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
index 8e2c959..711f921 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
@@ -107,7 +107,7 @@ public class SourceStreamTask<
                             final long timestamp = System.currentTimeMillis();
 
                             final CheckpointMetaData checkpointMetaData =
-                                    new CheckpointMetaData(checkpointId, timestamp);
+                                    new CheckpointMetaData(checkpointId, timestamp, timestamp);
 
                             try {
                                 SourceStreamTask.super
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java
index 5f3ffbf..bf59515 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java
@@ -75,6 +75,8 @@ class SubtaskCheckpointCoordinatorImpl implements SubtaskCheckpointCoordinator {
             LoggerFactory.getLogger(SubtaskCheckpointCoordinatorImpl.class);
     private static final int DEFAULT_MAX_RECORD_ABORTED_CHECKPOINTS = 128;
 
+    private static final int CHECKPOINT_EXECUTION_DELAY_LOG_THRESHOLD_MS = 30_000;
+
     private final CachingCheckpointStorageWorkerView checkpointStorage;
     private final String taskName;
     private final ExecutorService asyncOperationsThreadPool;
@@ -260,6 +262,8 @@ class SubtaskCheckpointCoordinatorImpl implements SubtaskCheckpointCoordinator {
             return;
         }
 
+        logCheckpointProcessingDelay(metadata);
+
         // Step (0): Record the last triggered checkpointId and abort the sync phase of checkpoint
         // if necessary.
         lastCheckpointId = metadata.getCheckpointId();
@@ -695,4 +699,13 @@ class SubtaskCheckpointCoordinatorImpl implements SubtaskCheckpointCoordinator {
             throw ex;
         }
     }
+
+    private static void logCheckpointProcessingDelay(CheckpointMetaData checkpointMetaData) {
+        long delay = System.currentTimeMillis() - checkpointMetaData.getReceiveTimestamp();
+        if (delay >= CHECKPOINT_EXECUTION_DELAY_LOG_THRESHOLD_MS) {
+            LOG.warn(
+                    "Time from receiving all checkpoint barriers/RPC to executing it exceeded threshold: {}ms",
+                    delay);
+        }
+    }
 }

Reply via email to