This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a commit to branch release-1.20
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.20 by this push:
     new cf96442c295 [FLINK-38703][runtime] Update slot manager metrics in a 
thread-safe manner
cf96442c295 is described below

commit cf96442c295baf9317f7efdd8dd73f6e79abc297
Author: Zdenek Tison <[email protected]>
AuthorDate: Thu Nov 20 15:59:42 2025 +0100

    [FLINK-38703][runtime] Update slot manager metrics in a thread-safe manner
---
 .../slotmanager/FineGrainedSlotManager.java        |  41 ++++++++-
 .../slotmanager/FineGrainedSlotManagerTest.java    | 100 +++++++++++++++++++++
 .../FineGrainedSlotManagerTestBase.java            |   6 +-
 .../runtime/metrics/JobManagerMetricsITCase.java   |   4 +-
 4 files changed, 145 insertions(+), 6 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManager.java
index 132b1cef665..5e739d7cd0c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManager.java
@@ -56,6 +56,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
 import java.util.StringJoiner;
@@ -69,6 +70,8 @@ import java.util.stream.Stream;
 
 /** Implementation of {@link SlotManager} supporting fine-grained resource 
management. */
 public class FineGrainedSlotManager implements SlotManager {
+    public static final Duration METRICS_UPDATE_INTERVAL = 
Duration.ofSeconds(1);
+
     private static final Logger LOG = 
LoggerFactory.getLogger(FineGrainedSlotManager.class);
 
     private final TaskManagerTracker taskManagerTracker;
@@ -119,6 +122,8 @@ public class FineGrainedSlotManager implements SlotManager {
 
     @Nullable private ScheduledFuture<?> clusterReconciliationCheck;
 
+    @Nullable private ScheduledFuture<?> metricsUpdateFuture;
+
     @Nullable private CompletableFuture<Void> requirementsCheckFuture;
 
     @Nullable private CompletableFuture<Void> declareNeededResourceFuture;
@@ -129,6 +134,11 @@ public class FineGrainedSlotManager implements SlotManager 
{
     /** True iff the component has been started. */
     private boolean started;
 
+    /** Metrics. */
+    private long lastNumberFreeSlots;
+
+    private long lastNumberRegisteredSlots;
+
     public FineGrainedSlotManager(
             ScheduledExecutor scheduledExecutor,
             SlotManagerConfiguration slotManagerConfiguration,
@@ -166,6 +176,7 @@ public class FineGrainedSlotManager implements SlotManager {
         mainThreadExecutor = null;
         clusterReconciliationCheck = null;
         requirementsCheckFuture = null;
+        metricsUpdateFuture = null;
 
         started = false;
     }
@@ -234,10 +245,26 @@ public class FineGrainedSlotManager implements 
SlotManager {
     }
 
     private void registerSlotManagerMetrics() {
-        slotManagerMetricGroup.gauge(
-                MetricNames.TASK_SLOTS_AVAILABLE, () -> (long) 
getNumberFreeSlots());
-        slotManagerMetricGroup.gauge(
-                MetricNames.TASK_SLOTS_TOTAL, () -> (long) 
getNumberRegisteredSlots());
+        // Because taskManagerTracker is not thread-safe, metrics must be 
updated periodically on
+        // the main thread to prevent concurrent modification issues.
+        metricsUpdateFuture =
+                scheduledExecutor.scheduleAtFixedRate(
+                        this::updateMetrics,
+                        0L,
+                        METRICS_UPDATE_INTERVAL.toMillis(),
+                        TimeUnit.MILLISECONDS);
+
+        slotManagerMetricGroup.gauge(MetricNames.TASK_SLOTS_AVAILABLE, () -> 
lastNumberFreeSlots);
+        slotManagerMetricGroup.gauge(MetricNames.TASK_SLOTS_TOTAL, () -> 
lastNumberRegisteredSlots);
+    }
+
+    private void updateMetrics() {
+        Objects.requireNonNull(mainThreadExecutor)
+                .execute(
+                        () -> {
+                            lastNumberFreeSlots = getNumberFreeSlots();
+                            lastNumberRegisteredSlots = 
getNumberRegisteredSlots();
+                        });
     }
 
     /** Suspends the component. This clears the internal state of the slot 
manager. */
@@ -257,6 +284,12 @@ public class FineGrainedSlotManager implements SlotManager 
{
             clusterReconciliationCheck = null;
         }
 
+        // stop the metrics updates
+        if (metricsUpdateFuture != null) {
+            metricsUpdateFuture.cancel(false);
+            metricsUpdateFuture = null;
+        }
+
         slotStatusSyncer.close();
         taskManagerTracker.clear();
         resourceTracker.clear();
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerTest.java
index a19f42177b1..5f374f2f9d1 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerTest.java
@@ -20,12 +20,14 @@ package 
org.apache.flink.runtime.resourcemanager.slotmanager;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple6;
+import org.apache.flink.metrics.Gauge;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.clusterframework.types.SlotID;
 import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.metrics.MetricNames;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.metrics.groups.SlotManagerMetricGroup;
 import org.apache.flink.runtime.metrics.util.TestingMetricRegistry;
@@ -38,6 +40,7 @@ import org.apache.flink.runtime.taskexecutor.SlotStatus;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
 import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGateway;
 import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
+import org.apache.flink.util.concurrent.ManuallyTriggeredScheduledExecutor;
 import org.apache.flink.util.function.ThrowingConsumer;
 
 import org.junit.jupiter.api.Test;
@@ -50,6 +53,7 @@ import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
 
 import static org.apache.flink.core.testutils.FlinkAssertions.assertThatFuture;
@@ -1058,4 +1062,100 @@ class FineGrainedSlotManagerTest extends 
FineGrainedSlotManagerTestBase {
             }
         };
     }
+
+    @Test
+    void testMetricsUpdate() throws Exception {
+        final AtomicReference<Gauge<Long>> slotsAvailableGauge = new 
AtomicReference<>();
+        final AtomicReference<Gauge<Long>> slotsTotalGauge = new 
AtomicReference<>();
+
+        final MetricRegistry metricRegistry =
+                TestingMetricRegistry.builder()
+                        .setRegisterConsumer(
+                                (metric, name, group) -> {
+                                    if 
(name.equals(MetricNames.TASK_SLOTS_AVAILABLE)) {
+                                        slotsAvailableGauge.set((Gauge<Long>) 
metric);
+                                    } else if 
(name.equals(MetricNames.TASK_SLOTS_TOTAL)) {
+                                        slotsTotalGauge.set((Gauge<Long>) 
metric);
+                                    }
+                                })
+                        .build();
+
+        final Context context = new Context();
+        context.setSlotManagerMetricGroup(
+                SlotManagerMetricGroup.create(metricRegistry, "localhost"));
+        final ManuallyTriggeredScheduledExecutor scheduledExecutor =
+                new ManuallyTriggeredScheduledExecutor();
+        context.setScheduledExecutor(scheduledExecutor);
+        final TaskExecutorConnection executorConnection1 = 
createTaskExecutorConnection();
+        final TaskExecutorConnection executorConnection2 = 
createTaskExecutorConnection();
+
+        context.runTest(
+                () -> {
+                    
assertThat(slotsAvailableGauge.get().getValue()).isEqualTo(0);
+                    assertThat(slotsTotalGauge.get().getValue()).isEqualTo(0);
+
+                    final CompletableFuture<SlotManager.RegistrationResult>
+                            registerTaskManagerFuture1 = new 
CompletableFuture<>();
+                    context.runInMainThreadAndWait(
+                            () ->
+                                    registerTaskManagerFuture1.complete(
+                                            context.getSlotManager()
+                                                    .registerTaskManager(
+                                                            
executorConnection1,
+                                                            new SlotReport(),
+                                                            
DEFAULT_TOTAL_RESOURCE_PROFILE,
+                                                            
DEFAULT_SLOT_RESOURCE_PROFILE)));
+                    
assertThat(assertFutureCompleteAndReturn(registerTaskManagerFuture1))
+                            .isEqualTo(SlotManager.RegistrationResult.SUCCESS);
+
+                    final CompletableFuture<SlotManager.RegistrationResult>
+                            registerTaskManagerFuture2 = new 
CompletableFuture<>();
+                    context.runInMainThreadAndWait(
+                            () ->
+                                    registerTaskManagerFuture2.complete(
+                                            context.getSlotManager()
+                                                    .registerTaskManager(
+                                                            
executorConnection2,
+                                                            new SlotReport(
+                                                                    
createAllocatedSlotStatus(
+                                                                            
new JobID(),
+                                                                            
new AllocationID(),
+                                                                            
DEFAULT_SLOT_RESOURCE_PROFILE)),
+                                                            
DEFAULT_TOTAL_RESOURCE_PROFILE,
+                                                            
DEFAULT_SLOT_RESOURCE_PROFILE)));
+                    
assertThat(assertFutureCompleteAndReturn(registerTaskManagerFuture2))
+                            .isEqualTo(SlotManager.RegistrationResult.SUCCESS);
+
+                    // triggers the metric update task on the main thread and 
waits for the main
+                    // thread to process queued up callbacks
+                    scheduledExecutor.triggerPeriodicScheduledTasks();
+                    context.runInMainThreadAndWait(() -> {});
+
+                    assertThat(slotsTotalGauge.get().getValue())
+                            .isEqualTo(2 * DEFAULT_NUM_SLOTS_PER_WORKER);
+                    assertThat(slotsAvailableGauge.get().getValue())
+                            .isEqualTo(2 * DEFAULT_NUM_SLOTS_PER_WORKER - 1);
+
+                    final CompletableFuture<Boolean> 
unRegisterTaskManagerFuture =
+                            new CompletableFuture<>();
+                    context.runInMainThreadAndWait(
+                            () ->
+                                    unRegisterTaskManagerFuture.complete(
+                                            context.getSlotManager()
+                                                    .unregisterTaskManager(
+                                                            
executorConnection2.getInstanceID(),
+                                                            TEST_EXCEPTION)));
+                    
assertThat(assertFutureCompleteAndReturn(unRegisterTaskManagerFuture)).isTrue();
+
+                    // triggers the metric update task on the main thread and 
waits for the main
+                    // thread to process queued up callbacks
+                    scheduledExecutor.triggerPeriodicScheduledTasks();
+                    context.runInMainThreadAndWait(() -> {});
+
+                    assertThat(slotsTotalGauge.get().getValue())
+                            .isEqualTo(DEFAULT_NUM_SLOTS_PER_WORKER);
+                    assertThat(slotsAvailableGauge.get().getValue())
+                            .isEqualTo(DEFAULT_NUM_SLOTS_PER_WORKER);
+                });
+    }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerTestBase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerTestBase.java
index 00e85b14839..23c6c89cf04 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerTestBase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerTestBase.java
@@ -152,7 +152,7 @@ abstract class FineGrainedSlotManagerTestBase {
         private SlotManagerMetricGroup slotManagerMetricGroup =
                 
UnregisteredMetricGroups.createUnregisteredSlotManagerMetricGroup();
         private BlockedTaskManagerChecker blockedTaskManagerChecker = 
resourceID -> false;
-        private final ScheduledExecutor scheduledExecutor =
+        private ScheduledExecutor scheduledExecutor =
                 new 
ScheduledExecutorServiceAdapter(EXECUTOR_RESOURCE.getExecutor());
         private final Executor mainThreadExecutor = 
EXECUTOR_RESOURCE.getExecutor();
         private FineGrainedSlotManager slotManager;
@@ -193,6 +193,10 @@ abstract class FineGrainedSlotManagerTestBase {
             this.blockedTaskManagerChecker = blockedTaskManagerChecker;
         }
 
+        public void setScheduledExecutor(ScheduledExecutor scheduledExecutor) {
+            this.scheduledExecutor = scheduledExecutor;
+        }
+
         void runInMainThread(Runnable runnable) {
             mainThreadExecutor.execute(runnable);
         }
diff --git 
a/flink-tests/src/test/java/org/apache/flink/runtime/metrics/JobManagerMetricsITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/runtime/metrics/JobManagerMetricsITCase.java
index 3216f3181fa..a2dcb672a6a 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/runtime/metrics/JobManagerMetricsITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/runtime/metrics/JobManagerMetricsITCase.java
@@ -26,6 +26,7 @@ import org.apache.flink.metrics.MetricConfig;
 import org.apache.flink.metrics.reporter.AbstractReporter;
 import org.apache.flink.metrics.reporter.MetricReporter;
 import org.apache.flink.metrics.reporter.MetricReporterFactory;
+import 
org.apache.flink.runtime.resourcemanager.slotmanager.FineGrainedSlotManager;
 import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
@@ -131,7 +132,8 @@ class JobManagerMetricsITCase {
                                 expectedPattern, gaugeNames));
             }
         }
-
+        // wait for metrics update
+        
Thread.sleep(FineGrainedSlotManager.METRICS_UPDATE_INTERVAL.toMillis());
         for (Map.Entry<Gauge<?>, String> entry : 
reporter.getGauges().entrySet()) {
             if (entry.getValue().contains(MetricNames.TASK_SLOTS_AVAILABLE)) {
                 assertEquals(0L, entry.getKey().getValue());

Reply via email to