This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new 9350bb28 [coordinator] optimize coordinator event metric update logic (#1465)
9350bb28 is described below

commit 9350bb2828813ca1433586414239581b90989382
Author: Yang Wang <[email protected]>
AuthorDate: Mon Aug 4 16:19:43 2025 +0800

    [coordinator] optimize coordinator event metric update logic (#1465)
    
    Co-authored-by: ocean.wy <[email protected]>
---
 .../coordinator/CoordinatorEventProcessor.java     | 156 +++++++--------------
 .../coordinator/event/CoordinatorEventManager.java | 119 +++++++++++++++-
 2 files changed, 167 insertions(+), 108 deletions(-)

diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorEventProcessor.java b/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorEventProcessor.java
index 56693f10..afadb750 100644
--- a/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorEventProcessor.java
+++ b/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorEventProcessor.java
@@ -34,7 +34,6 @@ import com.alibaba.fluss.metadata.TableBucketReplica;
 import com.alibaba.fluss.metadata.TableInfo;
 import com.alibaba.fluss.metadata.TablePartition;
 import com.alibaba.fluss.metadata.TablePath;
-import com.alibaba.fluss.metrics.MetricNames;
 import com.alibaba.fluss.rpc.messages.AdjustIsrResponse;
 import com.alibaba.fluss.rpc.messages.CommitKvSnapshotResponse;
 import com.alibaba.fluss.rpc.messages.CommitLakeTableSnapshotResponse;
@@ -60,7 +59,6 @@ import 
com.alibaba.fluss.server.coordinator.event.NewTabletServerEvent;
 import 
com.alibaba.fluss.server.coordinator.event.NotifyLeaderAndIsrResponseReceivedEvent;
 import com.alibaba.fluss.server.coordinator.event.watcher.TableChangeWatcher;
 import 
com.alibaba.fluss.server.coordinator.event.watcher.TabletServerChangeWatcher;
-import com.alibaba.fluss.server.coordinator.statemachine.ReplicaState;
 import com.alibaba.fluss.server.coordinator.statemachine.ReplicaStateMachine;
 import 
com.alibaba.fluss.server.coordinator.statemachine.TableBucketStateMachine;
 import com.alibaba.fluss.server.entity.AdjustIsrResultForBucket;
@@ -136,13 +134,6 @@ public class CoordinatorEventProcessor implements 
EventProcessor {
 
     private final CompletedSnapshotStoreManager completedSnapshotStoreManager;
 
-    // metrics
-    private volatile int tabletServerCount;
-    private volatile int offlineBucketCount;
-    private volatile int tableCount;
-    private volatile int bucketCount;
-    private volatile int replicasToDeleteCount;
-
     public CoordinatorEventProcessor(
             ZooKeeperClient zooKeeperClient,
             CoordinatorMetadataCache serverMetadataCache,
@@ -198,18 +189,6 @@ public class CoordinatorEventProcessor implements 
EventProcessor {
         this.lakeTableTieringManager = lakeTableTieringManager;
         this.coordinatorMetricGroup = coordinatorMetricGroup;
         this.internalListenerName = 
conf.getString(ConfigOptions.INTERNAL_LISTENER_NAME);
-        registerMetrics();
-    }
-
-    private void registerMetrics() {
-        coordinatorMetricGroup.gauge(MetricNames.ACTIVE_COORDINATOR_COUNT, () 
-> 1);
-        coordinatorMetricGroup.gauge(
-                MetricNames.ACTIVE_TABLET_SERVER_COUNT, () -> 
tabletServerCount);
-        coordinatorMetricGroup.gauge(MetricNames.OFFLINE_BUCKET_COUNT, () -> 
offlineBucketCount);
-        coordinatorMetricGroup.gauge(MetricNames.BUCKET_COUNT, () -> 
bucketCount);
-        coordinatorMetricGroup.gauge(MetricNames.TABLE_COUNT, () -> 
tableCount);
-        coordinatorMetricGroup.gauge(
-                MetricNames.REPLICAS_TO_DELETE_COUNT, () -> 
replicasToDeleteCount);
     }
 
     public CoordinatorEventManager getCoordinatorEventManager() {
@@ -242,7 +221,6 @@ public class CoordinatorEventProcessor implements 
EventProcessor {
 
         // start table manager
         tableManager.startup();
-        updateMetrics();
 
         // start the event manager which will then process the event
         coordinatorEventManager.start();
@@ -447,92 +425,56 @@ public class CoordinatorEventProcessor implements 
EventProcessor {
 
     @Override
     public void process(CoordinatorEvent event) {
-        try {
-            if (event instanceof CreateTableEvent) {
-                processCreateTable((CreateTableEvent) event);
-            } else if (event instanceof CreatePartitionEvent) {
-                processCreatePartition((CreatePartitionEvent) event);
-            } else if (event instanceof DropTableEvent) {
-                processDropTable((DropTableEvent) event);
-            } else if (event instanceof DropPartitionEvent) {
-                processDropPartition((DropPartitionEvent) event);
-            } else if (event instanceof 
NotifyLeaderAndIsrResponseReceivedEvent) {
-                processNotifyLeaderAndIsrResponseReceivedEvent(
-                        (NotifyLeaderAndIsrResponseReceivedEvent) event);
-            } else if (event instanceof DeleteReplicaResponseReceivedEvent) {
-                
processDeleteReplicaResponseReceived((DeleteReplicaResponseReceivedEvent) 
event);
-            } else if (event instanceof NewTabletServerEvent) {
-                processNewTabletServer((NewTabletServerEvent) event);
-            } else if (event instanceof DeadTabletServerEvent) {
-                processDeadTabletServer((DeadTabletServerEvent) event);
-            } else if (event instanceof AdjustIsrReceivedEvent) {
-                AdjustIsrReceivedEvent adjustIsrReceivedEvent = 
(AdjustIsrReceivedEvent) event;
-                CompletableFuture<AdjustIsrResponse> callback =
-                        adjustIsrReceivedEvent.getRespCallback();
-                completeFromCallable(
-                        callback,
-                        () ->
-                                makeAdjustIsrResponse(
-                                        tryProcessAdjustIsr(
-                                                
adjustIsrReceivedEvent.getLeaderAndIsrMap())));
-            } else if (event instanceof CommitKvSnapshotEvent) {
-                CommitKvSnapshotEvent commitKvSnapshotEvent = 
(CommitKvSnapshotEvent) event;
-                CompletableFuture<CommitKvSnapshotResponse> callback =
-                        commitKvSnapshotEvent.getRespCallback();
-                completeFromCallable(
-                        callback, () -> 
tryProcessCommitKvSnapshot(commitKvSnapshotEvent));
-            } else if (event instanceof CommitRemoteLogManifestEvent) {
-                CommitRemoteLogManifestEvent commitRemoteLogManifestEvent =
-                        (CommitRemoteLogManifestEvent) event;
-                completeFromCallable(
-                        commitRemoteLogManifestEvent.getRespCallback(),
-                        () -> 
tryProcessCommitRemoteLogManifest(commitRemoteLogManifestEvent));
-            } else if (event instanceof CommitLakeTableSnapshotEvent) {
-                CommitLakeTableSnapshotEvent commitLakeTableSnapshotEvent =
-                        (CommitLakeTableSnapshotEvent) event;
-                completeFromCallable(
-                        commitLakeTableSnapshotEvent.getRespCallback(),
-                        () -> 
tryProcessCommitLakeTableSnapshot(commitLakeTableSnapshotEvent));
-            } else if (event instanceof AccessContextEvent) {
-                AccessContextEvent<?> accessContextEvent = 
(AccessContextEvent<?>) event;
-                processAccessContext(accessContextEvent);
-            } else {
-                LOG.warn("Unknown event type: {}", event.getClass().getName());
-            }
-        } finally {
-            updateMetrics();
-        }
-    }
-
-    private void updateMetrics() {
-        tabletServerCount = coordinatorContext.getLiveTabletServers().size();
-        tableCount = coordinatorContext.allTables().size();
-        bucketCount = coordinatorContext.bucketLeaderAndIsr().size();
-        offlineBucketCount = coordinatorContext.getOfflineBucketCount();
-
-        int replicasToDeletes = 0;
-        // for replica in partitions to be deleted
-        for (TablePartition tablePartition : 
coordinatorContext.getPartitionsToBeDeleted()) {
-            for (TableBucketReplica replica :
-                    coordinatorContext.getAllReplicasForPartition(
-                            tablePartition.getTableId(), 
tablePartition.getPartitionId())) {
-                replicasToDeletes =
-                        isReplicaToDelete(replica) ? replicasToDeletes + 1 : 
replicasToDeletes;
-            }
-        }
-        // for replica in tables to be deleted
-        for (long tableId : coordinatorContext.getTablesToBeDeleted()) {
-            for (TableBucketReplica replica : 
coordinatorContext.getAllReplicasForTable(tableId)) {
-                replicasToDeletes =
-                        isReplicaToDelete(replica) ? replicasToDeletes + 1 : 
replicasToDeletes;
-            }
+        if (event instanceof CreateTableEvent) {
+            processCreateTable((CreateTableEvent) event);
+        } else if (event instanceof CreatePartitionEvent) {
+            processCreatePartition((CreatePartitionEvent) event);
+        } else if (event instanceof DropTableEvent) {
+            processDropTable((DropTableEvent) event);
+        } else if (event instanceof DropPartitionEvent) {
+            processDropPartition((DropPartitionEvent) event);
+        } else if (event instanceof NotifyLeaderAndIsrResponseReceivedEvent) {
+            processNotifyLeaderAndIsrResponseReceivedEvent(
+                    (NotifyLeaderAndIsrResponseReceivedEvent) event);
+        } else if (event instanceof DeleteReplicaResponseReceivedEvent) {
+            
processDeleteReplicaResponseReceived((DeleteReplicaResponseReceivedEvent) 
event);
+        } else if (event instanceof NewTabletServerEvent) {
+            processNewTabletServer((NewTabletServerEvent) event);
+        } else if (event instanceof DeadTabletServerEvent) {
+            processDeadTabletServer((DeadTabletServerEvent) event);
+        } else if (event instanceof AdjustIsrReceivedEvent) {
+            AdjustIsrReceivedEvent adjustIsrReceivedEvent = 
(AdjustIsrReceivedEvent) event;
+            CompletableFuture<AdjustIsrResponse> callback =
+                    adjustIsrReceivedEvent.getRespCallback();
+            completeFromCallable(
+                    callback,
+                    () ->
+                            makeAdjustIsrResponse(
+                                    tryProcessAdjustIsr(
+                                            
adjustIsrReceivedEvent.getLeaderAndIsrMap())));
+        } else if (event instanceof CommitKvSnapshotEvent) {
+            CommitKvSnapshotEvent commitKvSnapshotEvent = 
(CommitKvSnapshotEvent) event;
+            CompletableFuture<CommitKvSnapshotResponse> callback =
+                    commitKvSnapshotEvent.getRespCallback();
+            completeFromCallable(callback, () -> 
tryProcessCommitKvSnapshot(commitKvSnapshotEvent));
+        } else if (event instanceof CommitRemoteLogManifestEvent) {
+            CommitRemoteLogManifestEvent commitRemoteLogManifestEvent =
+                    (CommitRemoteLogManifestEvent) event;
+            completeFromCallable(
+                    commitRemoteLogManifestEvent.getRespCallback(),
+                    () -> 
tryProcessCommitRemoteLogManifest(commitRemoteLogManifestEvent));
+        } else if (event instanceof CommitLakeTableSnapshotEvent) {
+            CommitLakeTableSnapshotEvent commitLakeTableSnapshotEvent =
+                    (CommitLakeTableSnapshotEvent) event;
+            completeFromCallable(
+                    commitLakeTableSnapshotEvent.getRespCallback(),
+                    () -> 
tryProcessCommitLakeTableSnapshot(commitLakeTableSnapshotEvent));
+        } else if (event instanceof AccessContextEvent) {
+            AccessContextEvent<?> accessContextEvent = (AccessContextEvent<?>) 
event;
+            processAccessContext(accessContextEvent);
+        } else {
+            LOG.warn("Unknown event type: {}", event.getClass().getName());
         }
-        this.replicasToDeleteCount = replicasToDeletes;
-    }
-
-    private boolean isReplicaToDelete(TableBucketReplica replica) {
-        ReplicaState replicaState = 
coordinatorContext.getReplicaState(replica);
-        return replicaState != null && replicaState != 
ReplicaDeletionSuccessful;
     }
 
     private void processCreateTable(CreateTableEvent createTableEvent) {
diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/event/CoordinatorEventManager.java b/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/event/CoordinatorEventManager.java
index addc21f8..c220a9f6 100644
--- a/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/event/CoordinatorEventManager.java
+++ b/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/event/CoordinatorEventManager.java
@@ -18,9 +18,13 @@
 package com.alibaba.fluss.server.coordinator.event;
 
 import com.alibaba.fluss.annotation.Internal;
+import com.alibaba.fluss.metadata.TableBucketReplica;
+import com.alibaba.fluss.metadata.TablePartition;
 import com.alibaba.fluss.metrics.DescriptiveStatisticsHistogram;
 import com.alibaba.fluss.metrics.Histogram;
 import com.alibaba.fluss.metrics.MetricNames;
+import com.alibaba.fluss.server.coordinator.CoordinatorContext;
+import com.alibaba.fluss.server.coordinator.statemachine.ReplicaState;
 import com.alibaba.fluss.server.metrics.group.CoordinatorMetricGroup;
 import com.alibaba.fluss.utils.concurrent.ShutdownableThread;
 
@@ -31,6 +35,7 @@ import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
+import static 
com.alibaba.fluss.server.coordinator.statemachine.ReplicaState.ReplicaDeletionSuccessful;
 import static com.alibaba.fluss.utils.concurrent.LockUtils.inLock;
 
 /**
@@ -56,7 +61,15 @@ public final class CoordinatorEventManager implements 
EventManager {
     private Histogram eventProcessingTime;
     private Histogram eventQueueTime;
 
+    // Coordinator metrics moved from CoordinatorEventProcessor
+    private volatile int tabletServerCount;
+    private volatile int offlineBucketCount;
+    private volatile int tableCount;
+    private volatile int bucketCount;
+    private volatile int replicasToDeleteCount;
+
     private static final int WINDOW_SIZE = 100;
+    private static final long METRICS_UPDATE_INTERVAL_MS = 5000; // 5 seconds
 
     public CoordinatorEventManager(
             EventProcessor eventProcessor, CoordinatorMetricGroup 
coordinatorMetricGroup) {
@@ -77,6 +90,80 @@ public final class CoordinatorEventManager implements 
EventManager {
                 coordinatorMetricGroup.histogram(
                         MetricNames.EVENT_QUEUE_TIME_MS,
                         new DescriptiveStatisticsHistogram(WINDOW_SIZE));
+
+        // Register coordinator metrics
+        coordinatorMetricGroup.gauge(MetricNames.ACTIVE_COORDINATOR_COUNT, () 
-> 1);
+        coordinatorMetricGroup.gauge(
+                MetricNames.ACTIVE_TABLET_SERVER_COUNT, () -> 
tabletServerCount);
+        coordinatorMetricGroup.gauge(MetricNames.OFFLINE_BUCKET_COUNT, () -> 
offlineBucketCount);
+        coordinatorMetricGroup.gauge(MetricNames.BUCKET_COUNT, () -> 
bucketCount);
+        coordinatorMetricGroup.gauge(MetricNames.TABLE_COUNT, () -> 
tableCount);
+        coordinatorMetricGroup.gauge(
+                MetricNames.REPLICAS_TO_DELETE_COUNT, () -> 
replicasToDeleteCount);
+    }
+
+    /** Not thread safety! this method can only be executed in the 
CoordinatorEventThread. */
+    private void updateMetricsViaAccessContext() {
+        // Create AccessContextEvent to safely access CoordinatorContext
+        AccessContextEvent<MetricsData> accessContextEvent =
+                new AccessContextEvent<>(
+                        context -> {
+                            int tabletServerCount = 
context.getLiveTabletServers().size();
+                            int tableCount = context.allTables().size();
+                            int bucketCount = 
context.bucketLeaderAndIsr().size();
+                            int offlineBucketCount = 
context.getOfflineBucketCount();
+
+                            int replicasToDeletes = 0;
+                            // for replica in partitions to be deleted
+                            for (TablePartition tablePartition :
+                                    context.getPartitionsToBeDeleted()) {
+                                for (TableBucketReplica replica :
+                                        context.getAllReplicasForPartition(
+                                                tablePartition.getTableId(),
+                                                
tablePartition.getPartitionId())) {
+                                    replicasToDeletes =
+                                            isReplicaToDelete(replica, context)
+                                                    ? replicasToDeletes + 1
+                                                    : replicasToDeletes;
+                                }
+                            }
+                            // for replica in tables to be deleted
+                            for (long tableId : 
context.getTablesToBeDeleted()) {
+                                for (TableBucketReplica replica :
+                                        
context.getAllReplicasForTable(tableId)) {
+                                    replicasToDeletes =
+                                            isReplicaToDelete(replica, context)
+                                                    ? replicasToDeletes + 1
+                                                    : replicasToDeletes;
+                                }
+                            }
+
+                            return new MetricsData(
+                                    tabletServerCount,
+                                    tableCount,
+                                    bucketCount,
+                                    offlineBucketCount,
+                                    replicasToDeletes);
+                        });
+
+        eventProcessor.process(accessContextEvent);
+
+        // Wait for the result and update local metrics
+        try {
+            MetricsData metricsData = 
accessContextEvent.getResultFuture().get();
+            this.tabletServerCount = metricsData.tabletServerCount;
+            this.tableCount = metricsData.tableCount;
+            this.bucketCount = metricsData.bucketCount;
+            this.offlineBucketCount = metricsData.offlineBucketCount;
+            this.replicasToDeleteCount = metricsData.replicasToDeleteCount;
+        } catch (Exception e) {
+            LOG.warn("Failed to update metrics via AccessContextEvent", e);
+        }
+    }
+
+    private boolean isReplicaToDelete(TableBucketReplica replica, 
CoordinatorContext context) {
+        ReplicaState replicaState = context.getReplicaState(replica);
+        return replicaState != null && replicaState != 
ReplicaDeletionSuccessful;
     }
 
     public void start() {
@@ -123,12 +210,21 @@ public final class CoordinatorEventManager implements 
EventManager {
 
     private class CoordinatorEventThread extends ShutdownableThread {
 
+        private long lastMetricsUpdateTime = System.currentTimeMillis();
+
         public CoordinatorEventThread(String name) {
             super(name, false);
         }
 
         @Override
         public void doWork() throws Exception {
+            // Check if it's time to update metrics (before taking event from 
queue)
+            long currentTime = System.currentTimeMillis();
+            if (currentTime - lastMetricsUpdateTime >= 
METRICS_UPDATE_INTERVAL_MS) {
+                updateMetricsViaAccessContext();
+                lastMetricsUpdateTime = currentTime;
+            }
+
             QueuedEvent queuedEvent = queue.take();
             CoordinatorEvent coordinatorEvent = queuedEvent.event;
 
@@ -144,7 +240,7 @@ public final class CoordinatorEventManager implements 
EventManager {
                     eventProcessor.process(coordinatorEvent);
                 }
             } catch (Throwable e) {
-                log.error("Uncaught error processing event {}.", 
coordinatorEvent, e);
+                LOG.error("Uncaught error processing event {}.", 
coordinatorEvent, e);
             } finally {
                 long costTimeMs = System.currentTimeMillis() - 
eventStartTimeMs;
                 eventProcessingTime.update(costTimeMs);
@@ -166,4 +262,25 @@ public final class CoordinatorEventManager implements 
EventManager {
             this.enqueueTimeMs = enqueueTimeMs;
         }
     }
+
+    private static class MetricsData {
+        private final int tabletServerCount;
+        private final int tableCount;
+        private final int bucketCount;
+        private final int offlineBucketCount;
+        private final int replicasToDeleteCount;
+
+        public MetricsData(
+                int tabletServerCount,
+                int tableCount,
+                int bucketCount,
+                int offlineBucketCount,
+                int replicasToDeleteCount) {
+            this.tabletServerCount = tabletServerCount;
+            this.tableCount = tableCount;
+            this.bucketCount = bucketCount;
+            this.offlineBucketCount = offlineBucketCount;
+            this.replicasToDeleteCount = replicasToDeleteCount;
+        }
+    }
 }

Reply via email to