This is an automated email from the ASF dual-hosted git repository.
mapohl pushed a commit to branch release-1.20
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.20 by this push:
new cf96442c295 [FLINK-38703][runtime] Update slot manager metrics in
thread-safety manner
cf96442c295 is described below
commit cf96442c295baf9317f7efdd8dd73f6e79abc297
Author: Zdenek Tison <[email protected]>
AuthorDate: Thu Nov 20 15:59:42 2025 +0100
[FLINK-38703][runtime] Update slot manager metrics in thread-safety manner
---
.../slotmanager/FineGrainedSlotManager.java | 41 ++++++++-
.../slotmanager/FineGrainedSlotManagerTest.java | 100 +++++++++++++++++++++
.../FineGrainedSlotManagerTestBase.java | 6 +-
.../runtime/metrics/JobManagerMetricsITCase.java | 4 +-
4 files changed, 145 insertions(+), 6 deletions(-)
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManager.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManager.java
index 132b1cef665..5e739d7cd0c 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManager.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManager.java
@@ -56,6 +56,7 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.StringJoiner;
@@ -69,6 +70,8 @@ import java.util.stream.Stream;
/** Implementation of {@link SlotManager} supporting fine-grained resource
management. */
public class FineGrainedSlotManager implements SlotManager {
+ public static final Duration METRICS_UPDATE_INTERVAL =
Duration.ofSeconds(1);
+
private static final Logger LOG =
LoggerFactory.getLogger(FineGrainedSlotManager.class);
private final TaskManagerTracker taskManagerTracker;
@@ -119,6 +122,8 @@ public class FineGrainedSlotManager implements SlotManager {
@Nullable private ScheduledFuture<?> clusterReconciliationCheck;
+ @Nullable private ScheduledFuture<?> metricsUpdateFuture;
+
@Nullable private CompletableFuture<Void> requirementsCheckFuture;
@Nullable private CompletableFuture<Void> declareNeededResourceFuture;
@@ -129,6 +134,11 @@ public class FineGrainedSlotManager implements SlotManager
{
/** True iff the component has been started. */
private boolean started;
+ /** Gauge values; written on the main thread, read concurrently by metric reporters. */
+ private volatile long lastNumberFreeSlots;
+
+ private volatile long lastNumberRegisteredSlots;
+
public FineGrainedSlotManager(
ScheduledExecutor scheduledExecutor,
SlotManagerConfiguration slotManagerConfiguration,
@@ -166,6 +176,7 @@ public class FineGrainedSlotManager implements SlotManager {
mainThreadExecutor = null;
clusterReconciliationCheck = null;
requirementsCheckFuture = null;
+ metricsUpdateFuture = null;
started = false;
}
@@ -234,10 +245,26 @@ public class FineGrainedSlotManager implements
SlotManager {
}
private void registerSlotManagerMetrics() {
- slotManagerMetricGroup.gauge(
- MetricNames.TASK_SLOTS_AVAILABLE, () -> (long)
getNumberFreeSlots());
- slotManagerMetricGroup.gauge(
- MetricNames.TASK_SLOTS_TOTAL, () -> (long)
getNumberRegisteredSlots());
+ // Because taskManagerTracker is not thread-safe, metrics must be
updated periodically on
+ // the main thread to prevent concurrent modification issues.
+ metricsUpdateFuture =
+ scheduledExecutor.scheduleAtFixedRate(
+ this::updateMetrics,
+ 0L,
+ METRICS_UPDATE_INTERVAL.toMillis(),
+ TimeUnit.MILLISECONDS);
+
+ slotManagerMetricGroup.gauge(MetricNames.TASK_SLOTS_AVAILABLE, () ->
lastNumberFreeSlots);
+ slotManagerMetricGroup.gauge(MetricNames.TASK_SLOTS_TOTAL, () ->
lastNumberRegisteredSlots);
+ }
+
+ private void updateMetrics() {
+ Objects.requireNonNull(mainThreadExecutor)
+ .execute(
+ () -> {
+ lastNumberFreeSlots = getNumberFreeSlots();
+ lastNumberRegisteredSlots =
getNumberRegisteredSlots();
+ });
}
/** Suspends the component. This clears the internal state of the slot
manager. */
@@ -257,6 +284,12 @@ public class FineGrainedSlotManager implements SlotManager
{
clusterReconciliationCheck = null;
}
+ // stop the metrics updates
+ if (metricsUpdateFuture != null) {
+ metricsUpdateFuture.cancel(false);
+ metricsUpdateFuture = null;
+ }
+
slotStatusSyncer.close();
taskManagerTracker.clear();
resourceTracker.clear();
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerTest.java
index a19f42177b1..5f374f2f9d1 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerTest.java
@@ -20,12 +20,14 @@ package
org.apache.flink.runtime.resourcemanager.slotmanager;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple6;
+import org.apache.flink.metrics.Gauge;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.clusterframework.types.SlotID;
import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.metrics.MetricNames;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.metrics.groups.SlotManagerMetricGroup;
import org.apache.flink.runtime.metrics.util.TestingMetricRegistry;
@@ -38,6 +40,7 @@ import org.apache.flink.runtime.taskexecutor.SlotStatus;
import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGateway;
import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
+import org.apache.flink.util.concurrent.ManuallyTriggeredScheduledExecutor;
import org.apache.flink.util.function.ThrowingConsumer;
import org.junit.jupiter.api.Test;
@@ -50,6 +53,7 @@ import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import static org.apache.flink.core.testutils.FlinkAssertions.assertThatFuture;
@@ -1058,4 +1062,100 @@ class FineGrainedSlotManagerTest extends
FineGrainedSlotManagerTestBase {
}
};
}
+
+ @Test
+ void testMetricsUpdate() throws Exception {
+ final AtomicReference<Gauge<Long>> slotsAvailableGauge = new
AtomicReference<>();
+ final AtomicReference<Gauge<Long>> slotsTotalGauge = new
AtomicReference<>();
+
+ final MetricRegistry metricRegistry =
+ TestingMetricRegistry.builder()
+ .setRegisterConsumer(
+ (metric, name, group) -> {
+ if
(name.equals(MetricNames.TASK_SLOTS_AVAILABLE)) {
+ slotsAvailableGauge.set((Gauge<Long>)
metric);
+ } else if
(name.equals(MetricNames.TASK_SLOTS_TOTAL)) {
+ slotsTotalGauge.set((Gauge<Long>)
metric);
+ }
+ })
+ .build();
+
+ final Context context = new Context();
+ context.setSlotManagerMetricGroup(
+ SlotManagerMetricGroup.create(metricRegistry, "localhost"));
+ final ManuallyTriggeredScheduledExecutor scheduledExecutor =
+ new ManuallyTriggeredScheduledExecutor();
+ context.setScheduledExecutor(scheduledExecutor);
+ final TaskExecutorConnection executorConnection1 =
createTaskExecutorConnection();
+ final TaskExecutorConnection executorConnection2 =
createTaskExecutorConnection();
+
+ context.runTest(
+ () -> {
+
assertThat(slotsAvailableGauge.get().getValue()).isEqualTo(0);
+ assertThat(slotsTotalGauge.get().getValue()).isEqualTo(0);
+
+ final CompletableFuture<SlotManager.RegistrationResult>
+ registerTaskManagerFuture1 = new
CompletableFuture<>();
+ context.runInMainThreadAndWait(
+ () ->
+ registerTaskManagerFuture1.complete(
+ context.getSlotManager()
+ .registerTaskManager(
+
executorConnection1,
+ new SlotReport(),
+
DEFAULT_TOTAL_RESOURCE_PROFILE,
+
DEFAULT_SLOT_RESOURCE_PROFILE)));
+
assertThat(assertFutureCompleteAndReturn(registerTaskManagerFuture1))
+ .isEqualTo(SlotManager.RegistrationResult.SUCCESS);
+
+ final CompletableFuture<SlotManager.RegistrationResult>
+ registerTaskManagerFuture2 = new
CompletableFuture<>();
+ context.runInMainThreadAndWait(
+ () ->
+ registerTaskManagerFuture2.complete(
+ context.getSlotManager()
+ .registerTaskManager(
+
executorConnection2,
+ new SlotReport(
+
createAllocatedSlotStatus(
+
new JobID(),
+
new AllocationID(),
+
DEFAULT_SLOT_RESOURCE_PROFILE)),
+
DEFAULT_TOTAL_RESOURCE_PROFILE,
+
DEFAULT_SLOT_RESOURCE_PROFILE)));
+
assertThat(assertFutureCompleteAndReturn(registerTaskManagerFuture2))
+ .isEqualTo(SlotManager.RegistrationResult.SUCCESS);
+
+ // triggers the metric update task on the main thread and
waits for the main
+ // thread to process queued up callbacks
+ scheduledExecutor.triggerPeriodicScheduledTasks();
+ context.runInMainThreadAndWait(() -> {});
+
+ assertThat(slotsTotalGauge.get().getValue())
+ .isEqualTo(2 * DEFAULT_NUM_SLOTS_PER_WORKER);
+ assertThat(slotsAvailableGauge.get().getValue())
+ .isEqualTo(2 * DEFAULT_NUM_SLOTS_PER_WORKER - 1);
+
+ final CompletableFuture<Boolean>
unRegisterTaskManagerFuture =
+ new CompletableFuture<>();
+ context.runInMainThreadAndWait(
+ () ->
+ unRegisterTaskManagerFuture.complete(
+ context.getSlotManager()
+ .unregisterTaskManager(
+
executorConnection2.getInstanceID(),
+ TEST_EXCEPTION)));
+
assertThat(assertFutureCompleteAndReturn(unRegisterTaskManagerFuture)).isTrue();
+
+ // triggers the metric update task on the main thread and
waits for the main
+ // thread to process queued up callbacks
+ scheduledExecutor.triggerPeriodicScheduledTasks();
+ context.runInMainThreadAndWait(() -> {});
+
+ assertThat(slotsTotalGauge.get().getValue())
+ .isEqualTo(DEFAULT_NUM_SLOTS_PER_WORKER);
+ assertThat(slotsAvailableGauge.get().getValue())
+ .isEqualTo(DEFAULT_NUM_SLOTS_PER_WORKER);
+ });
+ }
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerTestBase.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerTestBase.java
index 00e85b14839..23c6c89cf04 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerTestBase.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerTestBase.java
@@ -152,7 +152,7 @@ abstract class FineGrainedSlotManagerTestBase {
private SlotManagerMetricGroup slotManagerMetricGroup =
UnregisteredMetricGroups.createUnregisteredSlotManagerMetricGroup();
private BlockedTaskManagerChecker blockedTaskManagerChecker =
resourceID -> false;
- private final ScheduledExecutor scheduledExecutor =
+ private ScheduledExecutor scheduledExecutor =
new
ScheduledExecutorServiceAdapter(EXECUTOR_RESOURCE.getExecutor());
private final Executor mainThreadExecutor =
EXECUTOR_RESOURCE.getExecutor();
private FineGrainedSlotManager slotManager;
@@ -193,6 +193,10 @@ abstract class FineGrainedSlotManagerTestBase {
this.blockedTaskManagerChecker = blockedTaskManagerChecker;
}
+ public void setScheduledExecutor(ScheduledExecutor scheduledExecutor) {
+ this.scheduledExecutor = scheduledExecutor;
+ }
+
void runInMainThread(Runnable runnable) {
mainThreadExecutor.execute(runnable);
}
diff --git
a/flink-tests/src/test/java/org/apache/flink/runtime/metrics/JobManagerMetricsITCase.java
b/flink-tests/src/test/java/org/apache/flink/runtime/metrics/JobManagerMetricsITCase.java
index 3216f3181fa..a2dcb672a6a 100644
---
a/flink-tests/src/test/java/org/apache/flink/runtime/metrics/JobManagerMetricsITCase.java
+++
b/flink-tests/src/test/java/org/apache/flink/runtime/metrics/JobManagerMetricsITCase.java
@@ -26,6 +26,7 @@ import org.apache.flink.metrics.MetricConfig;
import org.apache.flink.metrics.reporter.AbstractReporter;
import org.apache.flink.metrics.reporter.MetricReporter;
import org.apache.flink.metrics.reporter.MetricReporterFactory;
+import
org.apache.flink.runtime.resourcemanager.slotmanager.FineGrainedSlotManager;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
@@ -131,7 +132,8 @@ class JobManagerMetricsITCase {
expectedPattern, gaugeNames));
}
}
-
+ // wait for metrics update
+
Thread.sleep(FineGrainedSlotManager.METRICS_UPDATE_INTERVAL.toMillis());
for (Map.Entry<Gauge<?>, String> entry :
reporter.getGauges().entrySet()) {
if (entry.getValue().contains(MetricNames.TASK_SLOTS_AVAILABLE)) {
assertEquals(0L, entry.getKey().getValue());