This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit e0de06e45852b2348c6c80c2a9ed089da645f7cb
Author: Piotr Nowojski <[email protected]>
AuthorDate: Fri Sep 12 16:24:15 2025 +0200

    [FLINK-38353][spans] Report checkpoint sub spans
    
    As proposed in
    
https://cwiki.apache.org/confluence/display/FLINK/FLIP-483%3A+Add+support+for+children+Spans
---
 .../apache/flink/configuration/TraceOptions.java   |  68 ++++++
 .../checkpoint/DefaultCheckpointStatsTracker.java  | 270 ++++++++++++++++++++-
 .../flink/runtime/scheduler/SchedulerBase.java     |   5 +-
 .../scheduler/adaptive/AdaptiveScheduler.java      |   2 +
 .../apache/flink/runtime/util/LongArrayList.java   |   4 +
 .../DefaultCheckpointStatsTrackerTest.java         | 251 +++++++++++++++++++
 .../adaptive/AdaptiveSchedulerBuilder.java         |   6 +-
 7 files changed, 603 insertions(+), 3 deletions(-)

diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/TraceOptions.java 
b/flink-core/src/main/java/org/apache/flink/configuration/TraceOptions.java
index 03275c3ce93..fb0fc80cb28 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/TraceOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/TraceOptions.java
@@ -35,6 +35,21 @@ import static 
org.apache.flink.configuration.description.TextElement.text;
 @Experimental
 public class TraceOptions {
 
+    /** Enum for the detail level of checkpointing spans. */
+    public enum CheckpointSpanDetailLevel {
+        /** Sum/Max for sub-metrics per checkpoint. */
+        SPAN_PER_CHECKPOINT,
+        /** Sum/Max for sub-metrics per checkpoint and arrays of task 
aggregates. */
+        SPAN_PER_CHECKPOINT_WITH_TASKS,
+        /** Sub/Max for sub-metrics of checkpoint and tasks (tasks as child 
spans). */
+        CHILDREN_SPANS_PER_TASK,
+        /**
+         * Sub/Max for sub-metrics of checkpoint, tasks, and subtasks (tasks 
as child spans,
+         * subtasks as grand-child spans).
+         */
+        CHILDREN_SPANS_PER_SUBTASK;
+    }
+
     private static final String NAMED_REPORTER_CONFIG_PREFIX =
             ConfigConstants.TRACES_REPORTER_PREFIX + "<name>";
 
@@ -67,6 +82,59 @@ public class TraceOptions {
                                     + " any of the names in the list will be 
started. Otherwise, all reporters that could be found in"
                                     + " the configuration will be started.");
 
+    /** The detail level for reporting checkpoint spans. */
+    public static final ConfigOption<TraceOptions.CheckpointSpanDetailLevel>
+            CHECKPOINT_SPAN_DETAIL_LEVEL =
+                    key("traces.checkpoint.span-detail-level")
+                            .enumType(CheckpointSpanDetailLevel.class)
+                            
.defaultValue(CheckpointSpanDetailLevel.SPAN_PER_CHECKPOINT)
+                            .withDescription(
+                                    Description.builder()
+                                            .text(
+                                                    "Detail level for 
reporting checkpoint spans. Possible values:\n")
+                                            .list(
+                                                    text(
+                                                            "'%s' (default): 
Single span per checkpoint. "
+                                                                    + 
"Aggregated sum/max for sub-metrics from all tasks and subtasks per 
checkpoint.",
+                                                            code(
+                                                                    
CheckpointSpanDetailLevel
+                                                                            
.SPAN_PER_CHECKPOINT
+                                                                            
.name())),
+                                                    text(
+                                                            "'%s': Single span 
per checkpoint. "
+                                                                    + "Same as 
'%s', plus arrays of aggregated values per task.",
+                                                            code(
+                                                                    
CheckpointSpanDetailLevel
+                                                                            
.SPAN_PER_CHECKPOINT_WITH_TASKS
+                                                                            
.name()),
+                                                            code(
+                                                                    
CheckpointSpanDetailLevel
+                                                                            
.SPAN_PER_CHECKPOINT
+                                                                            
.name())),
+                                                    text(
+                                                            "'%s': Same as 
'%s' plus children spans per each task. "
+                                                                    + "Each 
task span with aggregated sum/max sub-metrics from subtasks.",
+                                                            code(
+                                                                    
CheckpointSpanDetailLevel
+                                                                            
.CHILDREN_SPANS_PER_TASK
+                                                                            
.name()),
+                                                            code(
+                                                                    
CheckpointSpanDetailLevel
+                                                                            
.SPAN_PER_CHECKPOINT
+                                                                            
.name())),
+                                                    text(
+                                                            "'%s': Same as 
'%s' plus children spans per each subtask. "
+                                                                    + "Child 
spans for tasks and grand-child spans for subtasks.",
+                                                            code(
+                                                                    
CheckpointSpanDetailLevel
+                                                                            
.CHILDREN_SPANS_PER_SUBTASK
+                                                                            
.name()),
+                                                            code(
+                                                                    
CheckpointSpanDetailLevel
+                                                                            
.CHILDREN_SPANS_PER_TASK
+                                                                            
.name())))
+                                            .build());
+
     /**
      * Returns a view over the given configuration via which options can be 
set/retrieved for the
      * given reporter.
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/DefaultCheckpointStatsTracker.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/DefaultCheckpointStatsTracker.java
index 70b05c32ce8..9f891c0181c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/DefaultCheckpointStatsTracker.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/DefaultCheckpointStatsTracker.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.checkpoint;
 
 import org.apache.flink.AttributeBuilder;
 import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.TraceOptions;
 import org.apache.flink.events.EventBuilder;
 import org.apache.flink.events.Events;
 import org.apache.flink.metrics.Gauge;
@@ -29,6 +30,7 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
 import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointStatistics;
 import org.apache.flink.runtime.rest.util.RestMapperUtils;
+import org.apache.flink.runtime.util.LongArrayList;
 import org.apache.flink.traces.Span;
 import org.apache.flink.traces.SpanBuilder;
 
@@ -40,6 +42,10 @@ import org.slf4j.LoggerFactory;
 import javax.annotation.Nullable;
 
 import java.io.StringWriter;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
@@ -53,6 +59,84 @@ public class DefaultCheckpointStatsTracker implements 
CheckpointStatsTracker {
     private static final Logger LOG = 
LoggerFactory.getLogger(DefaultCheckpointStatsTracker.class);
     private static final ObjectMapper MAPPER = 
RestMapperUtils.getStrictObjectMapper();
 
+    /**
+     * Picks one {@link StatsSummary} (for example the state size or the alignment duration) out of
+     * a {@link TaskStateStats.TaskStateStatsSummary}.
+     */
+    @FunctionalInterface
+    interface TaskStatsSummaryExtractor {
+        StatsSummary extract(TaskStateStats.TaskStateStatsSummary summary);
+    }
+
+    /**
+     * Reads a single {@code long} metric value of one subtask from its {@link
+     * SubtaskStateStats}.
+     */
+    @FunctionalInterface
+    interface SubtaskMetricExtractor {
+        long extract(SubtaskStateStats subtask);
+    }
+
+    /**
+     * Describes one metric reported on checkpoint spans: its attribute name together with how to
+     * read it from a task-level summary and from a single subtask.
+     */
+    static final class CheckpointSpanMetric {
+        /** Attribute name of the metric, e.g. {@code "StateSizeBytes"}. */
+        final String metricName;
+
+        /** Reads the task-level {@link StatsSummary} of this metric. */
+        final TaskStatsSummaryExtractor taskStatsSummaryExtractor;
+
+        /** Reads the raw value of this metric for one subtask. */
+        final SubtaskMetricExtractor subtaskMetricExtractor;
+
+        private CheckpointSpanMetric(
+                String name,
+                TaskStatsSummaryExtractor taskExtractor,
+                SubtaskMetricExtractor subtaskExtractor) {
+            metricName = name;
+            taskStatsSummaryExtractor = taskExtractor;
+            subtaskMetricExtractor = subtaskExtractor;
+        }
+
+        /** Creates a descriptor from a metric name and its task- and subtask-level extractors. */
+        static CheckpointSpanMetric of(
+                String metricName,
+                TaskStatsSummaryExtractor taskStatsSummaryExtractor,
+                SubtaskMetricExtractor subtaskMetricExtractor) {
+            final CheckpointSpanMetric metric =
+                    new CheckpointSpanMetric(
+                            metricName, taskStatsSummaryExtractor, subtaskMetricExtractor);
+            return metric;
+        }
+    }
+
+    /**
+     * Metrics attached to checkpoint, task and subtask spans. On subtask spans the name is used as
+     * the attribute name as-is; on checkpoint and task spans it is prefixed (e.g. {@code max},
+     * {@code sum}).
+     */
+    private static final List<CheckpointSpanMetric> CHECKPOINT_SPAN_METRICS =
+            Arrays.asList(
+                    // Sizes.
+                    CheckpointSpanMetric.of(
+                            "StateSizeBytes",
+                            TaskStateStats.TaskStateStatsSummary::getStateSizeStats,
+                            SubtaskStateStats::getStateSize),
+                    CheckpointSpanMetric.of(
+                            "CheckpointedSizeBytes",
+                            TaskStateStats.TaskStateStatsSummary::getCheckpointedSize,
+                            SubtaskStateStats::getCheckpointedSize),
+                    // Durations.
+                    CheckpointSpanMetric.of(
+                            "CheckpointStartDelayMs",
+                            TaskStateStats.TaskStateStatsSummary::getCheckpointStartDelayStats,
+                            SubtaskStateStats::getCheckpointStartDelay),
+                    CheckpointSpanMetric.of(
+                            "AlignmentDurationMs",
+                            TaskStateStats.TaskStateStatsSummary::getAlignmentDurationStats,
+                            SubtaskStateStats::getAlignmentDuration),
+                    CheckpointSpanMetric.of(
+                            "SyncCheckpointDurationMs",
+                            TaskStateStats.TaskStateStatsSummary::getSyncCheckpointDurationStats,
+                            SubtaskStateStats::getSyncCheckpointDuration),
+                    CheckpointSpanMetric.of(
+                            "AsyncCheckpointDurationMs",
+                            TaskStateStats.TaskStateStatsSummary::getAsyncCheckpointDurationStats,
+                            SubtaskStateStats::getAsyncCheckpointDuration),
+                    // Data volumes.
+                    CheckpointSpanMetric.of(
+                            "ProcessedDataBytes",
+                            TaskStateStats.TaskStateStatsSummary::getProcessedDataStats,
+                            SubtaskStateStats::getProcessedData),
+                    CheckpointSpanMetric.of(
+                            "PersistedDataBytes",
+                            TaskStateStats.TaskStateStatsSummary::getPersistedDataStats,
+                            SubtaskStateStats::getPersistedData));
+
+    /** Selects how much per-task and per-subtask detail is added to checkpoint spans. */
+    private final TraceOptions.CheckpointSpanDetailLevel checkpointSpanDetailLevel;
+
     /**
      * Lock used to update stats and creating snapshots. Updates always happen 
from a single Thread
      * at a time and there can be multiple concurrent read accesses to the 
latest stats snapshot.
@@ -99,7 +183,11 @@ public class DefaultCheckpointStatsTracker implements 
CheckpointStatsTracker {
      */
     public DefaultCheckpointStatsTracker(
             int numRememberedCheckpoints, JobManagerJobMetricGroup 
 metricGroup) {
-        this(numRememberedCheckpoints, metricGroup, null);
+        // Delegates with the defaults this constructor always had: a single span per checkpoint
+        // (no per-task/per-subtask detail) and no CheckpointStatsListener.
+        this(
+                numRememberedCheckpoints,
+                metricGroup,
+                TraceOptions.CheckpointSpanDetailLevel.SPAN_PER_CHECKPOINT,
+                null);
     }
 
     /**
@@ -113,10 +201,12 @@ public class DefaultCheckpointStatsTracker implements 
CheckpointStatsTracker {
     public DefaultCheckpointStatsTracker(
             int numRememberedCheckpoints,
             JobManagerJobMetricGroup metricGroup,
+            TraceOptions.CheckpointSpanDetailLevel checkpointSpanDetailLevel,
             @Nullable CheckpointStatsListener checkpointStatsListener) {
         checkArgument(numRememberedCheckpoints >= 0, "Negative number of 
remembered checkpoints");
         this.history = new CheckpointStatsHistory(numRememberedCheckpoints);
         this.metricGroup = metricGroup;
+        this.checkpointSpanDetailLevel = checkpointSpanDetailLevel;
         this.checkpointStatsListener = checkpointStatsListener;
 
         // Latest snapshot is empty
@@ -264,6 +354,8 @@ public class DefaultCheckpointStatsTracker implements 
CheckpointStatsTracker {
                             
.setStartTsMillis(checkpointStats.getTriggerTimestamp())
                             
.setEndTsMillis(checkpointStats.getLatestAckTimestamp());
             addCommonCheckpointStatsAttributes(spanBuilder, checkpointStats);
+            // Add max/sum aggregations for breakdown metrics
+            addCheckpointAggregationStats(checkpointStats, spanBuilder);
             metricGroup.addSpan(spanBuilder);
 
             if (LOG.isDebugEnabled()) {
@@ -300,6 +392,131 @@ public class DefaultCheckpointStatsTracker implements 
CheckpointStatsTracker {
         return attributeBuilder;
     }
 
+    private void addCheckpointAggregationStats(
+            AbstractCheckpointStats checkpointStats, SpanBuilder 
checkpointSpanBuilder) {
+
+        final List<TaskStateStats> sortedTaskStateStats =
+                new ArrayList<>(checkpointStats.getAllTaskStateStats());
+        sortedTaskStateStats.sort(
+                (x, y) ->
+                        Long.signum(
+                                
x.getSummaryStats().getCheckpointStartDelayStats().getMinimum()
+                                        - y.getSummaryStats()
+                                                .getCheckpointStartDelayStats()
+                                                .getMinimum()));
+
+        CHECKPOINT_SPAN_METRICS.stream()
+                .map(metric -> 
TaskStatsAggregator.aggregate(sortedTaskStateStats, metric))
+                .forEach(
+                        aggregator -> {
+                            final String metricName = 
aggregator.getMetricName();
+                            checkpointSpanBuilder.setAttribute(
+                                    "max" + metricName, 
aggregator.getTotalMax());
+
+                            if 
(!shouldSkipSumMetricNameInCheckpointSpanForCompatibility(
+                                    metricName)) {
+                                checkpointSpanBuilder.setAttribute(
+                                        "sum" + metricName, 
aggregator.getTotalSum());
+                            }
+
+                            if (checkpointSpanDetailLevel
+                                    == TraceOptions.CheckpointSpanDetailLevel
+                                            .SPAN_PER_CHECKPOINT_WITH_TASKS) {
+                                checkpointSpanBuilder.setAttribute(
+                                        "perTaskMax" + metricName,
+                                        Arrays.toString(
+                                                
aggregator.getValuesMax().getInternalArray()));
+                                checkpointSpanBuilder.setAttribute(
+                                        "perTaskSum" + metricName,
+                                        Arrays.toString(
+                                                
aggregator.getValuesSum().getInternalArray()));
+                            }
+                        });
+
+        if (checkpointSpanDetailLevel
+                        == 
TraceOptions.CheckpointSpanDetailLevel.CHILDREN_SPANS_PER_TASK
+                || checkpointSpanDetailLevel
+                        == 
TraceOptions.CheckpointSpanDetailLevel.CHILDREN_SPANS_PER_SUBTASK) {
+            for (TaskStateStats taskStats : sortedTaskStateStats) {
+                checkpointSpanBuilder.addChild(
+                        createTaskSpan(
+                                checkpointStats,
+                                taskStats,
+                                checkpointSpanDetailLevel
+                                        == 
TraceOptions.CheckpointSpanDetailLevel
+                                                .CHILDREN_SPANS_PER_SUBTASK));
+            }
+        }
+    }
+
+    private SpanBuilder createTaskSpan(
+            AbstractCheckpointStats checkpointStats,
+            TaskStateStats taskStats,
+            boolean addSubtaskSpans) {
+
+        // start = trigger ts + minimum delay.
+        long taskStartTs =
+                checkpointStats.getTriggerTimestamp()
+                        + 
taskStats.getSummaryStats().getCheckpointStartDelayStats().getMinimum();
+        SpanBuilder taskSpanBuilder =
+                Span.builder(CheckpointStatsTracker.class, "Checkpoint_Task")
+                        .setStartTsMillis(taskStartTs)
+                        .setEndTsMillis(taskStats.getLatestAckTimestamp())
+                        .setAttribute("checkpointId", 
checkpointStats.getCheckpointId())
+                        .setAttribute("jobVertexId", 
taskStats.getJobVertexId().toString());
+
+        for (CheckpointSpanMetric spanMetric : CHECKPOINT_SPAN_METRICS) {
+            String metricName = spanMetric.metricName;
+            StatsSummary statsSummary =
+                    
spanMetric.taskStatsSummaryExtractor.extract(taskStats.getSummaryStats());
+            taskSpanBuilder.setAttribute("max" + metricName, 
statsSummary.getMaximum());
+            taskSpanBuilder.setAttribute("sum" + metricName, 
statsSummary.getSum());
+        }
+
+        if (addSubtaskSpans) {
+            addSubtaskSpans(checkpointStats, taskStats, taskSpanBuilder);
+        }
+
+        return taskSpanBuilder;
+    }
+
+    private void addSubtaskSpans(
+            AbstractCheckpointStats checkpointStats,
+            TaskStateStats taskStats,
+            SpanBuilder taskSpanBuilder) {
+        for (SubtaskStateStats subtaskStat : taskStats.getSubtaskStats()) {
+            if (subtaskStat == null) {
+                continue;
+            }
+
+            // start = trigger ts + minimum delay.
+            long subTaskStartTs =
+                    checkpointStats.getTriggerTimestamp() + 
subtaskStat.getCheckpointStartDelay();
+
+            SpanBuilder subTaskSpanBuilder =
+                    Span.builder(CheckpointStatsTracker.class, 
"Checkpoint_Subtask")
+                            .setStartTsMillis(subTaskStartTs)
+                            .setEndTsMillis(subtaskStat.getAckTimestamp())
+                            .setAttribute("checkpointId", 
checkpointStats.getCheckpointId())
+                            .setAttribute("jobVertexId", 
taskStats.getJobVertexId().toString())
+                            .setAttribute("subtaskId", 
subtaskStat.getSubtaskIndex());
+
+            for (CheckpointSpanMetric spanMetric : CHECKPOINT_SPAN_METRICS) {
+                String metricName = spanMetric.metricName;
+                long metricValue = 
spanMetric.subtaskMetricExtractor.extract(subtaskStat);
+                subTaskSpanBuilder.setAttribute(metricName, metricValue);
+            }
+
+            taskSpanBuilder.addChild(subTaskSpanBuilder);
+        }
+    }
+
+    /**
+     * Returns whether the {@code sum...} attribute of the given metric must be left out of the
+     * checkpoint span. These two metrics are already reported there under pre-existing attribute
+     * names ({@code fullSize} and {@code checkpointedSize}), which are preserved.
+     */
+    private boolean shouldSkipSumMetricNameInCheckpointSpanForCompatibility(String metricName) {
+        switch (metricName) {
+            case "StateSizeBytes":
+            case "CheckpointedSizeBytes":
+                return true;
+            default:
+                return false;
+        }
+    }
+
     @Override
     public void reportFailedCheckpointsWithoutInProgress() {
         statsReadWriteLock.lock();
@@ -649,4 +866,55 @@ public class DefaultCheckpointStatsTracker implements 
CheckpointStatsTracker {
             }
         }
     }
+
+    /**
+     * Holds, for a single {@link CheckpointSpanMetric}, the per-task maximum and sum of that metric
+     * (one entry per task, in the iteration order of the aggregated tasks). It also derives
+     * job-wide totals from them.
+     */
+    static class TaskStatsAggregator {
+        final String metricName;
+
+        /** Per-task maxima, one entry per task. */
+        final LongArrayList valuesMax;
+
+        /** Per-task sums, one entry per task. */
+        final LongArrayList valuesSum;
+
+        TaskStatsAggregator(String metric, LongArrayList valuesMax, LongArrayList valuesSum) {
+            this.metricName = metric;
+            this.valuesMax = valuesMax;
+            this.valuesSum = valuesSum;
+        }
+
+        /**
+         * Extracts the metric described by {@code metricDescriptor} from the summary of every task.
+         *
+         * @param allTaskStateStats tasks to aggregate; their iteration order defines the order of
+         *     the per-task values
+         * @param metricDescriptor the metric to extract
+         * @return an aggregator holding one maximum and one sum per task
+         */
+        public static TaskStatsAggregator aggregate(
+                Collection<TaskStateStats> allTaskStateStats,
+                CheckpointSpanMetric metricDescriptor) {
+
+            final LongArrayList valuesMax = new LongArrayList(allTaskStateStats.size());
+            final LongArrayList valuesSum = new LongArrayList(allTaskStateStats.size());
+            for (TaskStateStats taskStats : allTaskStateStats) {
+                StatsSummary statsSummary =
+                        metricDescriptor.taskStatsSummaryExtractor.extract(
+                                taskStats.getSummaryStats());
+                valuesMax.add(statsSummary.getMaximum());
+                valuesSum.add(statsSummary.getSum());
+            }
+            return new TaskStatsAggregator(metricDescriptor.metricName, valuesMax, valuesSum);
+        }
+
+        public LongArrayList getValuesMax() {
+            return valuesMax;
+        }
+
+        public LongArrayList getValuesSum() {
+            return valuesSum;
+        }
+
+        public String getMetricName() {
+            return metricName;
+        }
+
+        /** Returns the largest positive per-task maximum, or 0 if there is none. */
+        public long getTotalMax() {
+            // toArray() copies exactly the stored elements, unlike the backing array, which may
+            // contain spare capacity slots.
+            return Arrays.stream(valuesMax.toArray()).filter(val -> val > 0L).max().orElse(0L);
+        }
+
+        /** Returns the sum of all non-negative per-task sums; negative values are ignored. */
+        public long getTotalSum() {
+            return Arrays.stream(valuesSum.toArray()).filter(val -> val >= 0L).sum();
+        }
+    }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
index 1ec53f6efd9..b7e03f8058c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
@@ -135,6 +135,7 @@ import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.StreamSupport;
 
+import static 
org.apache.flink.configuration.TraceOptions.CHECKPOINT_SPAN_DETAIL_LEVEL;
 import static 
org.apache.flink.runtime.executiongraph.ExecutionGraphUtils.isAnyOutputBlocking;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.Preconditions.checkState;
@@ -247,7 +248,9 @@ public abstract class SchedulerBase implements SchedulerNG, 
CheckpointScheduling
                                 new DefaultCheckpointStatsTracker(
                                         jobMasterConfiguration.get(
                                                 
WebOptions.CHECKPOINTS_HISTORY_SIZE),
-                                        jobManagerJobMetricGroup));
+                                        jobManagerJobMetricGroup,
+                                        
jobMasterConfiguration.get(CHECKPOINT_SPAN_DETAIL_LEVEL),
+                                        null));
         this.executionGraph =
                 createAndRestoreExecutionGraph(
                         completedCheckpointStore,
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
index 158b462d692..d8219d59542 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
@@ -158,6 +158,7 @@ import java.util.function.Function;
 import java.util.function.Supplier;
 
 import static 
org.apache.flink.configuration.JobManagerOptions.SCHEDULER_RESCALE_TRIGGER_MAX_DELAY;
+import static 
org.apache.flink.configuration.TraceOptions.CHECKPOINT_SPAN_DETAIL_LEVEL;
 import static 
org.apache.flink.runtime.executiongraph.ExecutionGraphUtils.isAnyOutputBlocking;
 
 /**
@@ -455,6 +456,7 @@ public class AdaptiveScheduler
                         new DefaultCheckpointStatsTracker(
                                 
configuration.get(WebOptions.CHECKPOINTS_HISTORY_SIZE),
                                 metricGroup,
+                                
configuration.get(CHECKPOINT_SPAN_DETAIL_LEVEL),
                                 checkpointStatsListener),
                 jobGraph,
                 jobResourceRequirements,
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/util/LongArrayList.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/util/LongArrayList.java
index 2cde638096d..9784c45a6a3 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/util/LongArrayList.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/util/LongArrayList.java
@@ -67,6 +67,10 @@ public class LongArrayList {
         return Arrays.copyOf(array, size);
     }
 
+    /**
+     * Returns the backing array itself, not a copy. Its length is the current capacity, which may
+     * exceed the number of stored elements; slots past the stored elements are not part of the
+     * list. Mutating the returned array mutates this list.
+     */
+    public long[] getInternalArray() {
+        return this.array;
+    }
+
     private void grow(int length) {
         if (length > array.length) {
             final int newLength =
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/DefaultCheckpointStatsTrackerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/DefaultCheckpointStatsTrackerTest.java
index 4a08a733f3e..b3bbe4b3d9b 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/DefaultCheckpointStatsTrackerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/DefaultCheckpointStatsTrackerTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.checkpoint;
 
+import org.apache.flink.configuration.TraceOptions;
 import org.apache.flink.core.execution.SavepointFormatType;
 import org.apache.flink.events.Event;
 import org.apache.flink.events.EventBuilder;
@@ -32,6 +33,7 @@ import org.apache.flink.testutils.TestingUtils;
 import org.apache.flink.testutils.executor.TestExecutorExtension;
 import org.apache.flink.traces.Span;
 import org.apache.flink.traces.SpanBuilder;
+import org.apache.flink.util.CollectionUtil;
 
 import org.apache.flink.shaded.guava33.com.google.common.collect.Iterables;
 
@@ -287,6 +289,7 @@ class DefaultCheckpointStatsTrackerTest {
                 new DefaultCheckpointStatsTracker(
                         10,
                         
UnregisteredMetricGroups.createUnregisteredJobManagerJobMetricGroup(),
+                        
TraceOptions.CheckpointSpanDetailLevel.SPAN_PER_CHECKPOINT,
                         listener);
 
         // "factory" code to enable the instantiation of test data based on a 
PendingCheckpointStats
@@ -434,6 +437,254 @@ class DefaultCheckpointStatsTrackerTest {
         
assertThat(attributes.get("isUnaligned")).isEqualTo(Boolean.toString(isUnaligned));
     }
 
+    @Test
+    public void testSpanCreationBreakDownPerCheckpoint() {
+        testSpanCreationTemplate(TraceOptions.CheckpointSpanDetailLevel.SPAN_PER_CHECKPOINT);
+    }
+
+    @Test
+    public void testSpanCreationBreakDownPerCheckpointWithTasks() {
+        testSpanCreationTemplate(
+                TraceOptions.CheckpointSpanDetailLevel.SPAN_PER_CHECKPOINT_WITH_TASKS);
+    }
+
+    @Test
+    public void testSpanCreationBreakDownPerTask() {
+        testSpanCreationTemplate(TraceOptions.CheckpointSpanDetailLevel.CHILDREN_SPANS_PER_TASK);
+    }
+
+    @Test
+    public void testSpanCreationBreakDownPerSubTask() {
+        testSpanCreationTemplate(TraceOptions.CheckpointSpanDetailLevel.CHILDREN_SPANS_PER_SUBTASK);
+    }
+
+    private void testSpanCreationTemplate(TraceOptions.CheckpointSpanDetailLevel detailLevel) {
+        JobVertexID jobVertexID0 = new JobVertexID();
+        JobVertexID jobVertexID1 = new JobVertexID();
+        // Report a single completed checkpoint and capture the spans/events it emits.
+        final List<Span> reportedSpans = new ArrayList<>();
+        final List<Event> reportedEvents = new ArrayList<>();
+
+        produceTestSpans(jobVertexID0, jobVertexID1, detailLevel, reportedSpans, reportedEvents);
+        assertThat(reportedSpans).hasSize(1);
+        assertThat(reportedEvents).hasSize(1);
+
+        Map<String, Object> expected = new HashMap<>();
+        expected.put("checkpointId", 42L);
+        expected.put("checkpointedSize", 37L);
+        expected.put("fullSize", 40L);
+        expected.put("checkpointStatus", "COMPLETED");
+        expected.put("checkpointType", "Checkpoint");
+        expected.put("isUnaligned", "true");
+        expected.put("metadataSize", 1984L);
+        // The event carries exactly the checkpoint-level attributes.
+        assertThat(reportedEvents.get(0).getAttributes())
+                .containsExactlyInAnyOrderEntriesOf(expected);
+        // The span additionally carries max/sum aggregates over all reported subtasks.
+        expected.put("maxCheckpointStartDelayMs", 29L);
+        expected.put("maxPersistedDataBytes", 27L);
+        expected.put("maxAsyncCheckpointDurationMs", 25L);
+        expected.put("maxSyncCheckpointDurationMs", 24L);
+        expected.put("maxAlignmentDurationMs", 28L);
+        expected.put("maxProcessedDataBytes", 26L);
+        expected.put("sumCheckpointStartDelayMs", 58L);
+        expected.put("sumAlignmentDurationMs", 55L);
+        expected.put("sumProcessedDataBytes", 49L);
+        expected.put("sumAsyncCheckpointDurationMs", 46L);
+        expected.put("sumPersistedDataBytes", 52L);
+        expected.put("sumSyncCheckpointDurationMs", 43L);
+        expected.put("maxStateSizeBytes", 23L);
+        expected.put("maxCheckpointedSizeBytes", 22L);
+        // WITH_TASKS adds per-task arrays, ordered [jobVertexID0, jobVertexID1].
+        if (detailLevel == TraceOptions.CheckpointSpanDetailLevel.SPAN_PER_CHECKPOINT_WITH_TASKS) {
+            expected.put("perTaskMaxAlignmentDurationMs", "[28, 8]");
+            expected.put("perTaskMaxAsyncCheckpointDurationMs", "[16, 25]");
+            expected.put("perTaskMaxCheckpointStartDelayMs", "[20, 29]");
+            expected.put("perTaskMaxPersistedDataBytes", "[18, 27]");
+            expected.put("perTaskMaxProcessedDataBytes", "[17, 26]");
+            expected.put("perTaskMaxSyncCheckpointDurationMs", "[24, 4]");
+            expected.put("perTaskMaxStateSizeBytes", "[14, 23]");
+            expected.put("perTaskMaxCheckpointedSizeBytes", "[13, 22]");
+            expected.put("perTaskSumAlignmentDurationMs", "[47, 8]");
+            expected.put("perTaskSumAsyncCheckpointDurationMs", "[21, 25]");
+            expected.put("perTaskSumCheckpointStartDelayMs", "[29, 29]");
+            expected.put("perTaskSumPersistedDataBytes", "[25, 27]");
+            expected.put("perTaskSumProcessedDataBytes", "[23, 26]");
+            expected.put("perTaskSumSyncCheckpointDurationMs", "[39, 4]");
+            expected.put("perTaskSumStateSizeBytes", "[17, 23]");
+            expected.put("perTaskSumCheckpointedSizeBytes", "[15, 22]");
+        }
+
+        Span checkpointLevelSpan = reportedSpans.get(0);
+        assertThat(checkpointLevelSpan.getAttributes())
+                .containsExactlyInAnyOrderEntriesOf(expected);
+        // Task-level child spans are emitted only for the CHILDREN_SPANS_* detail levels.
+        List<Span> taskLevelSpans = checkpointLevelSpan.getChildren();
+        if (detailLevel == TraceOptions.CheckpointSpanDetailLevel.CHILDREN_SPANS_PER_TASK
+                || detailLevel
+                        == TraceOptions.CheckpointSpanDetailLevel.CHILDREN_SPANS_PER_SUBTASK) {
+            assertThat(taskLevelSpans).hasSize(2);
+        } else {
+            assertThat(taskLevelSpans).isEmpty();
+            return;
+        }
+        // NOTE(review): assumes task spans arrive in jobVertexID0, jobVertexID1 order; confirm.
+        Span taskSpan0 = taskLevelSpans.get(0);
+        expected.clear();
+
+        expected.put("checkpointId", 42L);
+        expected.put("jobVertexId", jobVertexID0.toString());
+        expected.put("maxCheckpointStartDelayMs", 20L);
+        expected.put("maxPersistedDataBytes", 18L);
+        expected.put("maxAsyncCheckpointDurationMs", 16L);
+        expected.put("maxSyncCheckpointDurationMs", 24L);
+        expected.put("maxAlignmentDurationMs", 28L);
+        expected.put("maxProcessedDataBytes", 17L);
+        expected.put("sumCheckpointStartDelayMs", 29L);
+        expected.put("sumAlignmentDurationMs", 47L);
+        expected.put("sumProcessedDataBytes", 23L);
+        expected.put("sumAsyncCheckpointDurationMs", 21L);
+        expected.put("sumPersistedDataBytes", 25L);
+        expected.put("sumSyncCheckpointDurationMs", 39L);
+        expected.put("maxStateSizeBytes", 14L);
+        expected.put("sumStateSizeBytes", 17L);
+        expected.put("maxCheckpointedSizeBytes", 13L);
+        expected.put("sumCheckpointedSizeBytes", 15L);
+
+        assertThat(taskSpan0.getAttributes()).containsExactlyInAnyOrderEntriesOf(expected);
+
+        Span taskSpan1 = taskLevelSpans.get(1);
+        expected.clear();
+
+        expected.put("checkpointId", 42L);
+        expected.put("jobVertexId", jobVertexID1.toString());
+        expected.put("maxCheckpointStartDelayMs", 29L);
+        expected.put("maxPersistedDataBytes", 27L);
+        expected.put("maxAsyncCheckpointDurationMs", 25L);
+        expected.put("maxSyncCheckpointDurationMs", 4L);
+        expected.put("maxAlignmentDurationMs", 8L);
+        expected.put("maxProcessedDataBytes", 26L);
+        expected.put("sumCheckpointStartDelayMs", 29L);
+        expected.put("sumAlignmentDurationMs", 8L);
+        expected.put("sumProcessedDataBytes", 26L);
+        expected.put("sumAsyncCheckpointDurationMs", 25L);
+        expected.put("sumPersistedDataBytes", 27L);
+        expected.put("sumSyncCheckpointDurationMs", 4L);
+        expected.put("maxStateSizeBytes", 23L);
+        expected.put("sumStateSizeBytes", 23L);
+        expected.put("maxCheckpointedSizeBytes", 22L);
+        expected.put("sumCheckpointedSizeBytes", 22L);
+        assertThat(taskSpan1.getAttributes()).containsExactlyInAnyOrderEntriesOf(expected);
+        // Subtask-level child spans are emitted only for CHILDREN_SPANS_PER_SUBTASK.
+        List<Span> subtasksSpans0 = taskSpan0.getChildren();
+        List<Span> subtasksSpans1 = taskSpan1.getChildren();
+
+        if (detailLevel == TraceOptions.CheckpointSpanDetailLevel.CHILDREN_SPANS_PER_SUBTASK) {
+            assertThat(subtasksSpans0).hasSize(2);
+            assertThat(subtasksSpans1).hasSize(1);
+        } else {
+            assertThat(subtasksSpans0).isEmpty();
+            assertThat(subtasksSpans1).isEmpty();
+            return;
+        }
+
+        Span subtaskSpan0N0 = subtasksSpans0.get(0);
+        expected.clear();
+
+        expected.put("checkpointId", 42L);
+        expected.put("jobVertexId", jobVertexID0.toString());
+        expected.put("subtaskId", 0L);
+        expected.put("CheckpointStartDelayMs", 9L);
+        expected.put("AlignmentDurationMs", 28L);
+        expected.put("ProcessedDataBytes", 6L);
+        expected.put("AsyncCheckpointDurationMs", 5L);
+        expected.put("PersistedDataBytes", 7L);
+        expected.put("SyncCheckpointDurationMs", 24L);
+        expected.put("StateSizeBytes", 3L);
+        expected.put("CheckpointedSizeBytes", 2L);
+        assertThat(subtaskSpan0N0.getAttributes()).containsExactlyInAnyOrderEntriesOf(expected);
+
+        Span subtaskSpan0N1 = subtasksSpans0.get(1);
+        expected.clear();
+
+        expected.put("checkpointId", 42L);
+        expected.put("jobVertexId", jobVertexID0.toString());
+        expected.put("subtaskId", 1L);
+        expected.put("CheckpointStartDelayMs", 20L);
+        expected.put("AlignmentDurationMs", 19L);
+        expected.put("ProcessedDataBytes", 17L);
+        expected.put("AsyncCheckpointDurationMs", 16L);
+        expected.put("PersistedDataBytes", 18L);
+        expected.put("SyncCheckpointDurationMs", 15L);
+        expected.put("StateSizeBytes", 14L);
+        expected.put("CheckpointedSizeBytes", 13L);
+        assertThat(subtaskSpan0N1.getAttributes()).containsExactlyInAnyOrderEntriesOf(expected);
+
+        Span subtaskSpan1N0 = subtasksSpans1.get(0);
+        expected.clear();
+
+        expected.put("checkpointId", 42L);
+        expected.put("jobVertexId", jobVertexID1.toString());
+        expected.put("subtaskId", 0L);
+        expected.put("CheckpointStartDelayMs", 29L);
+        expected.put("AlignmentDurationMs", 8L);
+        expected.put("ProcessedDataBytes", 26L);
+        expected.put("AsyncCheckpointDurationMs", 25L);
+        expected.put("PersistedDataBytes", 27L);
+        expected.put("SyncCheckpointDurationMs", 4L);
+        expected.put("StateSizeBytes", 23L);
+        expected.put("CheckpointedSizeBytes", 22L);
+        assertThat(subtaskSpan1N0.getAttributes()).containsExactlyInAnyOrderEntriesOf(expected);
+    }
+
+    private List<Span> produceTestSpans(
+            JobVertexID jobVertexID0,
+            JobVertexID jobVertexID1,
+            TraceOptions.CheckpointSpanDetailLevel detailLevel,
+            List<Span> reportedSpansOut,
+            List<Event> reportedEventsOut) {
+        // Metric group that records every span and event the tracker reports.
+        JobManagerJobMetricGroup metricGroup =
+                new UnregisteredMetricGroups.UnregisteredJobManagerJobMetricGroup() {
+
+                    @Override
+                    public void addSpan(SpanBuilder spanBuilder) {
+                        reportedSpansOut.add(spanBuilder.build());
+                    }
+
+                    @Override
+                    public void addEvent(EventBuilder eventBuilder) {
+                        reportedEventsOut.add(eventBuilder.build());
+                    }
+                };
+
+        CheckpointStatsTracker tracker =
+                new DefaultCheckpointStatsTracker(10, metricGroup, detailLevel, null);
+        // Two tasks: jobVertexID0 with two subtasks, jobVertexID1 with one subtask.
+        Map<JobVertexID, Integer> subtasksByVertex = CollectionUtil.newHashMapWithExpectedSize(2);
+        subtasksByVertex.put(jobVertexID0, 2);
+        subtasksByVertex.put(jobVertexID1, 1);
+        PendingCheckpointStats pending =
+                tracker.reportPendingCheckpoint(
+                        42,
+                        1,
+                        CheckpointProperties.forCheckpoint(
+                                CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
+                        subtasksByVertex);
+        // Per-subtask stats; the first argument is the index later reported as "subtaskId".
+        pending.reportSubtaskStats(
+                jobVertexID0, new SubtaskStateStats(0, 1, 2, 3, 24, 5, 6, 7, 28, 9, false, true));
+        pending.reportSubtaskStats(
+                jobVertexID0,
+                new SubtaskStateStats(1, 12, 13, 14, 15, 16, 17, 18, 19, 20, false, true));
+        pending.reportSubtaskStats(
+                jobVertexID1,
+                new SubtaskStateStats(0, 21, 22, 23, 4, 25, 26, 27, 8, 29, true, true));
+        // Complete checkpoint => new snapshot
+        tracker.reportCompletedCheckpoint(pending.toCompletedCheckpointStats(null, 1984));
+        return reportedSpansOut;
+    }
+
     @Test
     public void testInitializationSpanCreation() throws Exception {
         final List<Span> reportedSpans = new ArrayList<>();
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerBuilder.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerBuilder.java
index b6626e97ace..405ef99baac 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerBuilder.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerBuilder.java
@@ -21,6 +21,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.DeploymentOptions;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.StateRecoveryOptions;
+import org.apache.flink.configuration.TraceOptions;
 import org.apache.flink.core.failure.FailureEnricher;
 import org.apache.flink.runtime.blob.BlobWriter;
 import org.apache.flink.runtime.blob.VoidBlobWriter;
@@ -106,7 +107,10 @@ public class AdaptiveSchedulerBuilder {
             checkpointStatsTrackerFactory =
                     (metricGroup, checkpointStatsListener) ->
                             new DefaultCheckpointStatsTracker(
-                                    10, metricGroup, checkpointStatsListener);
+                                    10,
+                                    metricGroup,
+                                    
TraceOptions.CheckpointSpanDetailLevel.SPAN_PER_CHECKPOINT,
+                                    checkpointStatsListener);
 
     public AdaptiveSchedulerBuilder(
             final JobGraph jobGraph,


Reply via email to