This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new 9350bb28 [coordinator] optimize coordinator event metric update logic (#1465)
9350bb28 is described below
commit 9350bb2828813ca1433586414239581b90989382
Author: Yang Wang <[email protected]>
AuthorDate: Mon Aug 4 16:19:43 2025 +0800
[coordinator] optimize coordinator event metric update logic (#1465)
Co-authored-by: ocean.wy <[email protected]>
---
.../coordinator/CoordinatorEventProcessor.java | 156 +++++++--------------
.../coordinator/event/CoordinatorEventManager.java | 119 +++++++++++++++-
2 files changed, 167 insertions(+), 108 deletions(-)
diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorEventProcessor.java b/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorEventProcessor.java
index 56693f10..afadb750 100644
--- a/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorEventProcessor.java
+++ b/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorEventProcessor.java
@@ -34,7 +34,6 @@ import com.alibaba.fluss.metadata.TableBucketReplica;
import com.alibaba.fluss.metadata.TableInfo;
import com.alibaba.fluss.metadata.TablePartition;
import com.alibaba.fluss.metadata.TablePath;
-import com.alibaba.fluss.metrics.MetricNames;
import com.alibaba.fluss.rpc.messages.AdjustIsrResponse;
import com.alibaba.fluss.rpc.messages.CommitKvSnapshotResponse;
import com.alibaba.fluss.rpc.messages.CommitLakeTableSnapshotResponse;
@@ -60,7 +59,6 @@ import com.alibaba.fluss.server.coordinator.event.NewTabletServerEvent;
import
com.alibaba.fluss.server.coordinator.event.NotifyLeaderAndIsrResponseReceivedEvent;
import com.alibaba.fluss.server.coordinator.event.watcher.TableChangeWatcher;
import
com.alibaba.fluss.server.coordinator.event.watcher.TabletServerChangeWatcher;
-import com.alibaba.fluss.server.coordinator.statemachine.ReplicaState;
import com.alibaba.fluss.server.coordinator.statemachine.ReplicaStateMachine;
import
com.alibaba.fluss.server.coordinator.statemachine.TableBucketStateMachine;
import com.alibaba.fluss.server.entity.AdjustIsrResultForBucket;
@@ -136,13 +134,6 @@ public class CoordinatorEventProcessor implements EventProcessor {
private final CompletedSnapshotStoreManager completedSnapshotStoreManager;
- // metrics
- private volatile int tabletServerCount;
- private volatile int offlineBucketCount;
- private volatile int tableCount;
- private volatile int bucketCount;
- private volatile int replicasToDeleteCount;
-
public CoordinatorEventProcessor(
ZooKeeperClient zooKeeperClient,
CoordinatorMetadataCache serverMetadataCache,
@@ -198,18 +189,6 @@ public class CoordinatorEventProcessor implements EventProcessor {
this.lakeTableTieringManager = lakeTableTieringManager;
this.coordinatorMetricGroup = coordinatorMetricGroup;
this.internalListenerName =
conf.getString(ConfigOptions.INTERNAL_LISTENER_NAME);
- registerMetrics();
- }
-
- private void registerMetrics() {
- coordinatorMetricGroup.gauge(MetricNames.ACTIVE_COORDINATOR_COUNT, ()
-> 1);
- coordinatorMetricGroup.gauge(
- MetricNames.ACTIVE_TABLET_SERVER_COUNT, () ->
tabletServerCount);
- coordinatorMetricGroup.gauge(MetricNames.OFFLINE_BUCKET_COUNT, () ->
offlineBucketCount);
- coordinatorMetricGroup.gauge(MetricNames.BUCKET_COUNT, () ->
bucketCount);
- coordinatorMetricGroup.gauge(MetricNames.TABLE_COUNT, () ->
tableCount);
- coordinatorMetricGroup.gauge(
- MetricNames.REPLICAS_TO_DELETE_COUNT, () ->
replicasToDeleteCount);
}
public CoordinatorEventManager getCoordinatorEventManager() {
@@ -242,7 +221,6 @@ public class CoordinatorEventProcessor implements EventProcessor {
// start table manager
tableManager.startup();
- updateMetrics();
// start the event manager which will then process the event
coordinatorEventManager.start();
@@ -447,92 +425,56 @@ public class CoordinatorEventProcessor implements EventProcessor {
@Override
public void process(CoordinatorEvent event) {
- try {
- if (event instanceof CreateTableEvent) {
- processCreateTable((CreateTableEvent) event);
- } else if (event instanceof CreatePartitionEvent) {
- processCreatePartition((CreatePartitionEvent) event);
- } else if (event instanceof DropTableEvent) {
- processDropTable((DropTableEvent) event);
- } else if (event instanceof DropPartitionEvent) {
- processDropPartition((DropPartitionEvent) event);
- } else if (event instanceof
NotifyLeaderAndIsrResponseReceivedEvent) {
- processNotifyLeaderAndIsrResponseReceivedEvent(
- (NotifyLeaderAndIsrResponseReceivedEvent) event);
- } else if (event instanceof DeleteReplicaResponseReceivedEvent) {
-
processDeleteReplicaResponseReceived((DeleteReplicaResponseReceivedEvent)
event);
- } else if (event instanceof NewTabletServerEvent) {
- processNewTabletServer((NewTabletServerEvent) event);
- } else if (event instanceof DeadTabletServerEvent) {
- processDeadTabletServer((DeadTabletServerEvent) event);
- } else if (event instanceof AdjustIsrReceivedEvent) {
- AdjustIsrReceivedEvent adjustIsrReceivedEvent =
(AdjustIsrReceivedEvent) event;
- CompletableFuture<AdjustIsrResponse> callback =
- adjustIsrReceivedEvent.getRespCallback();
- completeFromCallable(
- callback,
- () ->
- makeAdjustIsrResponse(
- tryProcessAdjustIsr(
-
adjustIsrReceivedEvent.getLeaderAndIsrMap())));
- } else if (event instanceof CommitKvSnapshotEvent) {
- CommitKvSnapshotEvent commitKvSnapshotEvent =
(CommitKvSnapshotEvent) event;
- CompletableFuture<CommitKvSnapshotResponse> callback =
- commitKvSnapshotEvent.getRespCallback();
- completeFromCallable(
- callback, () ->
tryProcessCommitKvSnapshot(commitKvSnapshotEvent));
- } else if (event instanceof CommitRemoteLogManifestEvent) {
- CommitRemoteLogManifestEvent commitRemoteLogManifestEvent =
- (CommitRemoteLogManifestEvent) event;
- completeFromCallable(
- commitRemoteLogManifestEvent.getRespCallback(),
- () ->
tryProcessCommitRemoteLogManifest(commitRemoteLogManifestEvent));
- } else if (event instanceof CommitLakeTableSnapshotEvent) {
- CommitLakeTableSnapshotEvent commitLakeTableSnapshotEvent =
- (CommitLakeTableSnapshotEvent) event;
- completeFromCallable(
- commitLakeTableSnapshotEvent.getRespCallback(),
- () ->
tryProcessCommitLakeTableSnapshot(commitLakeTableSnapshotEvent));
- } else if (event instanceof AccessContextEvent) {
- AccessContextEvent<?> accessContextEvent =
(AccessContextEvent<?>) event;
- processAccessContext(accessContextEvent);
- } else {
- LOG.warn("Unknown event type: {}", event.getClass().getName());
- }
- } finally {
- updateMetrics();
- }
- }
-
- private void updateMetrics() {
- tabletServerCount = coordinatorContext.getLiveTabletServers().size();
- tableCount = coordinatorContext.allTables().size();
- bucketCount = coordinatorContext.bucketLeaderAndIsr().size();
- offlineBucketCount = coordinatorContext.getOfflineBucketCount();
-
- int replicasToDeletes = 0;
- // for replica in partitions to be deleted
- for (TablePartition tablePartition :
coordinatorContext.getPartitionsToBeDeleted()) {
- for (TableBucketReplica replica :
- coordinatorContext.getAllReplicasForPartition(
- tablePartition.getTableId(),
tablePartition.getPartitionId())) {
- replicasToDeletes =
- isReplicaToDelete(replica) ? replicasToDeletes + 1 :
replicasToDeletes;
- }
- }
- // for replica in tables to be deleted
- for (long tableId : coordinatorContext.getTablesToBeDeleted()) {
- for (TableBucketReplica replica :
coordinatorContext.getAllReplicasForTable(tableId)) {
- replicasToDeletes =
- isReplicaToDelete(replica) ? replicasToDeletes + 1 :
replicasToDeletes;
- }
+ if (event instanceof CreateTableEvent) {
+ processCreateTable((CreateTableEvent) event);
+ } else if (event instanceof CreatePartitionEvent) {
+ processCreatePartition((CreatePartitionEvent) event);
+ } else if (event instanceof DropTableEvent) {
+ processDropTable((DropTableEvent) event);
+ } else if (event instanceof DropPartitionEvent) {
+ processDropPartition((DropPartitionEvent) event);
+ } else if (event instanceof NotifyLeaderAndIsrResponseReceivedEvent) {
+ processNotifyLeaderAndIsrResponseReceivedEvent(
+ (NotifyLeaderAndIsrResponseReceivedEvent) event);
+ } else if (event instanceof DeleteReplicaResponseReceivedEvent) {
+
processDeleteReplicaResponseReceived((DeleteReplicaResponseReceivedEvent)
event);
+ } else if (event instanceof NewTabletServerEvent) {
+ processNewTabletServer((NewTabletServerEvent) event);
+ } else if (event instanceof DeadTabletServerEvent) {
+ processDeadTabletServer((DeadTabletServerEvent) event);
+ } else if (event instanceof AdjustIsrReceivedEvent) {
+ AdjustIsrReceivedEvent adjustIsrReceivedEvent =
(AdjustIsrReceivedEvent) event;
+ CompletableFuture<AdjustIsrResponse> callback =
+ adjustIsrReceivedEvent.getRespCallback();
+ completeFromCallable(
+ callback,
+ () ->
+ makeAdjustIsrResponse(
+ tryProcessAdjustIsr(
+
adjustIsrReceivedEvent.getLeaderAndIsrMap())));
+ } else if (event instanceof CommitKvSnapshotEvent) {
+ CommitKvSnapshotEvent commitKvSnapshotEvent =
(CommitKvSnapshotEvent) event;
+ CompletableFuture<CommitKvSnapshotResponse> callback =
+ commitKvSnapshotEvent.getRespCallback();
+ completeFromCallable(callback, () ->
tryProcessCommitKvSnapshot(commitKvSnapshotEvent));
+ } else if (event instanceof CommitRemoteLogManifestEvent) {
+ CommitRemoteLogManifestEvent commitRemoteLogManifestEvent =
+ (CommitRemoteLogManifestEvent) event;
+ completeFromCallable(
+ commitRemoteLogManifestEvent.getRespCallback(),
+ () ->
tryProcessCommitRemoteLogManifest(commitRemoteLogManifestEvent));
+ } else if (event instanceof CommitLakeTableSnapshotEvent) {
+ CommitLakeTableSnapshotEvent commitLakeTableSnapshotEvent =
+ (CommitLakeTableSnapshotEvent) event;
+ completeFromCallable(
+ commitLakeTableSnapshotEvent.getRespCallback(),
+ () ->
tryProcessCommitLakeTableSnapshot(commitLakeTableSnapshotEvent));
+ } else if (event instanceof AccessContextEvent) {
+ AccessContextEvent<?> accessContextEvent = (AccessContextEvent<?>)
event;
+ processAccessContext(accessContextEvent);
+ } else {
+ LOG.warn("Unknown event type: {}", event.getClass().getName());
}
- this.replicasToDeleteCount = replicasToDeletes;
- }
-
- private boolean isReplicaToDelete(TableBucketReplica replica) {
- ReplicaState replicaState =
coordinatorContext.getReplicaState(replica);
- return replicaState != null && replicaState !=
ReplicaDeletionSuccessful;
}
private void processCreateTable(CreateTableEvent createTableEvent) {
diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/event/CoordinatorEventManager.java b/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/event/CoordinatorEventManager.java
index addc21f8..c220a9f6 100644
--- a/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/event/CoordinatorEventManager.java
+++ b/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/event/CoordinatorEventManager.java
@@ -18,9 +18,13 @@
package com.alibaba.fluss.server.coordinator.event;
import com.alibaba.fluss.annotation.Internal;
+import com.alibaba.fluss.metadata.TableBucketReplica;
+import com.alibaba.fluss.metadata.TablePartition;
import com.alibaba.fluss.metrics.DescriptiveStatisticsHistogram;
import com.alibaba.fluss.metrics.Histogram;
import com.alibaba.fluss.metrics.MetricNames;
+import com.alibaba.fluss.server.coordinator.CoordinatorContext;
+import com.alibaba.fluss.server.coordinator.statemachine.ReplicaState;
import com.alibaba.fluss.server.metrics.group.CoordinatorMetricGroup;
import com.alibaba.fluss.utils.concurrent.ShutdownableThread;
@@ -31,6 +35,7 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
+import static
com.alibaba.fluss.server.coordinator.statemachine.ReplicaState.ReplicaDeletionSuccessful;
import static com.alibaba.fluss.utils.concurrent.LockUtils.inLock;
/**
@@ -56,7 +61,15 @@ public final class CoordinatorEventManager implements EventManager {
private Histogram eventProcessingTime;
private Histogram eventQueueTime;
+ // Coordinator metrics moved from CoordinatorEventProcessor
+ private volatile int tabletServerCount;
+ private volatile int offlineBucketCount;
+ private volatile int tableCount;
+ private volatile int bucketCount;
+ private volatile int replicasToDeleteCount;
+
private static final int WINDOW_SIZE = 100;
+ private static final long METRICS_UPDATE_INTERVAL_MS = 5000; // 5 seconds
public CoordinatorEventManager(
EventProcessor eventProcessor, CoordinatorMetricGroup
coordinatorMetricGroup) {
@@ -77,6 +90,80 @@ public final class CoordinatorEventManager implements EventManager {
coordinatorMetricGroup.histogram(
MetricNames.EVENT_QUEUE_TIME_MS,
new DescriptiveStatisticsHistogram(WINDOW_SIZE));
+
+ // Register coordinator metrics
+ coordinatorMetricGroup.gauge(MetricNames.ACTIVE_COORDINATOR_COUNT, ()
-> 1);
+ coordinatorMetricGroup.gauge(
+ MetricNames.ACTIVE_TABLET_SERVER_COUNT, () ->
tabletServerCount);
+ coordinatorMetricGroup.gauge(MetricNames.OFFLINE_BUCKET_COUNT, () ->
offlineBucketCount);
+ coordinatorMetricGroup.gauge(MetricNames.BUCKET_COUNT, () ->
bucketCount);
+ coordinatorMetricGroup.gauge(MetricNames.TABLE_COUNT, () ->
tableCount);
+ coordinatorMetricGroup.gauge(
+ MetricNames.REPLICAS_TO_DELETE_COUNT, () ->
replicasToDeleteCount);
+ }
+
+ /** Not thread safety! this method can only be executed in the
CoordinatorEventThread. */
+ private void updateMetricsViaAccessContext() {
+ // Create AccessContextEvent to safely access CoordinatorContext
+ AccessContextEvent<MetricsData> accessContextEvent =
+ new AccessContextEvent<>(
+ context -> {
+ int tabletServerCount =
context.getLiveTabletServers().size();
+ int tableCount = context.allTables().size();
+ int bucketCount =
context.bucketLeaderAndIsr().size();
+ int offlineBucketCount =
context.getOfflineBucketCount();
+
+ int replicasToDeletes = 0;
+ // for replica in partitions to be deleted
+ for (TablePartition tablePartition :
+ context.getPartitionsToBeDeleted()) {
+ for (TableBucketReplica replica :
+ context.getAllReplicasForPartition(
+ tablePartition.getTableId(),
+
tablePartition.getPartitionId())) {
+ replicasToDeletes =
+ isReplicaToDelete(replica, context)
+ ? replicasToDeletes + 1
+ : replicasToDeletes;
+ }
+ }
+ // for replica in tables to be deleted
+ for (long tableId :
context.getTablesToBeDeleted()) {
+ for (TableBucketReplica replica :
+
context.getAllReplicasForTable(tableId)) {
+ replicasToDeletes =
+ isReplicaToDelete(replica, context)
+ ? replicasToDeletes + 1
+ : replicasToDeletes;
+ }
+ }
+
+ return new MetricsData(
+ tabletServerCount,
+ tableCount,
+ bucketCount,
+ offlineBucketCount,
+ replicasToDeletes);
+ });
+
+ eventProcessor.process(accessContextEvent);
+
+ // Wait for the result and update local metrics
+ try {
+ MetricsData metricsData =
accessContextEvent.getResultFuture().get();
+ this.tabletServerCount = metricsData.tabletServerCount;
+ this.tableCount = metricsData.tableCount;
+ this.bucketCount = metricsData.bucketCount;
+ this.offlineBucketCount = metricsData.offlineBucketCount;
+ this.replicasToDeleteCount = metricsData.replicasToDeleteCount;
+ } catch (Exception e) {
+ LOG.warn("Failed to update metrics via AccessContextEvent", e);
+ }
+ }
+
+ private boolean isReplicaToDelete(TableBucketReplica replica,
CoordinatorContext context) {
+ ReplicaState replicaState = context.getReplicaState(replica);
+ return replicaState != null && replicaState !=
ReplicaDeletionSuccessful;
}
public void start() {
@@ -123,12 +210,21 @@ public final class CoordinatorEventManager implements EventManager {
private class CoordinatorEventThread extends ShutdownableThread {
+ private long lastMetricsUpdateTime = System.currentTimeMillis();
+
public CoordinatorEventThread(String name) {
super(name, false);
}
@Override
public void doWork() throws Exception {
+ // Check if it's time to update metrics (before taking event from
queue)
+ long currentTime = System.currentTimeMillis();
+ if (currentTime - lastMetricsUpdateTime >=
METRICS_UPDATE_INTERVAL_MS) {
+ updateMetricsViaAccessContext();
+ lastMetricsUpdateTime = currentTime;
+ }
+
QueuedEvent queuedEvent = queue.take();
CoordinatorEvent coordinatorEvent = queuedEvent.event;
@@ -144,7 +240,7 @@ public final class CoordinatorEventManager implements EventManager {
eventProcessor.process(coordinatorEvent);
}
} catch (Throwable e) {
- log.error("Uncaught error processing event {}.",
coordinatorEvent, e);
+ LOG.error("Uncaught error processing event {}.",
coordinatorEvent, e);
} finally {
long costTimeMs = System.currentTimeMillis() -
eventStartTimeMs;
eventProcessingTime.update(costTimeMs);
@@ -166,4 +262,25 @@ public final class CoordinatorEventManager implements EventManager {
this.enqueueTimeMs = enqueueTimeMs;
}
}
+
+ private static class MetricsData {
+ private final int tabletServerCount;
+ private final int tableCount;
+ private final int bucketCount;
+ private final int offlineBucketCount;
+ private final int replicasToDeleteCount;
+
+ public MetricsData(
+ int tabletServerCount,
+ int tableCount,
+ int bucketCount,
+ int offlineBucketCount,
+ int replicasToDeleteCount) {
+ this.tabletServerCount = tabletServerCount;
+ this.tableCount = tableCount;
+ this.bucketCount = bucketCount;
+ this.offlineBucketCount = offlineBucketCount;
+ this.replicasToDeleteCount = replicasToDeleteCount;
+ }
+ }
}