This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new e5f3ea784 [coordinator] Coordinator report event metrics of enqueue
rate and event process time labeled by event_type (#1466)
e5f3ea784 is described below
commit e5f3ea78451522f2b0b1cd7d4599e68135eb6845
Author: Yang Wang <[email protected]>
AuthorDate: Mon Sep 15 22:29:00 2025 +0800
[coordinator] Coordinator report event metrics of enqueue rate and event
process time labeled by event_type (#1466)
---
.../coordinator/event/CoordinatorEventManager.java | 21 +++---
.../metrics/group/CoordinatorEventMetricGroup.java | 86 ++++++++++++++++++++++
.../metrics/group/CoordinatorMetricGroup.java | 11 +++
.../maintenance/observability/monitor-metrics.md | 15 ++--
4 files changed, 116 insertions(+), 17 deletions(-)
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/CoordinatorEventManager.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/CoordinatorEventManager.java
index 06a92dabb..36d82df53 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/CoordinatorEventManager.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/CoordinatorEventManager.java
@@ -25,6 +25,7 @@ import org.apache.fluss.metrics.Histogram;
import org.apache.fluss.metrics.MetricNames;
import org.apache.fluss.server.coordinator.CoordinatorContext;
import org.apache.fluss.server.coordinator.statemachine.ReplicaState;
+import org.apache.fluss.server.metrics.group.CoordinatorEventMetricGroup;
import org.apache.fluss.server.metrics.group.CoordinatorMetricGroup;
import org.apache.fluss.utils.concurrent.ShutdownableThread;
@@ -58,7 +59,6 @@ public final class CoordinatorEventManager implements EventManager {
private final Lock putLock = new ReentrantLock();
// metrics
- private Histogram eventProcessingTime;
private Histogram eventQueueTime;
// Coordinator metrics moved from CoordinatorEventProcessor
@@ -79,13 +79,6 @@ public final class CoordinatorEventManager implements EventManager {
}
private void registerMetrics() {
- coordinatorMetricGroup.gauge(MetricNames.EVENT_QUEUE_SIZE,
queue::size);
-
- eventProcessingTime =
- coordinatorMetricGroup.histogram(
- MetricNames.EVENT_PROCESSING_TIME_MS,
- new DescriptiveStatisticsHistogram(WINDOW_SIZE));
-
eventQueueTime =
coordinatorMetricGroup.histogram(
MetricNames.EVENT_QUEUE_TIME_MS,
@@ -188,7 +181,10 @@ public final class CoordinatorEventManager implements EventManager {
QueuedEvent queuedEvent =
new QueuedEvent(event,
System.currentTimeMillis());
queue.put(queuedEvent);
-
+ coordinatorMetricGroup
+ .getOrAddEventTypeMetricGroup(event.getClass())
+ .queuedEventCount()
+ .inc();
LOG.debug(
"Put coordinator event {} of event type {}.",
event,
@@ -243,7 +239,12 @@ public final class CoordinatorEventManager implements EventManager {
LOG.error("Uncaught error processing event {}.",
coordinatorEvent, e);
} finally {
long costTimeMs = System.currentTimeMillis() -
eventStartTimeMs;
- eventProcessingTime.update(costTimeMs);
+ // Use event type specific histogram
+ CoordinatorEventMetricGroup eventMetricGroup =
+ coordinatorMetricGroup.getOrAddEventTypeMetricGroup(
+ coordinatorEvent.getClass());
+ eventMetricGroup.eventProcessingTime().update(costTimeMs);
+ eventMetricGroup.queuedEventCount().dec();
LOG.debug(
"Finished processing event {} of event type {} in
{}ms.",
coordinatorEvent,
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/CoordinatorEventMetricGroup.java b/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/CoordinatorEventMetricGroup.java
new file mode 100644
index 000000000..d9a06324d
--- /dev/null
+++ b/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/CoordinatorEventMetricGroup.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.metrics.group;
+
+import org.apache.fluss.metrics.CharacterFilter;
+import org.apache.fluss.metrics.Counter;
+import org.apache.fluss.metrics.DescriptiveStatisticsHistogram;
+import org.apache.fluss.metrics.Histogram;
+import org.apache.fluss.metrics.MetricNames;
+import org.apache.fluss.metrics.ThreadSafeSimpleCounter;
+import org.apache.fluss.metrics.groups.AbstractMetricGroup;
+import org.apache.fluss.metrics.registry.MetricRegistry;
+import org.apache.fluss.server.coordinator.event.CoordinatorEvent;
+
+import java.util.Map;
+
+import static org.apache.fluss.metrics.utils.MetricGroupUtils.makeScope;
+
+/**
+ * Metric group for a single coordinator event type. It is created as a child of a {@link
+ * CoordinatorMetricGroup}; its scope is built via {@code makeScope} from the parent and the event
+ * class' simple name, its group name is "event", and it contributes the variable "event_type"
+ * (also the event class' simple name) through {@code putVariables}.
+ *
+ * <p>Two metrics are registered per event type:
+ *
+ * <ul>
+ *   <li>{@code MetricNames.EVENT_PROCESSING_TIME_MS}: a {@link Histogram} backed by a {@code
+ *       DescriptiveStatisticsHistogram} constructed with a window size of 100.
+ *   <li>{@code MetricNames.EVENT_QUEUE_SIZE}: a {@link Counter} ({@code ThreadSafeSimpleCounter}).
+ *       This group never modifies the counter itself; its value is only meaningful if callers
+ *       increment it on enqueue and decrement it once processing of the event has finished.
+ * </ul>
+ */
+public class CoordinatorEventMetricGroup extends AbstractMetricGroup {
+
+ private final Class<? extends CoordinatorEvent> eventClass; // source of the scope component and the "event_type" value
+ private final Histogram eventProcessingTime;
+ private final Counter queuedEventCount;
+
+ public CoordinatorEventMetricGroup(
+ MetricRegistry registry,
+ Class<? extends CoordinatorEvent> eventClass,
+ CoordinatorMetricGroup parent) {
+ super(registry, makeScope(parent, eventClass.getSimpleName()), parent);
+ this.eventClass = eventClass;
+
+ this.eventProcessingTime =
+ histogram(
+ MetricNames.EVENT_PROCESSING_TIME_MS,
+ new DescriptiveStatisticsHistogram(100)); // NOTE(review): hard-coded window; CoordinatorEventManager passes WINDOW_SIZE to the same constructor — consider sharing that constant
+ this.queuedEventCount =
+ counter(MetricNames.EVENT_QUEUE_SIZE, new
ThreadSafeSimpleCounter()); // NOTE(review): registered as a Counter, while the metrics docs list eventQueueSize as a Gauge
+ }
+
+ @Override
+ protected String getGroupName(CharacterFilter filter) {
+ return "event";
+ }
+
+ @Override
+ protected void putVariables(Map<String, String> variables) {
+ variables.put("event_type", eventClass.getSimpleName()); // NOTE(review): simple name drops the package, so two event classes with the same simple name would share a label value
+ }
+
+ /**
+ * Returns the histogram registered as {@code MetricNames.EVENT_PROCESSING_TIME_MS} for this
+ * event type. The coordinator event manager updates it with the elapsed milliseconds
+ * (measured with {@code System.currentTimeMillis()}) of each processed event of this type.
+ *
+ * @return the event processing time histogram of this event type
+ */
+ public Histogram eventProcessingTime() {
+ return eventProcessingTime;
+ }
+
+ /**
+ * Returns the counter registered as {@code MetricNames.EVENT_QUEUE_SIZE} for this event type.
+ *
+ * <p>The coordinator event manager increments it right after putting an event of this type into
+ * its queue and decrements it in the {@code finally} block after processing that event, so the
+ * value counts events that were enqueued but whose processing has not finished yet.
+ * NOTE(review): this includes an event currently being processed, and because the increment
+ * happens after {@code queue.put}, the value may briefly lag if the event is processed first.
+ *
+ * @return the counter of enqueued-but-unfinished events of this type
+ */
+ public Counter queuedEventCount() {
+ return queuedEventCount;
+ }
+}
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/CoordinatorMetricGroup.java b/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/CoordinatorMetricGroup.java
index 24f9b3261..251fb462b 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/CoordinatorMetricGroup.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/CoordinatorMetricGroup.java
@@ -20,6 +20,8 @@ package org.apache.fluss.server.metrics.group;
import org.apache.fluss.metrics.CharacterFilter;
import org.apache.fluss.metrics.groups.AbstractMetricGroup;
import org.apache.fluss.metrics.registry.MetricRegistry;
+import org.apache.fluss.server.coordinator.event.CoordinatorEvent;
+import org.apache.fluss.utils.MapUtils;
import java.util.Map;
@@ -32,6 +34,9 @@ public class CoordinatorMetricGroup extends AbstractMetricGroup {
protected final String hostname;
protected final String serverId;
+ private final Map<Class<? extends CoordinatorEvent>,
CoordinatorEventMetricGroup>
+ eventMetricGroups = MapUtils.newConcurrentHashMap(); // per-event-type child groups, populated lazily by getOrAddEventTypeMetricGroup
+
public CoordinatorMetricGroup(
MetricRegistry registry, String clusterId, String hostname, String
serverId) {
super(registry, new String[] {clusterId, hostname, NAME}, null);
@@ -51,4 +56,10 @@ public class CoordinatorMetricGroup extends AbstractMetricGroup {
variables.put("host", hostname);
variables.put("server_id", serverId);
}
+
+ public CoordinatorEventMetricGroup getOrAddEventTypeMetricGroup( // returns the child group for eventClass, creating and caching it on first request
+ Class<? extends CoordinatorEvent> eventClass) {
+ return eventMetricGroups.computeIfAbsent( // NOTE(review): map comes from MapUtils.newConcurrentHashMap(); presumably a ConcurrentHashMap, making creation atomic per key — confirm
+ eventClass, e -> new CoordinatorEventMetricGroup(registry,
eventClass, this)); // the lambda ignores its key argument 'e', which is the same value as eventClass
+ }
}
diff --git a/website/docs/maintenance/observability/monitor-metrics.md b/website/docs/maintenance/observability/monitor-metrics.md
index ac351d510..a01f9e67c 100644
--- a/website/docs/maintenance/observability/monitor-metrics.md
+++ b/website/docs/maintenance/observability/monitor-metrics.md
@@ -295,7 +295,7 @@ Some metrics might not be exposed when using other JVM implementations (e.g. IBM
<tbody>
<tr>
<th rowspan="9"><strong>coordinator</strong></th>
- <td style={{textAlign: 'center', verticalAlign: 'middle' }} rowspan="9">-</td>
+ <td style={{textAlign: 'center', verticalAlign: 'middle' }} rowspan="7">-</td>
<td>activeCoordinatorCount</td>
<td>The number of active CoordinatorServer in this cluster.</td>
<td>Gauge</td>
@@ -325,19 +325,20 @@ Some metrics might not be exposed when using other JVM implementations (e.g. IBM
<td>The total number of replicas in the progress to be deleted in this cluster.</td>
<td>Gauge</td>
</tr>
- <tr>
- <td>eventQueueSize</td>
- <td>The number of events waiting to be processed in the queue.</td>
- <td>Gauge</td>
- </tr>
<tr>
<td>eventQueueTimeMs</td>
<td>The time that an event spent waiting in the queue to be processed.</td>
<td>Histogram</td>
</tr>
+ <tr>
+ <td style={{textAlign: 'center', verticalAlign: 'middle' }} rowspan="2">event</td>
+ <td>eventQueueSize</td>
+ <td>The number of events of this type that have been put into the coordinator event queue and whose processing has not finished yet (an event currently being processed is included). This metric is labeled with <code>event_type</code> to distinguish between different types of coordinator events.</td>
+ <td>Counter</td>
+ </tr>
<tr>
<td>eventProcessingTimeMs</td>
- <td>The time that an event took to be processed.</td>
+ <td>The time that an event took to be processed by the coordinator event processor. This metric is labeled with <code>event_type</code> to distinguish between different types of coordinator events.</td>
<td>Histogram</td>
</tr>
</tbody>