This is an automated email from the ASF dual-hosted git repository. pnowojski pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit e0de06e45852b2348c6c80c2a9ed089da645f7cb Author: Piotr Nowojski <[email protected]> AuthorDate: Fri Sep 12 16:24:15 2025 +0200 [FLINK-38353][spans] Report checkpoint sub spans As proposed in https://cwiki.apache.org/confluence/display/FLINK/FLIP-483%3A+Add+support+for+children+Spans --- .../apache/flink/configuration/TraceOptions.java | 68 ++++++ .../checkpoint/DefaultCheckpointStatsTracker.java | 270 ++++++++++++++++++++- .../flink/runtime/scheduler/SchedulerBase.java | 5 +- .../scheduler/adaptive/AdaptiveScheduler.java | 2 + .../apache/flink/runtime/util/LongArrayList.java | 4 + .../DefaultCheckpointStatsTrackerTest.java | 251 +++++++++++++++++++ .../adaptive/AdaptiveSchedulerBuilder.java | 6 +- 7 files changed, 603 insertions(+), 3 deletions(-) diff --git a/flink-core/src/main/java/org/apache/flink/configuration/TraceOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/TraceOptions.java index 03275c3ce93..fb0fc80cb28 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/TraceOptions.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/TraceOptions.java @@ -35,6 +35,21 @@ import static org.apache.flink.configuration.description.TextElement.text; @Experimental public class TraceOptions { + /** Enum for the detail level of checkpointing spans. */ + public enum CheckpointSpanDetailLevel { + /** Sum/Max for sub-metrics per checkpoint. */ + SPAN_PER_CHECKPOINT, + /** Sum/Max for sub-metrics per checkpoint and arrays of task aggregates. */ + SPAN_PER_CHECKPOINT_WITH_TASKS, + /** Sum/Max for sub-metrics of checkpoint and tasks (tasks as child spans). */ + CHILDREN_SPANS_PER_TASK, + /** + * Sum/Max for sub-metrics of checkpoint, tasks, and subtasks (tasks as child spans, + * subtasks as grand-child spans). 
+ */ + CHILDREN_SPANS_PER_SUBTASK; + } + private static final String NAMED_REPORTER_CONFIG_PREFIX = ConfigConstants.TRACES_REPORTER_PREFIX + "<name>"; @@ -67,6 +82,59 @@ public class TraceOptions { + " any of the names in the list will be started. Otherwise, all reporters that could be found in" + " the configuration will be started."); + /** The detail level for reporting checkpoint spans. */ + public static final ConfigOption<TraceOptions.CheckpointSpanDetailLevel> + CHECKPOINT_SPAN_DETAIL_LEVEL = + key("traces.checkpoint.span-detail-level") + .enumType(CheckpointSpanDetailLevel.class) + .defaultValue(CheckpointSpanDetailLevel.SPAN_PER_CHECKPOINT) + .withDescription( + Description.builder() + .text( + "Detail level for reporting checkpoint spans. Possible values:\n") + .list( + text( + "'%s' (default): Single span per checkpoint. " + + "Aggregated sum/max for sub-metrics from all tasks and subtasks per checkpoint.", + code( + CheckpointSpanDetailLevel + .SPAN_PER_CHECKPOINT + .name())), + text( + "'%s': Single span per checkpoint. " + + "Same as '%s', plus arrays of aggregated values per task.", + code( + CheckpointSpanDetailLevel + .SPAN_PER_CHECKPOINT_WITH_TASKS + .name()), + code( + CheckpointSpanDetailLevel + .SPAN_PER_CHECKPOINT + .name())), + text( + "'%s': Same as '%s' plus children spans per each task. " + + "Each task span with aggregated sum/max sub-metrics from subtasks.", + code( + CheckpointSpanDetailLevel + .CHILDREN_SPANS_PER_TASK + .name()), + code( + CheckpointSpanDetailLevel + .SPAN_PER_CHECKPOINT + .name())), + text( + "'%s': Same as '%s' plus children spans per each subtask. " + + "Child spans for tasks and grand-child spans for subtasks.", + code( + CheckpointSpanDetailLevel + .CHILDREN_SPANS_PER_SUBTASK + .name()), + code( + CheckpointSpanDetailLevel + .CHILDREN_SPANS_PER_TASK + .name()))) + .build()); + /** * Returns a view over the given configuration via which options can be set/retrieved for the * given reporter. 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/DefaultCheckpointStatsTracker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/DefaultCheckpointStatsTracker.java index 70b05c32ce8..9f891c0181c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/DefaultCheckpointStatsTracker.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/DefaultCheckpointStatsTracker.java @@ -20,6 +20,7 @@ package org.apache.flink.runtime.checkpoint; import org.apache.flink.AttributeBuilder; import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.configuration.TraceOptions; import org.apache.flink.events.EventBuilder; import org.apache.flink.events.Events; import org.apache.flink.metrics.Gauge; @@ -29,6 +30,7 @@ import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup; import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointStatistics; import org.apache.flink.runtime.rest.util.RestMapperUtils; +import org.apache.flink.runtime.util.LongArrayList; import org.apache.flink.traces.Span; import org.apache.flink.traces.SpanBuilder; @@ -40,6 +42,10 @@ import org.slf4j.LoggerFactory; import javax.annotation.Nullable; import java.io.StringWriter; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; @@ -53,6 +59,84 @@ public class DefaultCheckpointStatsTracker implements CheckpointStatsTracker { private static final Logger LOG = LoggerFactory.getLogger(DefaultCheckpointStatsTracker.class); private static final ObjectMapper MAPPER = RestMapperUtils.getStrictObjectMapper(); + /** + * Function that extracts a {@link StatsSummary} from a {@link + * org.apache.flink.runtime.checkpoint.TaskStateStats.TaskStateStatsSummary}. 
+ */ + @FunctionalInterface + interface TaskStatsSummaryExtractor { + StatsSummary extract(TaskStateStats.TaskStateStatsSummary taskStateStatsSummary); + } + + /** Function that extracts a (long) metric value from {@link SubtaskStateStats}. */ + @FunctionalInterface + interface SubtaskMetricExtractor { + long extract(SubtaskStateStats subtaskStateStats); + } + + /** + * Helper class that defines a checkpoint span metric and how to extract the required values. + */ + static final class CheckpointSpanMetric { + final String metricName; + final TaskStatsSummaryExtractor taskStatsSummaryExtractor; + final SubtaskMetricExtractor subtaskMetricExtractor; + + private CheckpointSpanMetric( + String metricName, + TaskStatsSummaryExtractor taskStatsSummaryExtractor, + SubtaskMetricExtractor subtaskMetricExtractor) { + this.metricName = metricName; + this.taskStatsSummaryExtractor = taskStatsSummaryExtractor; + this.subtaskMetricExtractor = subtaskMetricExtractor; + } + + static CheckpointSpanMetric of( + String metricName, + TaskStatsSummaryExtractor taskStatsSummaryExtractor, + SubtaskMetricExtractor subtaskMetricExtractor) { + return new CheckpointSpanMetric( + metricName, taskStatsSummaryExtractor, subtaskMetricExtractor); + } + } + + private static final List<CheckpointSpanMetric> CHECKPOINT_SPAN_METRICS = + Arrays.asList( + CheckpointSpanMetric.of( + "StateSizeBytes", + TaskStateStats.TaskStateStatsSummary::getStateSizeStats, + SubtaskStateStats::getStateSize), + CheckpointSpanMetric.of( + "CheckpointedSizeBytes", + TaskStateStats.TaskStateStatsSummary::getCheckpointedSize, + SubtaskStateStats::getCheckpointedSize), + CheckpointSpanMetric.of( + "CheckpointStartDelayMs", + TaskStateStats.TaskStateStatsSummary::getCheckpointStartDelayStats, + SubtaskStateStats::getCheckpointStartDelay), + CheckpointSpanMetric.of( + "AlignmentDurationMs", + TaskStateStats.TaskStateStatsSummary::getAlignmentDurationStats, + SubtaskStateStats::getAlignmentDuration), + CheckpointSpanMetric.of( 
+ "SyncCheckpointDurationMs", + TaskStateStats.TaskStateStatsSummary::getSyncCheckpointDurationStats, + SubtaskStateStats::getSyncCheckpointDuration), + CheckpointSpanMetric.of( + "AsyncCheckpointDurationMs", + TaskStateStats.TaskStateStatsSummary::getAsyncCheckpointDurationStats, + SubtaskStateStats::getAsyncCheckpointDuration), + CheckpointSpanMetric.of( + "ProcessedDataBytes", + TaskStateStats.TaskStateStatsSummary::getProcessedDataStats, + SubtaskStateStats::getProcessedData), + CheckpointSpanMetric.of( + "PersistedDataBytes", + TaskStateStats.TaskStateStatsSummary::getPersistedDataStats, + SubtaskStateStats::getPersistedData)); + + private final TraceOptions.CheckpointSpanDetailLevel checkpointSpanDetailLevel; + /** * Lock used to update stats and creating snapshots. Updates always happen from a single Thread * at a time and there can be multiple concurrent read accesses to the latest stats snapshot. @@ -99,7 +183,11 @@ public class DefaultCheckpointStatsTracker implements CheckpointStatsTracker { */ public DefaultCheckpointStatsTracker( int numRememberedCheckpoints, JobManagerJobMetricGroup metricGroup) { - this(numRememberedCheckpoints, metricGroup, null); + this( + numRememberedCheckpoints, + metricGroup, + TraceOptions.CheckpointSpanDetailLevel.SPAN_PER_CHECKPOINT, + null); } /** @@ -113,10 +201,12 @@ public class DefaultCheckpointStatsTracker implements CheckpointStatsTracker { public DefaultCheckpointStatsTracker( int numRememberedCheckpoints, JobManagerJobMetricGroup metricGroup, + TraceOptions.CheckpointSpanDetailLevel checkpointSpanDetailLevel, @Nullable CheckpointStatsListener checkpointStatsListener) { checkArgument(numRememberedCheckpoints >= 0, "Negative number of remembered checkpoints"); this.history = new CheckpointStatsHistory(numRememberedCheckpoints); this.metricGroup = metricGroup; + this.checkpointSpanDetailLevel = checkpointSpanDetailLevel; this.checkpointStatsListener = checkpointStatsListener; // Latest snapshot is empty @@ -264,6 
+354,8 @@ public class DefaultCheckpointStatsTracker implements CheckpointStatsTracker { .setStartTsMillis(checkpointStats.getTriggerTimestamp()) .setEndTsMillis(checkpointStats.getLatestAckTimestamp()); addCommonCheckpointStatsAttributes(spanBuilder, checkpointStats); + // Add max/sum aggregations for breakdown metrics + addCheckpointAggregationStats(checkpointStats, spanBuilder); metricGroup.addSpan(spanBuilder); if (LOG.isDebugEnabled()) { @@ -300,6 +392,131 @@ public class DefaultCheckpointStatsTracker implements CheckpointStatsTracker { return attributeBuilder; } + private void addCheckpointAggregationStats( + AbstractCheckpointStats checkpointStats, SpanBuilder checkpointSpanBuilder) { + + final List<TaskStateStats> sortedTaskStateStats = + new ArrayList<>(checkpointStats.getAllTaskStateStats()); + sortedTaskStateStats.sort( + (x, y) -> + Long.signum( + x.getSummaryStats().getCheckpointStartDelayStats().getMinimum() + - y.getSummaryStats() + .getCheckpointStartDelayStats() + .getMinimum())); + + CHECKPOINT_SPAN_METRICS.stream() + .map(metric -> TaskStatsAggregator.aggregate(sortedTaskStateStats, metric)) + .forEach( + aggregator -> { + final String metricName = aggregator.getMetricName(); + checkpointSpanBuilder.setAttribute( + "max" + metricName, aggregator.getTotalMax()); + + if (!shouldSkipSumMetricNameInCheckpointSpanForCompatibility( + metricName)) { + checkpointSpanBuilder.setAttribute( + "sum" + metricName, aggregator.getTotalSum()); + } + + if (checkpointSpanDetailLevel + == TraceOptions.CheckpointSpanDetailLevel + .SPAN_PER_CHECKPOINT_WITH_TASKS) { + checkpointSpanBuilder.setAttribute( + "perTaskMax" + metricName, + Arrays.toString( + aggregator.getValuesMax().getInternalArray())); + checkpointSpanBuilder.setAttribute( + "perTaskSum" + metricName, + Arrays.toString( + aggregator.getValuesSum().getInternalArray())); + } + }); + + if (checkpointSpanDetailLevel + == TraceOptions.CheckpointSpanDetailLevel.CHILDREN_SPANS_PER_TASK + || 
checkpointSpanDetailLevel + == TraceOptions.CheckpointSpanDetailLevel.CHILDREN_SPANS_PER_SUBTASK) { + for (TaskStateStats taskStats : sortedTaskStateStats) { + checkpointSpanBuilder.addChild( + createTaskSpan( + checkpointStats, + taskStats, + checkpointSpanDetailLevel + == TraceOptions.CheckpointSpanDetailLevel + .CHILDREN_SPANS_PER_SUBTASK)); + } + } + } + + private SpanBuilder createTaskSpan( + AbstractCheckpointStats checkpointStats, + TaskStateStats taskStats, + boolean addSubtaskSpans) { + + // start = trigger ts + minimum delay. + long taskStartTs = + checkpointStats.getTriggerTimestamp() + + taskStats.getSummaryStats().getCheckpointStartDelayStats().getMinimum(); + SpanBuilder taskSpanBuilder = + Span.builder(CheckpointStatsTracker.class, "Checkpoint_Task") + .setStartTsMillis(taskStartTs) + .setEndTsMillis(taskStats.getLatestAckTimestamp()) + .setAttribute("checkpointId", checkpointStats.getCheckpointId()) + .setAttribute("jobVertexId", taskStats.getJobVertexId().toString()); + + for (CheckpointSpanMetric spanMetric : CHECKPOINT_SPAN_METRICS) { + String metricName = spanMetric.metricName; + StatsSummary statsSummary = + spanMetric.taskStatsSummaryExtractor.extract(taskStats.getSummaryStats()); + taskSpanBuilder.setAttribute("max" + metricName, statsSummary.getMaximum()); + taskSpanBuilder.setAttribute("sum" + metricName, statsSummary.getSum()); + } + + if (addSubtaskSpans) { + addSubtaskSpans(checkpointStats, taskStats, taskSpanBuilder); + } + + return taskSpanBuilder; + } + + private void addSubtaskSpans( + AbstractCheckpointStats checkpointStats, + TaskStateStats taskStats, + SpanBuilder taskSpanBuilder) { + for (SubtaskStateStats subtaskStat : taskStats.getSubtaskStats()) { + if (subtaskStat == null) { + continue; + } + + // start = trigger ts + this subtask's checkpoint start delay. 
+ long subTaskStartTs = + checkpointStats.getTriggerTimestamp() + subtaskStat.getCheckpointStartDelay(); + + SpanBuilder subTaskSpanBuilder = + Span.builder(CheckpointStatsTracker.class, "Checkpoint_Subtask") + .setStartTsMillis(subTaskStartTs) + .setEndTsMillis(subtaskStat.getAckTimestamp()) + .setAttribute("checkpointId", checkpointStats.getCheckpointId()) + .setAttribute("jobVertexId", taskStats.getJobVertexId().toString()) + .setAttribute("subtaskId", subtaskStat.getSubtaskIndex()); + + for (CheckpointSpanMetric spanMetric : CHECKPOINT_SPAN_METRICS) { + String metricName = spanMetric.metricName; + long metricValue = spanMetric.subtaskMetricExtractor.extract(subtaskStat); + subTaskSpanBuilder.setAttribute(metricName, metricValue); + } + + taskSpanBuilder.addChild(subTaskSpanBuilder); + } + } + + private boolean shouldSkipSumMetricNameInCheckpointSpanForCompatibility(String metricName) { + // Those two metrics already exist under different names that we want to preserve + // (fullSize, checkpointedSize). 
+ return metricName.equals("StateSizeBytes") || metricName.equals("CheckpointedSizeBytes"); + } + @Override public void reportFailedCheckpointsWithoutInProgress() { statsReadWriteLock.lock(); @@ -649,4 +866,55 @@ public class DefaultCheckpointStatsTracker implements CheckpointStatsTracker { } } } + + static class TaskStatsAggregator { + final String metricName; + final LongArrayList valuesMax; + final LongArrayList valuesSum; + + TaskStatsAggregator(String metric, LongArrayList valuesMax, LongArrayList valuesSum) { + this.metricName = metric; + this.valuesMax = valuesMax; + this.valuesSum = valuesSum; + } + + public static TaskStatsAggregator aggregate( + Collection<TaskStateStats> allTaskStateStats, + CheckpointSpanMetric metricDescriptor) { + + final LongArrayList valuesMax = new LongArrayList(allTaskStateStats.size()); + final LongArrayList valuesSum = new LongArrayList(allTaskStateStats.size()); + for (TaskStateStats taskStats : allTaskStateStats) { + StatsSummary statsSummary = + metricDescriptor.taskStatsSummaryExtractor.extract( + taskStats.getSummaryStats()); + valuesMax.add(statsSummary.getMaximum()); + valuesSum.add(statsSummary.getSum()); + } + return new TaskStatsAggregator(metricDescriptor.metricName, valuesMax, valuesSum); + } + + public LongArrayList getValuesMax() { + return valuesMax; + } + + public LongArrayList getValuesSum() { + return valuesSum; + } + + public String getMetricName() { + return metricName; + } + + public long getTotalMax() { + return Arrays.stream(valuesMax.getInternalArray()) + .filter(val -> val > 0L) + .max() + .orElse(0L); + } + + public long getTotalSum() { + return Arrays.stream(valuesSum.getInternalArray()).filter(val -> val >= 0L).sum(); + } + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java index 1ec53f6efd9..b7e03f8058c 100644 --- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java @@ -135,6 +135,7 @@ import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.StreamSupport; +import static org.apache.flink.configuration.TraceOptions.CHECKPOINT_SPAN_DETAIL_LEVEL; import static org.apache.flink.runtime.executiongraph.ExecutionGraphUtils.isAnyOutputBlocking; import static org.apache.flink.util.Preconditions.checkNotNull; import static org.apache.flink.util.Preconditions.checkState; @@ -247,7 +248,9 @@ public abstract class SchedulerBase implements SchedulerNG, CheckpointScheduling new DefaultCheckpointStatsTracker( jobMasterConfiguration.get( WebOptions.CHECKPOINTS_HISTORY_SIZE), - jobManagerJobMetricGroup)); + jobManagerJobMetricGroup, + jobMasterConfiguration.get(CHECKPOINT_SPAN_DETAIL_LEVEL), + null)); this.executionGraph = createAndRestoreExecutionGraph( completedCheckpointStore, diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java index 158b462d692..d8219d59542 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java @@ -158,6 +158,7 @@ import java.util.function.Function; import java.util.function.Supplier; import static org.apache.flink.configuration.JobManagerOptions.SCHEDULER_RESCALE_TRIGGER_MAX_DELAY; +import static org.apache.flink.configuration.TraceOptions.CHECKPOINT_SPAN_DETAIL_LEVEL; import static org.apache.flink.runtime.executiongraph.ExecutionGraphUtils.isAnyOutputBlocking; /** @@ -455,6 +456,7 @@ public class AdaptiveScheduler new DefaultCheckpointStatsTracker( configuration.get(WebOptions.CHECKPOINTS_HISTORY_SIZE), 
metricGroup, + configuration.get(CHECKPOINT_SPAN_DETAIL_LEVEL), checkpointStatsListener), jobGraph, jobResourceRequirements, diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/LongArrayList.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/LongArrayList.java index 2cde638096d..9784c45a6a3 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/LongArrayList.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/LongArrayList.java @@ -67,6 +67,10 @@ public class LongArrayList { return Arrays.copyOf(array, size); } + public long[] getInternalArray() { + return array; + } + private void grow(int length) { if (length > array.length) { final int newLength = diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/DefaultCheckpointStatsTrackerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/DefaultCheckpointStatsTrackerTest.java index 4a08a733f3e..b3bbe4b3d9b 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/DefaultCheckpointStatsTrackerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/DefaultCheckpointStatsTrackerTest.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.checkpoint; +import org.apache.flink.configuration.TraceOptions; import org.apache.flink.core.execution.SavepointFormatType; import org.apache.flink.events.Event; import org.apache.flink.events.EventBuilder; @@ -32,6 +33,7 @@ import org.apache.flink.testutils.TestingUtils; import org.apache.flink.testutils.executor.TestExecutorExtension; import org.apache.flink.traces.Span; import org.apache.flink.traces.SpanBuilder; +import org.apache.flink.util.CollectionUtil; import org.apache.flink.shaded.guava33.com.google.common.collect.Iterables; @@ -287,6 +289,7 @@ class DefaultCheckpointStatsTrackerTest { new DefaultCheckpointStatsTracker( 10, UnregisteredMetricGroups.createUnregisteredJobManagerJobMetricGroup(), + 
TraceOptions.CheckpointSpanDetailLevel.SPAN_PER_CHECKPOINT, listener); // "factory" code to enable the instantiation of test data based on a PendingCheckpointStats @@ -434,6 +437,254 @@ class DefaultCheckpointStatsTrackerTest { assertThat(attributes.get("isUnaligned")).isEqualTo(Boolean.toString(isUnaligned)); } + @Test + public void testSpanCreationBreakDownPerCheckpoint() { + testSpanCreationTemplate(TraceOptions.CheckpointSpanDetailLevel.SPAN_PER_CHECKPOINT); + } + + @Test + public void testSpanCreationBreakDownPerCheckpointWithTasks() { + testSpanCreationTemplate( + TraceOptions.CheckpointSpanDetailLevel.SPAN_PER_CHECKPOINT_WITH_TASKS); + } + + @Test + public void testSpanCreationBreakDownPerTask() { + testSpanCreationTemplate(TraceOptions.CheckpointSpanDetailLevel.CHILDREN_SPANS_PER_TASK); + } + + @Test + public void testSpanCreationBreakDownPerSubTask() { + testSpanCreationTemplate(TraceOptions.CheckpointSpanDetailLevel.CHILDREN_SPANS_PER_SUBTASK); + } + + public void testSpanCreationTemplate(TraceOptions.CheckpointSpanDetailLevel detailLevel) { + JobVertexID jobVertexID0 = new JobVertexID(); + JobVertexID jobVertexID1 = new JobVertexID(); + + final List<Span> reportedSpans = new ArrayList<>(); + final List<Event> reportedEvents = new ArrayList<>(); + + produceTestSpans(jobVertexID0, jobVertexID1, detailLevel, reportedSpans, reportedEvents); + assertThat(reportedSpans).hasSize(1); + assertThat(reportedEvents).hasSize(1); + + Map<String, Object> expected = new HashMap<>(); + expected.put("checkpointId", 42L); + expected.put("checkpointedSize", 37L); + expected.put("fullSize", 40L); + expected.put("checkpointStatus", "COMPLETED"); + expected.put("checkpointType", "Checkpoint"); + expected.put("isUnaligned", "true"); + expected.put("metadataSize", 1984L); + + assertThat(reportedEvents.get(0).getAttributes()) + .containsExactlyInAnyOrderEntriesOf(expected); + + expected.put("maxCheckpointStartDelayMs", 29L); + expected.put("maxPersistedDataBytes", 27L); + 
expected.put("maxAsyncCheckpointDurationMs", 25L); + expected.put("maxSyncCheckpointDurationMs", 24L); + expected.put("maxAlignmentDurationMs", 28L); + expected.put("maxProcessedDataBytes", 26L); + expected.put("sumCheckpointStartDelayMs", 58L); + expected.put("sumAlignmentDurationMs", 55L); + expected.put("sumProcessedDataBytes", 49L); + expected.put("sumAsyncCheckpointDurationMs", 46L); + expected.put("sumPersistedDataBytes", 52L); + expected.put("sumSyncCheckpointDurationMs", 43L); + expected.put("maxStateSizeBytes", 23L); + expected.put("maxCheckpointedSizeBytes", 22L); + + if (detailLevel == TraceOptions.CheckpointSpanDetailLevel.SPAN_PER_CHECKPOINT_WITH_TASKS) { + expected.put("perTaskMaxAlignmentDurationMs", "[28, 8]"); + expected.put("perTaskMaxAsyncCheckpointDurationMs", "[16, 25]"); + expected.put("perTaskMaxCheckpointStartDelayMs", "[20, 29]"); + expected.put("perTaskMaxPersistedDataBytes", "[18, 27]"); + expected.put("perTaskMaxProcessedDataBytes", "[17, 26]"); + expected.put("perTaskMaxSyncCheckpointDurationMs", "[24, 4]"); + expected.put("perTaskMaxStateSizeBytes", "[14, 23]"); + expected.put("perTaskMaxCheckpointedSizeBytes", "[13, 22]"); + expected.put("perTaskSumAlignmentDurationMs", "[47, 8]"); + expected.put("perTaskSumAsyncCheckpointDurationMs", "[21, 25]"); + expected.put("perTaskSumCheckpointStartDelayMs", "[29, 29]"); + expected.put("perTaskSumPersistedDataBytes", "[25, 27]"); + expected.put("perTaskSumProcessedDataBytes", "[23, 26]"); + expected.put("perTaskSumSyncCheckpointDurationMs", "[39, 4]"); + expected.put("perTaskSumStateSizeBytes", "[17, 23]"); + expected.put("perTaskSumCheckpointedSizeBytes", "[15, 22]"); + } + + Span checkpointLevelSpan = reportedSpans.get(0); + assertThat(checkpointLevelSpan.getAttributes()) + .containsExactlyInAnyOrderEntriesOf(expected); + + List<Span> taskLevelSpans = checkpointLevelSpan.getChildren(); + if (detailLevel == TraceOptions.CheckpointSpanDetailLevel.CHILDREN_SPANS_PER_TASK + || detailLevel + == 
TraceOptions.CheckpointSpanDetailLevel.CHILDREN_SPANS_PER_SUBTASK) { + assertThat(taskLevelSpans.size()).isEqualTo(2); + } else { + assertThat(taskLevelSpans).isEmpty(); + return; + } + + Span taskSpan0 = taskLevelSpans.get(0); + expected.clear(); + + expected.put("checkpointId", 42L); + expected.put("jobVertexId", jobVertexID0.toString()); + expected.put("maxCheckpointStartDelayMs", 20L); + expected.put("maxPersistedDataBytes", 18L); + expected.put("maxAsyncCheckpointDurationMs", 16L); + expected.put("maxSyncCheckpointDurationMs", 24L); + expected.put("maxAlignmentDurationMs", 28L); + expected.put("maxProcessedDataBytes", 17L); + expected.put("sumCheckpointStartDelayMs", 29L); + expected.put("sumAlignmentDurationMs", 47L); + expected.put("sumProcessedDataBytes", 23L); + expected.put("sumAsyncCheckpointDurationMs", 21L); + expected.put("sumPersistedDataBytes", 25L); + expected.put("sumSyncCheckpointDurationMs", 39L); + expected.put("maxStateSizeBytes", 14L); + expected.put("sumStateSizeBytes", 17L); + expected.put("maxCheckpointedSizeBytes", 13L); + expected.put("sumCheckpointedSizeBytes", 15L); + + assertThat(taskSpan0.getAttributes()).containsExactlyInAnyOrderEntriesOf(expected); + + Span taskSpan1 = taskLevelSpans.get(1); + expected.clear(); + + expected.put("checkpointId", 42L); + expected.put("jobVertexId", jobVertexID1.toString()); + expected.put("maxCheckpointStartDelayMs", 29L); + expected.put("maxPersistedDataBytes", 27L); + expected.put("maxAsyncCheckpointDurationMs", 25L); + expected.put("maxSyncCheckpointDurationMs", 4L); + expected.put("maxAlignmentDurationMs", 8L); + expected.put("maxProcessedDataBytes", 26L); + expected.put("sumCheckpointStartDelayMs", 29L); + expected.put("sumAlignmentDurationMs", 8L); + expected.put("sumProcessedDataBytes", 26L); + expected.put("sumAsyncCheckpointDurationMs", 25L); + expected.put("sumPersistedDataBytes", 27L); + expected.put("sumSyncCheckpointDurationMs", 4L); + expected.put("maxStateSizeBytes", 23L); + 
expected.put("sumStateSizeBytes", 23L); + expected.put("maxCheckpointedSizeBytes", 22L); + expected.put("sumCheckpointedSizeBytes", 22L); + assertThat(taskSpan1.getAttributes()).containsExactlyInAnyOrderEntriesOf(expected); + + List<Span> subtasksSpans0 = taskSpan0.getChildren(); + List<Span> subtasksSpans1 = taskSpan1.getChildren(); + + if (detailLevel == TraceOptions.CheckpointSpanDetailLevel.CHILDREN_SPANS_PER_SUBTASK) { + assertThat(subtasksSpans0.size()).isEqualTo(2); + assertThat(subtasksSpans1.size()).isEqualTo(1); + } else { + assertThat(subtasksSpans0).isEmpty(); + assertThat(subtasksSpans1).isEmpty(); + return; + } + + Span subtaskSpan0N0 = subtasksSpans0.get(0); + expected.clear(); + + expected.put("checkpointId", 42L); + expected.put("jobVertexId", jobVertexID0.toString()); + expected.put("subtaskId", 0L); + expected.put("CheckpointStartDelayMs", 9L); + expected.put("AlignmentDurationMs", 28L); + expected.put("ProcessedDataBytes", 6L); + expected.put("AsyncCheckpointDurationMs", 5L); + expected.put("PersistedDataBytes", 7L); + expected.put("SyncCheckpointDurationMs", 24L); + expected.put("StateSizeBytes", 3L); + expected.put("CheckpointedSizeBytes", 2L); + assertThat(subtaskSpan0N0.getAttributes()).containsExactlyInAnyOrderEntriesOf(expected); + + Span subtaskSpan0N1 = subtasksSpans0.get(1); + expected.clear(); + + expected.put("checkpointId", 42L); + expected.put("jobVertexId", jobVertexID0.toString()); + expected.put("subtaskId", 1L); + expected.put("CheckpointStartDelayMs", 20L); + expected.put("AlignmentDurationMs", 19L); + expected.put("ProcessedDataBytes", 17L); + expected.put("AsyncCheckpointDurationMs", 16L); + expected.put("PersistedDataBytes", 18L); + expected.put("SyncCheckpointDurationMs", 15L); + expected.put("StateSizeBytes", 14L); + expected.put("CheckpointedSizeBytes", 13L); + assertThat(subtaskSpan0N1.getAttributes()).containsExactlyInAnyOrderEntriesOf(expected); + + Span subtaskSpan1N10 = subtasksSpans1.get(0); + expected.clear(); + + 
expected.put("checkpointId", 42L); + expected.put("jobVertexId", jobVertexID1.toString()); + expected.put("subtaskId", 0L); + expected.put("CheckpointStartDelayMs", 29L); + expected.put("AlignmentDurationMs", 8L); + expected.put("ProcessedDataBytes", 26L); + expected.put("AsyncCheckpointDurationMs", 25L); + expected.put("PersistedDataBytes", 27L); + expected.put("SyncCheckpointDurationMs", 4L); + expected.put("StateSizeBytes", 23L); + expected.put("CheckpointedSizeBytes", 22L); + assertThat(subtaskSpan1N10.getAttributes()).containsExactlyInAnyOrderEntriesOf(expected); + } + + private List<Span> produceTestSpans( + JobVertexID jobVertexID0, + JobVertexID jobVertexID1, + TraceOptions.CheckpointSpanDetailLevel detailLevel, + List<Span> reportedSpansOut, + List<Event> reportedEventsOut) { + + JobManagerJobMetricGroup metricGroup = + new UnregisteredMetricGroups.UnregisteredJobManagerJobMetricGroup() { + + @Override + public void addSpan(SpanBuilder spanBuilder) { + reportedSpansOut.add(spanBuilder.build()); + } + + @Override + public void addEvent(EventBuilder eventBuilder) { + reportedEventsOut.add(eventBuilder.build()); + } + }; + + CheckpointStatsTracker tracker = + new DefaultCheckpointStatsTracker(10, metricGroup, detailLevel, null); + + Map<JobVertexID, Integer> subtasksByVertex = CollectionUtil.newHashMapWithExpectedSize(2); + subtasksByVertex.put(jobVertexID0, 2); + subtasksByVertex.put(jobVertexID1, 1); + PendingCheckpointStats pending = + tracker.reportPendingCheckpoint( + 42, + 1, + CheckpointProperties.forCheckpoint( + CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION), + subtasksByVertex); + + pending.reportSubtaskStats( + jobVertexID0, new SubtaskStateStats(0, 1, 2, 3, 24, 5, 6, 7, 28, 9, false, true)); + pending.reportSubtaskStats( + jobVertexID0, + new SubtaskStateStats(1, 12, 13, 14, 15, 16, 17, 18, 19, 20, false, true)); + pending.reportSubtaskStats( + jobVertexID1, + new SubtaskStateStats(0, 21, 22, 23, 4, 25, 26, 27, 8, 29, true, true)); + // 
Complete checkpoint => new snapshot + tracker.reportCompletedCheckpoint(pending.toCompletedCheckpointStats(null, 1984)); + return reportedSpansOut; + } + @Test public void testInitializationSpanCreation() throws Exception { final List<Span> reportedSpans = new ArrayList<>(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerBuilder.java index b6626e97ace..405ef99baac 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerBuilder.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerBuilder.java @@ -21,6 +21,7 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.DeploymentOptions; import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.configuration.StateRecoveryOptions; +import org.apache.flink.configuration.TraceOptions; import org.apache.flink.core.failure.FailureEnricher; import org.apache.flink.runtime.blob.BlobWriter; import org.apache.flink.runtime.blob.VoidBlobWriter; @@ -106,7 +107,10 @@ public class AdaptiveSchedulerBuilder { checkpointStatsTrackerFactory = (metricGroup, checkpointStatsListener) -> new DefaultCheckpointStatsTracker( - 10, metricGroup, checkpointStatsListener); + 10, + metricGroup, + TraceOptions.CheckpointSpanDetailLevel.SPAN_PER_CHECKPOINT, + checkpointStatsListener); public AdaptiveSchedulerBuilder( final JobGraph jobGraph,
