This is an automated email from the ASF dual-hosted git repository. roman pushed a commit to branch release-1.12 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 7e6fb13d8bded83fa5d3c4b19d5b388cd2418010
Author: Roman Khachatryan <khachatryan.ro...@gmail.com>
AuthorDate: Thu Apr 15 12:58:14 2021 +0200

    [hotfix][runtime] Log checkpoint processing delay if above threshold
---
 .../runtime/checkpoint/CheckpointMetaData.java     | 21 +++++++++++++++++++--
 .../org/apache/flink/runtime/taskmanager/Task.java |  3 ++-
 .../runtime/io/CheckpointBarrierHandler.java       |  5 ++++-
 .../runtime/tasks/SourceOperatorStreamTask.java    |  2 +-
 .../streaming/runtime/tasks/SourceStreamTask.java  |  2 +-
 .../tasks/SubtaskCheckpointCoordinatorImpl.java    | 13 +++++++++++++
 6 files changed, 40 insertions(+), 6 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointMetaData.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointMetaData.java
index 25540ba..1d8e15e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointMetaData.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointMetaData.java
@@ -28,12 +28,20 @@ public class CheckpointMetaData implements Serializable {
     /** The ID of the checkpoint. */
     private final long checkpointId;
 
-    /** The timestamp of the checkpoint. */
+    /** The timestamp of the checkpoint triggering. */
     private final long timestamp;
 
+    /** The timestamp of the checkpoint receiving by this subtask. */
+    private final long receiveTimestamp;
+
     public CheckpointMetaData(long checkpointId, long timestamp) {
+        this(checkpointId, timestamp, System.currentTimeMillis());
+    }
+
+    public CheckpointMetaData(long checkpointId, long timestamp, long receiveTimestamp) {
         this.checkpointId = checkpointId;
         this.timestamp = timestamp;
+        this.receiveTimestamp = receiveTimestamp;
     }
 
     public long getCheckpointId() {
@@ -44,6 +52,10 @@ public class CheckpointMetaData implements Serializable {
         return timestamp;
     }
 
+    public long getReceiveTimestamp() {
+        return receiveTimestamp;
+    }
+
     @Override
     public boolean equals(Object o) {
         if (this == o) {
@@ -55,13 +67,16 @@ public class CheckpointMetaData implements Serializable {
 
         CheckpointMetaData that = (CheckpointMetaData) o;
 
-        return (checkpointId == that.checkpointId) && (timestamp == that.timestamp);
+        return (checkpointId == that.checkpointId)
+                && (timestamp == that.timestamp)
+                && (receiveTimestamp == that.receiveTimestamp);
     }
 
     @Override
     public int hashCode() {
         int result = (int) (checkpointId ^ (checkpointId >>> 32));
         result = 31 * result + (int) (timestamp ^ (timestamp >>> 32));
+        result = 31 * result + (int) (receiveTimestamp ^ (receiveTimestamp >>> 32));
         return result;
     }
 
@@ -70,6 +85,8 @@ public class CheckpointMetaData implements Serializable {
         return "CheckpointMetaData{"
                 + "checkpointId="
                 + checkpointId
+                + ", receiveTimestamp="
+                + receiveTimestamp
                 + ", timestamp="
                 + timestamp
                 + '}';
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index 288dd4a..3001018 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -1258,7 +1258,8 @@ public class Task
 
         final AbstractInvokable invokable = this.invokable;
         final CheckpointMetaData checkpointMetaData =
-                new CheckpointMetaData(checkpointID, checkpointTimestamp);
+                new CheckpointMetaData(
+                        checkpointID, checkpointTimestamp, System.currentTimeMillis());
 
         if (executionState == ExecutionState.RUNNING && invokable != null) {
             try {
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierHandler.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierHandler.java
index 132fd78..72cef441 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierHandler.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierHandler.java
@@ -104,7 +104,10 @@ public abstract class CheckpointBarrierHandler implements Closeable {
 
     protected void notifyCheckpoint(CheckpointBarrier checkpointBarrier) throws IOException {
         CheckpointMetaData checkpointMetaData =
-                new CheckpointMetaData(checkpointBarrier.getId(), checkpointBarrier.getTimestamp());
+                new CheckpointMetaData(
+                        checkpointBarrier.getId(),
+                        checkpointBarrier.getTimestamp(),
+                        System.currentTimeMillis());
 
         CheckpointMetricsBuilder checkpointMetrics =
                 new CheckpointMetricsBuilder()
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java
index cd7e25a..aea97e8 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java
@@ -144,7 +144,7 @@ public class SourceOperatorStreamTask<T> extends StreamTask<T, SourceOperator<T,
         final long timestamp = System.currentTimeMillis();
 
         final CheckpointMetaData checkpointMetaData =
-                new CheckpointMetaData(checkpointId, timestamp);
+                new CheckpointMetaData(checkpointId, timestamp, timestamp);
 
         super.triggerCheckpointAsync(checkpointMetaData, checkpointOptions);
     }
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
index 8e2c959..711f921 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
@@ -107,7 +107,7 @@ public class SourceStreamTask<
                             final long timestamp = System.currentTimeMillis();
 
                             final CheckpointMetaData checkpointMetaData =
-                                    new CheckpointMetaData(checkpointId, timestamp);
+                                    new CheckpointMetaData(checkpointId, timestamp, timestamp);
 
                             try {
                                 SourceStreamTask.super
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java
index 5f3ffbf..bf59515 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java
@@ -75,6 +75,8 @@ class SubtaskCheckpointCoordinatorImpl implements SubtaskCheckpointCoordinator {
             LoggerFactory.getLogger(SubtaskCheckpointCoordinatorImpl.class);
     private static final int DEFAULT_MAX_RECORD_ABORTED_CHECKPOINTS = 128;
 
+    private static final int CHECKPOINT_EXECUTION_DELAY_LOG_THRESHOLD_MS = 30_000;
+
     private final CachingCheckpointStorageWorkerView checkpointStorage;
     private final String taskName;
     private final ExecutorService asyncOperationsThreadPool;
@@ -260,6 +262,8 @@ class SubtaskCheckpointCoordinatorImpl implements SubtaskCheckpointCoordinator {
             return;
         }
 
+        logCheckpointProcessingDelay(metadata);
+
         // Step (0): Record the last triggered checkpointId and abort the sync phase of checkpoint
         // if necessary.
         lastCheckpointId = metadata.getCheckpointId();
@@ -695,4 +699,13 @@ class SubtaskCheckpointCoordinatorImpl implements SubtaskCheckpointCoordinator {
             throw ex;
         }
     }
+
+    private static void logCheckpointProcessingDelay(CheckpointMetaData checkpointMetaData) {
+        long delay = System.currentTimeMillis() - checkpointMetaData.getReceiveTimestamp();
+        if (delay >= CHECKPOINT_EXECUTION_DELAY_LOG_THRESHOLD_MS) {
+            LOG.warn(
+                    "Time from receiving all checkpoint barriers/RPC to executing it exceeded threshold: {}ms",
+                    delay);
+        }
+    }
 }