This is an automated email from the ASF dual-hosted git repository. jark pushed a commit to branch ci-2080 in repository https://gitbox.apache.org/repos/asf/fluss.git
commit 5535697d404820a0265a014c76e884352b846df5 Author: Jark Wu <[email protected]> AuthorDate: Fri Dec 19 19:19:05 2025 +0800 WIP --- .../org/apache/fluss/config/ConfigOptions.java | 8 - .../java/org/apache/fluss/metrics/MetricNames.java | 4 +- .../fluss/metrics/groups/AbstractMetricGroup.java | 8 - .../apache/fluss/server/entity/UserContext.java | 4 +- .../apache/fluss/server/metrics/MetricManager.java | 127 ----------- .../fluss/server/metrics/ServerMetricUtils.java | 6 +- .../apache/fluss/server/metrics/UserMetrics.java | 231 +++++++++++++++++++++ ...tricGroup.java => AbstractUserMetricGroup.java} | 50 +++-- .../server/metrics/group/TableMetricGroup.java | 80 +------ .../metrics/group/TabletServerMetricGroup.java | 52 +---- .../server/metrics/group/UserMetricGroup.java | 54 +---- .../metrics/group/UserPerTableMetricGroup.java | 56 +++++ .../fluss/server/replica/ReplicaManager.java | 33 +-- .../server/replica/delay/DelayedFetchLog.java | 6 +- .../apache/fluss/server/tablet/TabletServer.java | 20 +- .../apache/fluss/server/tablet/TabletService.java | 1 - .../fluss/server/metrics/MetricManagerTest.java | 86 -------- .../fluss/server/metrics/UserMetricsTest.java | 126 +++++++++++ .../server/metrics/group/TestingMetricGroups.java | 38 +++- .../fluss/server/replica/ReplicaManagerTest.java | 17 +- .../fluss/server/replica/ReplicaTestBase.java | 1 + .../replica/fetcher/ReplicaFetcherThreadTest.java | 2 + .../maintenance/observability/monitor-metrics.md | 2 + 23 files changed, 536 insertions(+), 476 deletions(-) diff --git a/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java b/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java index 778394155..81984015c 100644 --- a/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java +++ b/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java @@ -1695,14 +1695,6 @@ public class ConfigOptions { + "the CoordinatorServer) it is advisable to use a port range " + "like 
9250-9260."); - public static final ConfigOption<Duration> METRICS_MANAGER_INACTIVE_EXPIRATION_TIME = - key("metrics.manager.inactive-expiration-time") - .durationType() - .defaultValue(Duration.ofHours(1)) - .withDescription( - "The time to wait an inactive metric to be expired." - + "This is not effective for permanent metric but for temporary metric. Mostly user level metric."); - // ------------------------------------------------------------------------ // ConfigOptions for jmx reporter // ------------------------------------------------------------------------ diff --git a/fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java b/fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java index 75c6acc05..b1326d46b 100644 --- a/fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java +++ b/fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java @@ -77,8 +77,8 @@ public class MetricNames { // -------------------------------------------------------------------------------------------- // metrics for user // -------------------------------------------------------------------------------------------- - public static final String BYTES_IN_COUNT = "bytesInCount"; - public static final String BYTES_OUT_COUNT = "bytesOutCount"; + public static final String BYTES_IN = "bytesIn"; + public static final String BYTES_OUT = "bytesOut"; // -------------------------------------------------------------------------------------------- // metrics for table diff --git a/fluss-common/src/main/java/org/apache/fluss/metrics/groups/AbstractMetricGroup.java b/fluss-common/src/main/java/org/apache/fluss/metrics/groups/AbstractMetricGroup.java index 4c9980c69..81ffd5769 100644 --- a/fluss-common/src/main/java/org/apache/fluss/metrics/groups/AbstractMetricGroup.java +++ b/fluss-common/src/main/java/org/apache/fluss/metrics/groups/AbstractMetricGroup.java @@ -73,9 +73,6 @@ public abstract class AbstractMetricGroup implements MetricGroup { /** 
The registry that this metrics group belongs to. */ protected final MetricRegistry registry; - /** The last record time for the group. */ - protected volatile long lastRecordTime; - /** All metrics that are directly contained in this group. */ private final Map<String, Metric> metrics = new HashMap<>(); @@ -104,7 +101,6 @@ public abstract class AbstractMetricGroup implements MetricGroup { this.scopeComponents = checkNotNull(scope); this.parent = parent; this.logicalScopeStrings = new String[registry.getNumberReporters()]; - this.lastRecordTime = System.currentTimeMillis(); } @Override @@ -199,10 +195,6 @@ public abstract class AbstractMetricGroup implements MetricGroup { */ protected abstract String getGroupName(CharacterFilter filter); - public long getLastRecordTime() { - return lastRecordTime; - } - // ------------------------------------------------------------------------ // Closing // ------------------------------------------------------------------------ diff --git a/fluss-server/src/main/java/org/apache/fluss/server/entity/UserContext.java b/fluss-server/src/main/java/org/apache/fluss/server/entity/UserContext.java index 617cb1ad8..1a70bcda4 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/entity/UserContext.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/entity/UserContext.java @@ -20,12 +20,14 @@ package org.apache.fluss.server.entity; import org.apache.fluss.security.acl.FlussPrincipal; +import static org.apache.fluss.utils.Preconditions.checkNotNull; + /** The context information of user who writes or reads table. 
*/ public class UserContext { private final FlussPrincipal principal; public UserContext(FlussPrincipal principal) { - this.principal = principal; + this.principal = checkNotNull(principal); } public FlussPrincipal getPrincipal() { diff --git a/fluss-server/src/main/java/org/apache/fluss/server/metrics/MetricManager.java b/fluss-server/src/main/java/org/apache/fluss/server/metrics/MetricManager.java deleted file mode 100644 index ca7785d9e..000000000 --- a/fluss-server/src/main/java/org/apache/fluss/server/metrics/MetricManager.java +++ /dev/null @@ -1,127 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.fluss.server.metrics; - -import org.apache.fluss.annotation.VisibleForTesting; -import org.apache.fluss.config.ConfigOptions; -import org.apache.fluss.config.Configuration; -import org.apache.fluss.metrics.groups.AbstractMetricGroup; -import org.apache.fluss.utils.MapUtils; -import org.apache.fluss.utils.concurrent.ExecutorThreadFactory; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.Map; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.function.Function; - -/** - * Designed to manage and clean up metrics which will be expired. To be more specific, we may have - * three kind of metrics: 1. Never expired metrics, such as "tableCount". It's value will change but - * the metric itself is persistent. 2. Expired but autonomous metrics, such as "bytesInPerSecond" - * for a table. Once the table was dropped, the metric will be deleted. So we don't need to worry - * about memory leak. 3. Expired and free metrics, mostly as a user level metric, such as - * "bytesInCount" or "bytesOutCount" for a user. Users may start and stop to write/read a table - * anytime so that we can't predict when to free the metric. In such scene, to avoid memory leak, we - * design MetricManager to clean up them periodically if a metric is inactive and reach an - * expiration time. 
- */ -public class MetricManager implements AutoCloseable { - private static final Logger LOG = LoggerFactory.getLogger(MetricManager.class); - - private final long inactiveMetricExpirationTimeMs; - private final ScheduledExecutorService metricsScheduler; - private final ConcurrentMap<String, AbstractMetricGroup> metrics = - MapUtils.newConcurrentHashMap(); - private final AtomicBoolean isClosed = new AtomicBoolean(false); - - public MetricManager(Configuration configuration) { - this.inactiveMetricExpirationTimeMs = - configuration - .get(ConfigOptions.METRICS_MANAGER_INACTIVE_EXPIRATION_TIME) - .toMillis(); - - this.metricsScheduler = - Executors.newScheduledThreadPool( - 1, new ExecutorThreadFactory("periodic-metric-cleanup-manager")); - this.metricsScheduler.scheduleAtFixedRate( - new ExpiredMetricCleanupTask(), 30, 30, TimeUnit.SECONDS); - } - - @SuppressWarnings("unchecked") - public <T extends AbstractMetricGroup> T getOrCreateMetric( - String key, Function<String, T> mappingFunction) { - checkNotClosed(); - - return (T) metrics.computeIfAbsent(key, mappingFunction); - } - - @VisibleForTesting - public AbstractMetricGroup getMetric(String key) { - return metrics.get(key); - } - - public void removeMetric(String name) { - checkNotClosed(); - - AbstractMetricGroup metricGroup = metrics.remove(name); - if (metricGroup != null) { - metricGroup.close(); - } - } - - public boolean hasExpired(String metricName) { - return (System.currentTimeMillis() - metrics.get(metricName).getLastRecordTime()) - > this.inactiveMetricExpirationTimeMs; - } - - private void checkNotClosed() { - if (isClosed.get()) { - throw new IllegalStateException("MetricManager is already closed."); - } - } - - @Override - public void close() throws Exception { - if (isClosed.compareAndSet(false, true)) { - metricsScheduler.shutdownNow(); - } - } - - class ExpiredMetricCleanupTask implements Runnable { - @Override - public void run() { - for (Map.Entry<String, AbstractMetricGroup> metricEntry : 
metrics.entrySet()) { - String metricName = metricEntry.getKey(); - AbstractMetricGroup metricGroup = metricEntry.getValue(); - synchronized (metricGroup) { - if (hasExpired(metricName)) { - LOG.info("Removing expired metric {}", metricName); - removeMetric(metricName); - LOG.info("Remove expired metric {} success", metricName); - } - } - } - } - } -} diff --git a/fluss-server/src/main/java/org/apache/fluss/server/metrics/ServerMetricUtils.java b/fluss-server/src/main/java/org/apache/fluss/server/metrics/ServerMetricUtils.java index 9f0f5469d..770a90001 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/metrics/ServerMetricUtils.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/metrics/ServerMetricUtils.java @@ -84,11 +84,9 @@ public class ServerMetricUtils { String clusterId, @Nullable String rack, String hostname, - int serverId, - MetricManager metricManager) { + int serverId) { TabletServerMetricGroup tabletServerMetricGroup = - new TabletServerMetricGroup( - registry, clusterId, rack, hostname, serverId, metricManager); + new TabletServerMetricGroup(registry, clusterId, rack, hostname, serverId); createAndInitializeStatusMetricGroup(tabletServerMetricGroup); return tabletServerMetricGroup; } diff --git a/fluss-server/src/main/java/org/apache/fluss/server/metrics/UserMetrics.java b/fluss-server/src/main/java/org/apache/fluss/server/metrics/UserMetrics.java new file mode 100644 index 000000000..af925b58e --- /dev/null +++ b/fluss-server/src/main/java/org/apache/fluss/server/metrics/UserMetrics.java @@ -0,0 +1,231 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.server.metrics; + +import org.apache.fluss.annotation.VisibleForTesting; +import org.apache.fluss.metadata.TablePath; +import org.apache.fluss.metrics.groups.AbstractMetricGroup; +import org.apache.fluss.metrics.registry.MetricRegistry; +import org.apache.fluss.security.acl.FlussPrincipal; +import org.apache.fluss.server.entity.UserContext; +import org.apache.fluss.server.metrics.group.AbstractUserMetricGroup; +import org.apache.fluss.server.metrics.group.TabletServerMetricGroup; +import org.apache.fluss.server.metrics.group.UserMetricGroup; +import org.apache.fluss.server.metrics.group.UserPerTableMetricGroup; +import org.apache.fluss.utils.MapUtils; +import org.apache.fluss.utils.concurrent.Scheduler; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * Designed to manage and cleanup user-level metrics (will be used for future quota management). + * + * <p>To be more specific, Fluss server maintains three kinds of metrics: + * + * <ul> + * <li>1. Server-level metrics which are never expired or dropped, such as "tableCount". + * <li>2. Table-level metrics which are dropped when the table is deleted, such as + * "bytesInPerSecond" for a table. + * <li>3. User-level metrics which will be expired and dropped after a period of inactivity, such + * as "bytesInRate" for a user. 
+ * </ul> + * + * This class mainly manages the user-level metrics. There may be a lot of users that read/write + * tables, but most of them are idle after a period of time. To avoid memory leak or GC overhead, we + * need to clean up those inactive user-level metrics periodically. + */ +public class UserMetrics implements AutoCloseable { + private static final Logger LOG = LoggerFactory.getLogger(UserMetrics.class); + private static final long INACTIVE_METRIC_EXPIRATION_TIME_MS = 3600_000L; // 1 hour + private static final long METRICS_CLEANUP_INTERVAL_MS = 30_000L; // 30s + + private final long inactiveMetricExpirationTimeMs; + private final long cleanupIntervalMs; + + private final MetricRegistry metricRegistry; + private final TabletServerMetricGroup parentMetricGroup; + private final ScheduledFuture<?> schedule; + + private final ConcurrentMap<MetricKey, AbstractUserMetricGroup> metrics = + MapUtils.newConcurrentHashMap(); + private final AtomicBoolean isClosed = new AtomicBoolean(false); + + public UserMetrics( + Scheduler cleanupScheduler, + MetricRegistry metricRegistry, + TabletServerMetricGroup parentMetricGroup) { + this( + INACTIVE_METRIC_EXPIRATION_TIME_MS, + METRICS_CLEANUP_INTERVAL_MS, + cleanupScheduler, + metricRegistry, + parentMetricGroup); + } + + @VisibleForTesting + UserMetrics( + long inactiveMetricExpirationTimeMs, + long cleanupIntervalMs, + Scheduler cleanupScheduler, + MetricRegistry metricRegistry, + TabletServerMetricGroup parentMetricGroup) { + this.inactiveMetricExpirationTimeMs = inactiveMetricExpirationTimeMs; + this.cleanupIntervalMs = cleanupIntervalMs; + this.metricRegistry = metricRegistry; + this.parentMetricGroup = parentMetricGroup; + this.schedule = + cleanupScheduler.schedule( + "user-metrics-expired-cleanup-task", + new ExpiredMetricCleanupTask(), + cleanupIntervalMs, + cleanupIntervalMs); + } + + protected AbstractUserMetricGroup getOrCreateMetric(MetricKey metricKey) { + return metrics.computeIfAbsent( + metricKey, + key 
-> { + if (metricKey.tablePath != null) { + return new UserPerTableMetricGroup( + metricRegistry, + key.userName, + key.tablePath, + inactiveMetricExpirationTimeMs, + parentMetricGroup); + } else { + return new UserMetricGroup( + metricRegistry, + key.userName, + inactiveMetricExpirationTimeMs, + parentMetricGroup); + } + }); + } + + /** Increments the number of bytes written by a user on a specific table. */ + public void incBytesIn(@Nullable UserContext userContext, TablePath tablePath, long numBytes) { + incBytes(userContext, tablePath, numBytes, true); + } + + /** Increments the number of bytes read by a user on a specific table. */ + public void incBytesOut(@Nullable UserContext userContext, TablePath tablePath, long numBytes) { + incBytes(userContext, tablePath, numBytes, false); + } + + private void incBytes( + @Nullable UserContext userContext, + TablePath tablePath, + long numBytes, + boolean bytesIn) { + if (userContext == null + || userContext.getPrincipal() == FlussPrincipal.ANY + || userContext.getPrincipal() == FlussPrincipal.WILD_CARD_PRINCIPAL + || userContext.getPrincipal() == FlussPrincipal.ANONYMOUS) { + // Ignore null or anonymous or wildcard users + return; + } + String userName = userContext.getPrincipal().getName(); + AbstractUserMetricGroup user = getOrCreateMetric(new MetricKey(userName, null)); + AbstractUserMetricGroup perTable = getOrCreateMetric(new MetricKey(userName, tablePath)); + if (bytesIn) { + user.incBytesIn(numBytes); + perTable.incBytesIn(numBytes); + } else { + user.incBytesOut(numBytes); + perTable.incBytesOut(numBytes); + } + } + + @VisibleForTesting + int numMetrics() { + return metrics.size(); + } + + @Override + public void close() throws Exception { + if (isClosed.compareAndSet(false, true)) { + schedule.cancel(true); + for (AbstractMetricGroup metricGroup : metrics.values()) { + metricGroup.close(); + } + metrics.clear(); + } + } + + /** A periodic task to clean up expired user metrics. 
*/ + private class ExpiredMetricCleanupTask implements Runnable { + + @Override + public void run() { + for (Map.Entry<MetricKey, AbstractUserMetricGroup> metricEntry : metrics.entrySet()) { + MetricKey metricName = metricEntry.getKey(); + AbstractUserMetricGroup userMetric = metricEntry.getValue(); + synchronized (userMetric) { + if (userMetric.hasExpired()) { + LOG.debug("Removing expired user metric [{}]", metricName); + metrics.remove(metricName); + userMetric.close(); + } + } + } + } + } + + protected static class MetricKey { + final String userName; + @Nullable final TablePath tablePath; + + MetricKey(String userName, @Nullable TablePath tablePath) { + this.userName = userName; + this.tablePath = tablePath; + } + + @Override + public boolean equals(Object o) { + if (o == null || getClass() != o.getClass()) { + return false; + } + MetricKey metricKey = (MetricKey) o; + return Objects.equals(userName, metricKey.userName) + && Objects.equals(tablePath, metricKey.tablePath); + } + + @Override + public int hashCode() { + return Objects.hash(userName, tablePath); + } + + @Override + public String toString() { + if (tablePath == null) { + return userName; + } else { + return userName + ":" + tablePath; + } + } + } +} diff --git a/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/UserMetricGroup.java b/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/AbstractUserMetricGroup.java similarity index 65% copy from fluss-server/src/main/java/org/apache/fluss/server/metrics/group/UserMetricGroup.java copy to fluss-server/src/main/java/org/apache/fluss/server/metrics/group/AbstractUserMetricGroup.java index 7995cbc50..b7745076a 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/UserMetricGroup.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/AbstractUserMetricGroup.java @@ -20,8 +20,6 @@ package org.apache.fluss.server.metrics.group; import org.apache.fluss.annotation.VisibleForTesting; 
import org.apache.fluss.metrics.CharacterFilter; import org.apache.fluss.metrics.Counter; -import org.apache.fluss.metrics.MeterView; -import org.apache.fluss.metrics.MetricNames; import org.apache.fluss.metrics.ThreadSafeSimpleCounter; import org.apache.fluss.metrics.groups.AbstractMetricGroup; import org.apache.fluss.metrics.registry.MetricRegistry; @@ -29,33 +27,33 @@ import org.apache.fluss.metrics.registry.MetricRegistry; import java.util.Map; import static org.apache.fluss.metrics.utils.MetricGroupUtils.makeScope; +import static org.apache.fluss.utils.Preconditions.checkNotNull; -/** Metrics for the users in server with {@link TabletServerMetricGroup} as parent group. */ -public class UserMetricGroup extends AbstractMetricGroup { +/** Abstract metric group for the users in server that tracks the expiration. */ +public abstract class AbstractUserMetricGroup extends AbstractMetricGroup { private static final String NAME = "user"; private final String principalName; + private final long inactiveMetricExpirationTimeMs; protected final Counter bytesIn; protected final Counter bytesOut; - public UserMetricGroup( + /** The last record time for the group. 
*/ + protected volatile long lastRecordTime; + + public AbstractUserMetricGroup( MetricRegistry registry, String principalName, + long inactiveMetricExpirationTimeMs, TabletServerMetricGroup tabletServerMetricGroup) { super(registry, makeScope(tabletServerMetricGroup, principalName), tabletServerMetricGroup); - this.principalName = principalName; - - bytesIn = new ThreadSafeSimpleCounter(); - bytesOut = new ThreadSafeSimpleCounter(); - counter(MetricNames.BYTES_IN_COUNT, bytesIn); - meter(MetricNames.BYTES_IN_RATE, new MeterView(bytesIn)); - counter(MetricNames.BYTES_OUT_COUNT, bytesOut); - meter(MetricNames.BYTES_OUT_RATE, new MeterView(bytesOut)); - } + this.principalName = checkNotNull(principalName); + this.inactiveMetricExpirationTimeMs = inactiveMetricExpirationTimeMs; - @VisibleForTesting - public String getPrincipalName() { - return principalName; + this.bytesIn = new ThreadSafeSimpleCounter(); + this.bytesOut = new ThreadSafeSimpleCounter(); + + this.lastRecordTime = System.currentTimeMillis(); } @Override @@ -63,18 +61,28 @@ public class UserMetricGroup extends AbstractMetricGroup { return NAME; } + @VisibleForTesting + public String getPrincipalName() { + return principalName; + } + @Override protected void putVariables(Map<String, String> variables) { variables.put("user", principalName); } - public void incBytesIn(long n) { + public void incBytesIn(long numBytes) { this.lastRecordTime = System.currentTimeMillis(); - bytesIn.inc(n); + bytesIn.inc(numBytes); } - public void incBytesOut(long n) { + public void incBytesOut(long numBytes) { this.lastRecordTime = System.currentTimeMillis(); - bytesOut.inc(n); + bytesOut.inc(numBytes); + } + + /** Return true if the metric is eligible for removal due to inactivity. false otherwise. 
*/ + public boolean hasExpired() { + return (System.currentTimeMillis() - lastRecordTime) > this.inactiveMetricExpirationTimeMs; } } diff --git a/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/TableMetricGroup.java b/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/TableMetricGroup.java index c4338ef31..7620bcbfd 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/TableMetricGroup.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/TableMetricGroup.java @@ -27,16 +27,11 @@ import org.apache.fluss.metrics.NoOpCounter; import org.apache.fluss.metrics.ThreadSafeSimpleCounter; import org.apache.fluss.metrics.groups.AbstractMetricGroup; import org.apache.fluss.metrics.registry.MetricRegistry; -import org.apache.fluss.metrics.utils.MetricGroupUtils; -import org.apache.fluss.security.acl.FlussPrincipal; -import org.apache.fluss.server.entity.UserContext; -import org.apache.fluss.server.metrics.MetricManager; import javax.annotation.Nullable; import java.util.HashMap; import java.util.Map; -import java.util.Optional; import static org.apache.fluss.metrics.utils.MetricGroupUtils.makeScope; @@ -48,8 +43,6 @@ public class TableMetricGroup extends AbstractMetricGroup { private final Map<TableBucket, BucketMetricGroup> buckets = new HashMap<>(); - private final MetricManager metricManager; - private final TablePath tablePath; // server-level metrics @@ -65,15 +58,14 @@ public class TableMetricGroup extends AbstractMetricGroup { MetricRegistry registry, TablePath tablePath, boolean isKvTable, - TabletServerMetricGroup serverMetricGroup, - MetricManager metricManager) { + TabletServerMetricGroup serverMetricGroup) { super( registry, makeScope(serverMetricGroup, tablePath.getDatabaseName(), tablePath.getTableName()), serverMetricGroup); this.serverMetrics = serverMetricGroup; this.tablePath = tablePath; - this.metricManager = metricManager; + // if is kv table, create kv metrics if (isKvTable) { 
kvMetrics = new KvMetricGroup(this); @@ -102,28 +94,14 @@ public class TableMetricGroup extends AbstractMetricGroup { serverMetrics.messageIn().inc(n); } - public void incLogBytesIn(long n, @Nullable UserContext userContext) { + public void incLogBytesIn(long n) { logMetrics.bytesIn.inc(n); - serverMetrics.incBytesIn(n, userContext); - - // table user level metric, only consider log table, exclude CDC log. - Optional.ofNullable(userContext) - .map(UserContext::getPrincipal) - .map(FlussPrincipal::getName) - .filter(name -> !name.isEmpty()) - .ifPresent(name -> getOrCreateUserMetricGroup(name).incBytesIn(n)); + serverMetrics.bytesIn().inc(n); } - public void incLogBytesOut(long n, @Nullable UserContext userContext) { + public void incLogBytesOut(long n) { logMetrics.bytesOut.inc(n); - serverMetrics.incBytesOut(n, userContext); - - // user level metric - Optional.ofNullable(userContext) - .map(UserContext::getPrincipal) - .map(FlussPrincipal::getName) - .filter(name -> !name.isEmpty()) - .ifPresent(name -> getOrCreateUserMetricGroup(name).incBytesOut(n)); + serverMetrics.bytesOut().inc(n); } public Counter totalFetchLogRequests() { @@ -244,52 +222,6 @@ public class TableMetricGroup extends AbstractMetricGroup { } } - // ------------------------------------------------------------------------ - // user groups - // ------------------------------------------------------------------------ - private TableUserMetricGroup getOrCreateUserMetricGroup(String principalName) { - String uniqueName = MetricGroupUtils.getScopeName(this, principalName); - return metricManager.getOrCreateMetric( - uniqueName, name -> new TableUserMetricGroup(this, principalName)); - } - - private static class TableUserMetricGroup extends AbstractMetricGroup { - private final String principalName; - protected final Counter bytesIn; - protected final Counter bytesOut; - - private TableUserMetricGroup(TableMetricGroup tableMetricGroup, String principalName) { - super( - tableMetricGroup.registry, - 
makeScope(tableMetricGroup, principalName), - tableMetricGroup); - this.principalName = principalName; - - bytesIn = counter(MetricNames.BYTES_IN_COUNT, new ThreadSafeSimpleCounter()); - bytesOut = counter(MetricNames.BYTES_OUT_COUNT, new ThreadSafeSimpleCounter()); - } - - @Override - protected String getGroupName(CharacterFilter filter) { - return "user"; - } - - @Override - protected void putVariables(Map<String, String> variables) { - variables.put("user", principalName); - } - - public void incBytesIn(long n) { - this.lastRecordTime = System.currentTimeMillis(); - bytesIn.inc(n); - } - - public void incBytesOut(long n) { - this.lastRecordTime = System.currentTimeMillis(); - bytesOut.inc(n); - } - } - // ------------------------------------------------------------------------ // bucket groups // ------------------------------------------------------------------------ diff --git a/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/TabletServerMetricGroup.java b/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/TabletServerMetricGroup.java index 8aa71b524..59730b69f 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/TabletServerMetricGroup.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/TabletServerMetricGroup.java @@ -30,16 +30,9 @@ import org.apache.fluss.metrics.SimpleCounter; import org.apache.fluss.metrics.ThreadSafeSimpleCounter; import org.apache.fluss.metrics.groups.AbstractMetricGroup; import org.apache.fluss.metrics.registry.MetricRegistry; -import org.apache.fluss.metrics.utils.MetricGroupUtils; -import org.apache.fluss.security.acl.FlussPrincipal; -import org.apache.fluss.server.entity.UserContext; -import org.apache.fluss.server.metrics.MetricManager; import org.apache.fluss.utils.MapUtils; -import javax.annotation.Nullable; - import java.util.Map; -import java.util.Optional; /** The metric group for tablet server. 
*/ public class TabletServerMetricGroup extends AbstractMetricGroup { @@ -50,8 +43,6 @@ public class TabletServerMetricGroup extends AbstractMetricGroup { private final Map<TablePath, TableMetricGroup> metricGroupByTable = MapUtils.newConcurrentHashMap(); - private final MetricManager metricManager; - protected final String clusterId; protected final String rack; protected final String hostname; @@ -85,18 +76,12 @@ public class TabletServerMetricGroup extends AbstractMetricGroup { private final Counter failedIsrUpdates; public TabletServerMetricGroup( - MetricRegistry registry, - String clusterId, - String rack, - String hostname, - int serverId, - MetricManager metricManager) { + MetricRegistry registry, String clusterId, String rack, String hostname, int serverId) { super(registry, new String[] {clusterId, hostname, NAME}, null); this.clusterId = clusterId; this.rack = rack; this.hostname = hostname; this.serverId = serverId; - this.metricManager = metricManager; replicationBytesIn = new ThreadSafeSimpleCounter(); meter(MetricNames.REPLICATION_IN_RATE, new MeterView(replicationBytesIn)); @@ -200,28 +185,6 @@ public class TabletServerMetricGroup extends AbstractMetricGroup { return bytesOut; } - public void incBytesIn(long n, @Nullable UserContext userContext) { - bytesIn.inc(n); - - // user level metric - Optional.ofNullable(userContext) - .map(UserContext::getPrincipal) - .map(FlussPrincipal::getName) - .filter(name -> !name.isEmpty()) - .ifPresent(name -> getOrCreateUserMetricGroup(name).incBytesIn(n)); - } - - public void incBytesOut(long n, @Nullable UserContext userContext) { - bytesOut.inc(n); - - // user level metric - Optional.ofNullable(userContext) - .map(UserContext::getPrincipal) - .map(FlussPrincipal::getName) - .filter(name -> !name.isEmpty()) - .ifPresent(name -> getOrCreateUserMetricGroup(name).incBytesOut(n)); - } - public Counter logFlushCount() { return logFlushCount; } @@ -267,9 +230,7 @@ public class TabletServerMetricGroup extends 
AbstractMetricGroup { TableMetricGroup tableMetricGroup = metricGroupByTable.computeIfAbsent( tablePath, - table -> - new TableMetricGroup( - registry, tablePath, isKvTable, this, metricManager)); + table -> new TableMetricGroup(registry, tablePath, isKvTable, this)); return tableMetricGroup.addBucketMetricGroup(physicalTablePath.getPartitionName(), bucket); } @@ -288,13 +249,4 @@ public class TabletServerMetricGroup extends AbstractMetricGroup { } } } - - // ------------------------------------------------------------------------ - // user metric groups - // ------------------------------------------------------------------------ - public UserMetricGroup getOrCreateUserMetricGroup(String principalName) { - String uniqueName = MetricGroupUtils.getScopeName(this, principalName); - return metricManager.getOrCreateMetric( - uniqueName, name -> new UserMetricGroup(registry, principalName, this)); - } } diff --git a/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/UserMetricGroup.java b/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/UserMetricGroup.java index 7995cbc50..a23f11a3f 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/UserMetricGroup.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/UserMetricGroup.java @@ -17,64 +17,24 @@ package org.apache.fluss.server.metrics.group; -import org.apache.fluss.annotation.VisibleForTesting; -import org.apache.fluss.metrics.CharacterFilter; -import org.apache.fluss.metrics.Counter; import org.apache.fluss.metrics.MeterView; import org.apache.fluss.metrics.MetricNames; -import org.apache.fluss.metrics.ThreadSafeSimpleCounter; -import org.apache.fluss.metrics.groups.AbstractMetricGroup; import org.apache.fluss.metrics.registry.MetricRegistry; -import java.util.Map; - -import static org.apache.fluss.metrics.utils.MetricGroupUtils.makeScope; - -/** Metrics for the users in server with {@link TabletServerMetricGroup} as parent group. 
*/ -public class UserMetricGroup extends AbstractMetricGroup { - private static final String NAME = "user"; - - private final String principalName; - protected final Counter bytesIn; - protected final Counter bytesOut; +/** + * Metrics for the overall user level in server with {@link TabletServerMetricGroup} as parent + * group. + */ +public class UserMetricGroup extends AbstractUserMetricGroup { public UserMetricGroup( MetricRegistry registry, String principalName, + long inactiveMetricExpirationTimeMs, TabletServerMetricGroup tabletServerMetricGroup) { - super(registry, makeScope(tabletServerMetricGroup, principalName), tabletServerMetricGroup); - this.principalName = principalName; + super(registry, principalName, inactiveMetricExpirationTimeMs, tabletServerMetricGroup); - bytesIn = new ThreadSafeSimpleCounter(); - bytesOut = new ThreadSafeSimpleCounter(); - counter(MetricNames.BYTES_IN_COUNT, bytesIn); meter(MetricNames.BYTES_IN_RATE, new MeterView(bytesIn)); - counter(MetricNames.BYTES_OUT_COUNT, bytesOut); meter(MetricNames.BYTES_OUT_RATE, new MeterView(bytesOut)); } - - @VisibleForTesting - public String getPrincipalName() { - return principalName; - } - - @Override - protected String getGroupName(CharacterFilter filter) { - return NAME; - } - - @Override - protected void putVariables(Map<String, String> variables) { - variables.put("user", principalName); - } - - public void incBytesIn(long n) { - this.lastRecordTime = System.currentTimeMillis(); - bytesIn.inc(n); - } - - public void incBytesOut(long n) { - this.lastRecordTime = System.currentTimeMillis(); - bytesOut.inc(n); - } } diff --git a/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/UserPerTableMetricGroup.java b/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/UserPerTableMetricGroup.java new file mode 100644 index 000000000..a43e63f9e --- /dev/null +++ b/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/UserPerTableMetricGroup.java @@ -0,0 +1,56 @@ 
+/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.server.metrics.group; + +import org.apache.fluss.metadata.TablePath; +import org.apache.fluss.metrics.MetricNames; +import org.apache.fluss.metrics.registry.MetricRegistry; + +import java.util.Map; + +import static org.apache.fluss.utils.Preconditions.checkNotNull; + +/** + * Metrics for the users per table in server with {@link TabletServerMetricGroup} as parent group. 
+ */ +public class UserPerTableMetricGroup extends AbstractUserMetricGroup { + + private final TablePath tablePath; + + public UserPerTableMetricGroup( + MetricRegistry registry, + String principalName, + TablePath tablePath, + long inactiveMetricExpirationTimeMs, + TabletServerMetricGroup tabletServerMetricGroup) { + super(registry, principalName, inactiveMetricExpirationTimeMs, tabletServerMetricGroup); + this.tablePath = checkNotNull(tablePath); + + // only track counters for per-table user metrics for billing purposes, + // the corresponding rates are tracked at the overall user level + counter(MetricNames.BYTES_IN, bytesIn); + counter(MetricNames.BYTES_OUT, bytesOut); + } + + @Override + protected void putVariables(Map<String, String> variables) { + super.putVariables(variables); + variables.put("database", tablePath.getDatabaseName()); + variables.put("table", tablePath.getTableName()); + } +} diff --git a/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java b/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java index 28d627d1f..53f8338af 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java @@ -85,6 +85,7 @@ import org.apache.fluss.server.log.checkpoint.OffsetCheckpointFile; import org.apache.fluss.server.log.remote.RemoteLogManager; import org.apache.fluss.server.metadata.ClusterMetadata; import org.apache.fluss.server.metadata.TabletServerMetadataCache; +import org.apache.fluss.server.metrics.UserMetrics; import org.apache.fluss.server.metrics.group.BucketMetricGroup; import org.apache.fluss.server.metrics.group.TableMetricGroup; import org.apache.fluss.server.metrics.group.TabletServerMetricGroup; @@ -185,6 +186,7 @@ public class ReplicaManager { // for metrics private final TabletServerMetricGroup serverMetricGroup; + private final UserMetrics userMetrics; private final String 
internalListenerName; private final Clock clock; @@ -202,6 +204,7 @@ public class ReplicaManager { CompletedKvSnapshotCommitter completedKvSnapshotCommitter, FatalErrorHandler fatalErrorHandler, TabletServerMetricGroup serverMetricGroup, + UserMetrics userMetrics, Clock clock) throws IOException { this( @@ -217,6 +220,7 @@ public class ReplicaManager { completedKvSnapshotCommitter, fatalErrorHandler, serverMetricGroup, + userMetrics, new RemoteLogManager(conf, zkClient, coordinatorGateway, clock), clock); } @@ -235,6 +239,7 @@ public class ReplicaManager { CompletedKvSnapshotCommitter completedKvSnapshotCommitter, FatalErrorHandler fatalErrorHandler, TabletServerMetricGroup serverMetricGroup, + UserMetrics userMetrics, RemoteLogManager remoteLogManager, Clock clock) throws IOException { @@ -280,6 +285,7 @@ public class ReplicaManager { zkClient, completedKvSnapshotCommitter, kvSnapshotResource, conf); this.remoteLogManager = remoteLogManager; this.serverMetricGroup = serverMetricGroup; + this.userMetrics = userMetrics; this.clock = clock; registerMetrics(); } @@ -501,7 +507,6 @@ public class ReplicaManager { int requiredAcks, Map<TableBucket, KvRecordBatch> entriesPerBucket, @Nullable int[] targetColumns, - @Nullable UserContext userContext, Consumer<List<PutKvResultForBucket>> responseCallback) { if (isRequiredAcksInvalid(requiredAcks)) { throw new InvalidRequiredAcksException("Invalid required acks: " + requiredAcks); @@ -509,7 +514,7 @@ public class ReplicaManager { long startTime = System.currentTimeMillis(); Map<TableBucket, PutKvResultForBucket> kvPutResult = - putToLocalKv(entriesPerBucket, targetColumns, requiredAcks, userContext); + putToLocalKv(entriesPerBucket, targetColumns, requiredAcks); LOG.debug( "Put records to local kv storage and wait generate cdc log in {} ms", System.currentTimeMillis() - startTime); @@ -935,18 +940,21 @@ public class ReplicaManager { private Map<TableBucket, ProduceLogResultForBucket> appendToLocalLog( Map<TableBucket, 
MemoryLogRecords> entriesPerBucket, int requiredAcks, - UserContext userContext) { + @Nullable UserContext userContext) { Map<TableBucket, ProduceLogResultForBucket> resultForBucketMap = new HashMap<>(); for (Map.Entry<TableBucket, MemoryLogRecords> entry : entriesPerBucket.entrySet()) { TableBucket tb = entry.getKey(); + MemoryLogRecords records = entry.getValue(); TableMetricGroup tableMetrics = null; try { Replica replica = getReplicaOrException(tb); tableMetrics = replica.tableMetrics(); tableMetrics.totalProduceLogRequests().inc(); + // record user metrics before appending to log, + // so that if appending fails, we still account the bytes. + userMetrics.incBytesIn(userContext, replica.getTablePath(), records.sizeInBytes()); LOG.trace("Append records to local log tablet for table bucket {}", tb); - LogAppendInfo appendInfo = - replica.appendRecordsToLeader(entry.getValue(), requiredAcks); + LogAppendInfo appendInfo = replica.appendRecordsToLeader(records, requiredAcks); long baseOffset = appendInfo.firstOffset(); LOG.trace( @@ -958,7 +966,7 @@ public class ReplicaManager { resultForBucketMap.put( tb, new ProduceLogResultForBucket(tb, baseOffset, appendInfo.lastOffset() + 1)); - tableMetrics.incLogBytesIn(appendInfo.validBytes(), userContext); + tableMetrics.incLogBytesIn(appendInfo.validBytes()); tableMetrics.incLogMessageIn(appendInfo.numMessages()); } catch (Exception e) { if (isUnexpectedException(e)) { @@ -981,8 +989,7 @@ public class ReplicaManager { private Map<TableBucket, PutKvResultForBucket> putToLocalKv( Map<TableBucket, KvRecordBatch> entriesPerBucket, @Nullable int[] targetColumns, - int requiredAcks, - UserContext userContext) { + int requiredAcks) { Map<TableBucket, PutKvResultForBucket> putResultForBucketMap = new HashMap<>(); for (Map.Entry<TableBucket, KvRecordBatch> entry : entriesPerBucket.entrySet()) { TableBucket tb = entry.getKey(); @@ -1005,9 +1012,8 @@ public class ReplicaManager { // metric for kv 
tableMetrics.incKvMessageIn(entry.getValue().getRecordCount()); tableMetrics.incKvBytesIn(entry.getValue().sizeInBytes()); - // metric for cdc log of kv. - // We set "userContext" to null to avoid cdc log attributed to any user. - tableMetrics.incLogBytesIn(appendInfo.validBytes(), null); + // metric for cdc log of kv + tableMetrics.incLogBytesIn(appendInfo.validBytes()); tableMetrics.incLogMessageIn(appendInfo.numMessages()); } catch (Exception e) { if (isUnexpectedException(e)) { @@ -1063,7 +1069,7 @@ public class ReplicaManager { public Map<TableBucket, LogReadResult> readFromLog( FetchParams fetchParams, Map<TableBucket, FetchReqInfo> bucketFetchInfo, - UserContext userContext) { + @Nullable UserContext userContext) { Map<TableBucket, LogReadResult> logReadResult = new HashMap<>(); boolean isFromFollower = fetchParams.isFromFollower(); int limitBytes = fetchParams.maxFetchBytes(); @@ -1121,7 +1127,8 @@ public class ReplicaManager { if (isFromFollower) { serverMetricGroup.replicationBytesOut().inc(recordBatchSize); } else { - tableMetrics.incLogBytesOut(recordBatchSize, userContext); + tableMetrics.incLogBytesOut(recordBatchSize); + userMetrics.incBytesOut(userContext, replica.getTablePath(), recordBatchSize); } } catch (Exception e) { if (isUnexpectedException(e)) { diff --git a/fluss-server/src/main/java/org/apache/fluss/server/replica/delay/DelayedFetchLog.java b/fluss-server/src/main/java/org/apache/fluss/server/replica/delay/DelayedFetchLog.java index 32394b13f..e17b7e1cc 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/replica/delay/DelayedFetchLog.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/replica/delay/DelayedFetchLog.java @@ -41,6 +41,8 @@ import org.apache.fluss.server.replica.ReplicaManager.LogReadResult; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nullable; + import java.io.IOException; import java.util.HashMap; import java.util.Map; @@ -60,7 +62,7 @@ public class DelayedFetchLog 
extends DelayedOperation { private final Map<TableBucket, FetchBucketStatus> fetchBucketStatusMap; private final Consumer<Map<TableBucket, FetchLogResultForBucket>> responseCallback; private final TabletServerMetricGroup serverMetricGroup; - private final UserContext userContext; + @Nullable private final UserContext userContext; public DelayedFetchLog( FetchParams params, @@ -68,7 +70,7 @@ public class DelayedFetchLog extends DelayedOperation { Map<TableBucket, FetchBucketStatus> fetchBucketStatusMap, Consumer<Map<TableBucket, FetchLogResultForBucket>> responseCallback, TabletServerMetricGroup serverMetricGroup, - UserContext userContext) { + @Nullable UserContext userContext) { super(params.maxWaitMs()); this.params = params; this.replicaManager = replicaManager; diff --git a/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletServer.java b/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletServer.java index 6de0762da..a014633a9 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletServer.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletServer.java @@ -45,8 +45,8 @@ import org.apache.fluss.server.kv.snapshot.DefaultCompletedKvSnapshotCommitter; import org.apache.fluss.server.log.LogManager; import org.apache.fluss.server.log.remote.RemoteLogManager; import org.apache.fluss.server.metadata.TabletServerMetadataCache; -import org.apache.fluss.server.metrics.MetricManager; import org.apache.fluss.server.metrics.ServerMetricUtils; +import org.apache.fluss.server.metrics.UserMetrics; import org.apache.fluss.server.metrics.group.TabletServerMetricGroup; import org.apache.fluss.server.replica.ReplicaManager; import org.apache.fluss.server.zk.ZooKeeperClient; @@ -124,7 +124,7 @@ public class TabletServer extends ServerBase { private MetricRegistry metricRegistry; @GuardedBy("lock") - private MetricManager metricManager; + private UserMetrics userMetrics; @GuardedBy("lock") private 
TabletServerMetricGroup tabletServerMetricGroup; @@ -190,8 +190,10 @@ public class TabletServer extends ServerBase { List<Endpoint> endpoints = Endpoint.loadBindEndpoints(conf, ServerType.TABLET_SERVER); + this.scheduler = new FlussScheduler(conf.get(BACKGROUND_THREADS)); + scheduler.startup(); + // for metrics - this.metricManager = new MetricManager(conf); this.metricRegistry = MetricRegistry.create(conf, pluginManager); this.tabletServerMetricGroup = ServerMetricUtils.createTabletServerGroup( @@ -199,8 +201,8 @@ public class TabletServer extends ServerBase { ServerMetricUtils.validateAndGetClusterId(conf), rack, endpoints.get(0).getHost(), - serverId, - metricManager); + serverId); + this.userMetrics = new UserMetrics(scheduler, metricRegistry, tabletServerMetricGroup); this.zkClient = ZooKeeperUtils.startZookeeperClient(conf, this); @@ -214,9 +216,6 @@ public class TabletServer extends ServerBase { this.metadataCache = new TabletServerMetadataCache(metadataManager); - this.scheduler = new FlussScheduler(conf.get(BACKGROUND_THREADS)); - scheduler.startup(); - this.logManager = LogManager.create(conf, zkClient, scheduler, clock, tabletServerMetricGroup); logManager.startup(); @@ -255,6 +254,7 @@ public class TabletServer extends ServerBase { rpcClient, metadataCache, interListenerName), this, tabletServerMetricGroup, + userMetrics, clock); replicaManager.startup(); @@ -399,8 +399,8 @@ public class TabletServer extends ServerBase { clientMetricGroup.close(); } - if (metricManager != null) { - metricManager.close(); + if (userMetrics != null) { + userMetrics.close(); } // We must shut down the scheduler early because otherwise, the scheduler could diff --git a/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletService.java b/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletService.java index b9c88e36d..891ddb826 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletService.java +++ 
b/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletService.java @@ -227,7 +227,6 @@ public final class TabletService extends RpcServiceBase implements TabletServerG request.getAcks(), putKvData, getTargetColumns(request), - new UserContext(currentSession().getPrincipal()), bucketResponse -> response.complete(makePutKvResponse(bucketResponse))); return response; } diff --git a/fluss-server/src/test/java/org/apache/fluss/server/metrics/MetricManagerTest.java b/fluss-server/src/test/java/org/apache/fluss/server/metrics/MetricManagerTest.java deleted file mode 100644 index 81aebda2c..000000000 --- a/fluss-server/src/test/java/org/apache/fluss/server/metrics/MetricManagerTest.java +++ /dev/null @@ -1,86 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.fluss.server.metrics; - -import org.apache.fluss.config.ConfigOptions; -import org.apache.fluss.config.Configuration; -import org.apache.fluss.metrics.registry.NOPMetricRegistry; -import org.apache.fluss.server.metrics.group.TestingMetricGroups; -import org.apache.fluss.server.metrics.group.UserMetricGroup; - -import org.junit.jupiter.api.Test; - -import java.time.Duration; - -import static org.apache.fluss.metrics.utils.MetricGroupUtils.getScopeName; -import static org.apache.fluss.server.metrics.group.TestingMetricGroups.TABLET_SERVER_METRICS; -import static org.assertj.core.api.Assertions.assertThat; - -/** Test for {@link MetricManager}. */ -public class MetricManagerTest { - @Test - void testMetricManager() throws Exception { - Configuration conf = new Configuration(); - // set expiration time to 2s in test - conf.set(ConfigOptions.METRICS_MANAGER_INACTIVE_EXPIRATION_TIME, Duration.ofSeconds(2)); - MetricManager metricManager = new MetricManager(conf); - - UserMetricGroup userMetricGroup = TestingMetricGroups.USER_METRICS; - - MetricManager.ExpiredMetricCleanupTask metricCleanupTask = - metricManager.new ExpiredMetricCleanupTask(); - - // fresh lastRecordTime - userMetricGroup.incBytesIn(100); - String metricName = - getScopeName(userMetricGroup.getParent(), userMetricGroup.getPrincipalName()); - metricManager.getOrCreateMetric(metricName, name -> TestingMetricGroups.USER_METRICS); - - // metric successful created - assertThat(metricManager.getMetric(metricName)).isEqualTo(TestingMetricGroups.USER_METRICS); - - // sleep 1s and fresh it, metric should not be removed - Thread.sleep(1000); - userMetricGroup.incBytesIn(100); - metricCleanupTask.run(); - assertThat(metricManager.getMetric(metricName)).isNotNull(); - assertThat(userMetricGroup.isClosed()).isFalse(); - - // sleep 3s and metric should already be removed - Thread.sleep(3000); - metricCleanupTask.run(); - - // metric successful removed - 
assertThat(metricManager.getMetric(metricName)).isNull(); - assertThat(userMetricGroup.isClosed()).isTrue(); - - // the metric is created after removing. - UserMetricGroup newCreated = - metricManager.getOrCreateMetric( - metricName, - name -> - new UserMetricGroup( - NOPMetricRegistry.INSTANCE, - "user_abc", - TABLET_SERVER_METRICS)); - newCreated.incBytesIn(100); - - assertThat(metricManager.getMetric(metricName)).isEqualTo(newCreated); - assertThat(newCreated.isClosed()).isFalse(); - } -} diff --git a/fluss-server/src/test/java/org/apache/fluss/server/metrics/UserMetricsTest.java b/fluss-server/src/test/java/org/apache/fluss/server/metrics/UserMetricsTest.java new file mode 100644 index 000000000..c302eb331 --- /dev/null +++ b/fluss-server/src/test/java/org/apache/fluss/server/metrics/UserMetricsTest.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.fluss.server.metrics; + +import org.apache.fluss.metadata.TablePath; +import org.apache.fluss.metrics.registry.NOPMetricRegistry; +import org.apache.fluss.security.acl.FlussPrincipal; +import org.apache.fluss.server.entity.UserContext; +import org.apache.fluss.server.metrics.group.AbstractUserMetricGroup; +import org.apache.fluss.utils.concurrent.FlussScheduler; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.time.Duration; + +import static org.apache.fluss.server.metrics.group.TestingMetricGroups.TABLET_SERVER_METRICS; +import static org.apache.fluss.testutils.common.CommonTestUtils.retry; +import static org.assertj.core.api.Assertions.assertThat; + +/** Test for {@link UserMetrics}. */ +public class UserMetricsTest { + + private FlussScheduler scheduler; + + @BeforeEach + void before() { + scheduler = new FlussScheduler(2, false); + scheduler.startup(); + } + + @AfterEach + void after() throws InterruptedException { + scheduler.shutdown(); + } + + @Test + void testMetricExpiration() { + // set expiration time to 2s in test, and 100ms check interval + UserMetrics userMetrics = + new UserMetrics( + Duration.ofSeconds(2).toMillis(), + 100L, + scheduler, + NOPMetricRegistry.INSTANCE, + TABLET_SERVER_METRICS); + + String user1 = "user1"; + String user2 = "user2"; + UserContext uc1 = new UserContext(new FlussPrincipal(user1, "USER")); + UserContext uc2 = new UserContext(new FlussPrincipal(user2, "USER")); + + TablePath t1 = TablePath.of("db1", "table1"); + TablePath t2 = TablePath.of("db1", "table2"); + + userMetrics.incBytesIn(null, t1, 100); + userMetrics.incBytesOut(null, t2, 200); + userMetrics.incBytesOut(new UserContext(FlussPrincipal.ANY), t1, 200); + userMetrics.incBytesIn(new UserContext(FlussPrincipal.ANONYMOUS), t1, 300); + userMetrics.incBytesOut(new UserContext(FlussPrincipal.WILD_CARD_PRINCIPAL), t2, 100); + + // ignore null/anonymous/wildcard 
user + assertThat(userMetrics.numMetrics()).isEqualTo(0); + + userMetrics.incBytesIn(uc1, t1, 100); + userMetrics.incBytesOut(uc1, t2, 100); + userMetrics.incBytesOut(uc2, t2, 200); + + // u1, u1+t1, u1+t2, u2, u2+t2 + assertThat(userMetrics.numMetrics()).isEqualTo(5); + + scheduler.schedule( + "updating-u1t1", + () -> { + userMetrics.incBytesIn(uc1, t1, 50); + }, + 0L, + 50L); + + AbstractUserMetricGroup u1t1 = + userMetrics.getOrCreateMetric(new UserMetrics.MetricKey(user1, t1)); + AbstractUserMetricGroup u1t2 = + userMetrics.getOrCreateMetric(new UserMetrics.MetricKey(user1, t2)); + AbstractUserMetricGroup u2t2 = + userMetrics.getOrCreateMetric(new UserMetrics.MetricKey(user2, t2)); + AbstractUserMetricGroup u1 = + userMetrics.getOrCreateMetric(new UserMetrics.MetricKey(user1, null)); + AbstractUserMetricGroup u2 = + userMetrics.getOrCreateMetric(new UserMetrics.MetricKey(user2, null)); + + // wait enough time for metrics expired + retry( + Duration.ofSeconds(10), + () -> { + // only u1 and u1+t1 are retained. 
+ assertThat(userMetrics.numMetrics()).isEqualTo(2); + }); + + assertThat(u1t2.isClosed()).isTrue(); + assertThat(u2t2.isClosed()).isTrue(); + assertThat(u2.isClosed()).isTrue(); + + assertThat(u1.isClosed()).isFalse(); + assertThat(u1t1.isClosed()).isFalse(); + assertThat(userMetrics.getOrCreateMetric(new UserMetrics.MetricKey(user1, null))) + .isSameAs(u1); + assertThat(userMetrics.getOrCreateMetric(new UserMetrics.MetricKey(user1, t1))) + .isSameAs(u1t1); + } +} diff --git a/fluss-server/src/test/java/org/apache/fluss/server/metrics/group/TestingMetricGroups.java b/fluss-server/src/test/java/org/apache/fluss/server/metrics/group/TestingMetricGroups.java index 0e5a5fd09..b2e93f2bb 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/metrics/group/TestingMetricGroups.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/metrics/group/TestingMetricGroups.java @@ -19,13 +19,17 @@ package org.apache.fluss.server.metrics.group; import org.apache.fluss.metadata.TablePath; import org.apache.fluss.metrics.registry.NOPMetricRegistry; +import org.apache.fluss.server.metrics.UserMetrics; +import org.apache.fluss.testutils.common.ScheduledTask; +import org.apache.fluss.utils.concurrent.Scheduler; + +import java.util.concurrent.ScheduledFuture; /** Utilities for various metric groups for testing. 
*/ public class TestingMetricGroups { public static final TabletServerMetricGroup TABLET_SERVER_METRICS = - new TabletServerMetricGroup( - NOPMetricRegistry.INSTANCE, "fluss", "host", "rack", 0, null); + new TabletServerMetricGroup(NOPMetricRegistry.INSTANCE, "fluss", "host", "rack", 0); public static final CoordinatorMetricGroup COORDINATOR_METRICS = new CoordinatorMetricGroup(NOPMetricRegistry.INSTANCE, "cluster1", "host", "0"); @@ -35,12 +39,34 @@ public class TestingMetricGroups { NOPMetricRegistry.INSTANCE, TablePath.of("mydb", "mytable"), false, - TABLET_SERVER_METRICS, - null); + TABLET_SERVER_METRICS); public static final BucketMetricGroup BUCKET_METRICS = new BucketMetricGroup(NOPMetricRegistry.INSTANCE, null, 0, TABLE_METRICS); - public static final UserMetricGroup USER_METRICS = - new UserMetricGroup(NOPMetricRegistry.INSTANCE, "user_abc", TABLET_SERVER_METRICS); + public static final UserMetrics USER_METRICS = + new UserMetrics( + new TestingScheduler(), NOPMetricRegistry.INSTANCE, TABLET_SERVER_METRICS); + + // ------------------------------------------------------------------------------------------ + + private static class TestingScheduler implements Scheduler { + @Override + public void startup() { + // no-op + } + + @Override + public void shutdown() throws InterruptedException { + // no-op + } + + @Override + public ScheduledFuture<?> schedule( + String name, Runnable task, long delayMs, long periodMs) { + // Directly run the task for testing purpose. 
+ task.run(); + return new ScheduledTask<>(() -> null, delayMs, periodMs); + } + } } diff --git a/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaManagerTest.java b/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaManagerTest.java index fe4eaebf8..7db23130f 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaManagerTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaManagerTest.java @@ -474,7 +474,6 @@ class ReplicaManagerTest extends ReplicaTestBase { 1, Collections.singletonMap(tb, genKvRecordBatch(DATA_1_WITH_KEY_AND_VALUE)), null, - null, future::complete); assertThat(future.get()).containsOnly(new PutKvResultForBucket(tb, 8)); @@ -487,7 +486,6 @@ class ReplicaManagerTest extends ReplicaTestBase { Collections.singletonMap( tb, genKvRecordBatch(DATA_1_WITH_KEY_AND_VALUE)), null, - null, (result) -> { // do nothing. })) @@ -502,7 +500,6 @@ class ReplicaManagerTest extends ReplicaTestBase { 1, Collections.singletonMap(unknownTb, genKvRecordBatch(DATA_1_WITH_KEY_AND_VALUE)), null, - null, future::complete); assertThat(future.get()) .containsOnly( @@ -537,7 +534,6 @@ class ReplicaManagerTest extends ReplicaTestBase { genKvRecordBatchWithWriterId( data1, DATA1_KEY_TYPE, DATA1_ROW_TYPE, 100L, 0)), null, - null, future::complete); assertThat(future.get()).containsOnly(new PutKvResultForBucket(tb, 5)); @@ -582,7 +578,6 @@ class ReplicaManagerTest extends ReplicaTestBase { genKvRecordBatchWithWriterId( data2, DATA1_KEY_TYPE, DATA1_ROW_TYPE, 100L, 3)), null, - null, future::complete); PutKvResultForBucket putKvResultForBucket = future.get().get(0); assertThat(putKvResultForBucket.getErrorCode()) @@ -623,7 +618,6 @@ class ReplicaManagerTest extends ReplicaTestBase { genKvRecordBatchWithWriterId( data3, DATA1_KEY_TYPE, DATA1_ROW_TYPE, 100L, 1)), null, - null, future::complete); assertThat(future.get()).containsOnly(new PutKvResultForBucket(tb, 8)); @@ -672,7 +666,6 @@ class 
ReplicaManagerTest extends ReplicaTestBase { 1, Collections.singletonMap(tb, genKvRecordBatch(deleteList)), null, - null, future::complete); assertThat(future.get()).containsOnly(new PutKvResultForBucket(tb, i + 1)); } @@ -684,7 +677,6 @@ class ReplicaManagerTest extends ReplicaTestBase { 1, Collections.singletonMap(tb, genKvRecordBatch(DATA_1_WITH_KEY_AND_VALUE)), null, - null, future::complete); assertThat(future.get()).containsOnly(new PutKvResultForBucket(tb, 18)); @@ -745,7 +737,6 @@ class ReplicaManagerTest extends ReplicaTestBase { 1, Collections.singletonMap(tb, genKvRecordBatch(DATA_1_WITH_KEY_AND_VALUE)), null, - null, future1::complete); assertThat(future1.get()).containsOnly(new PutKvResultForBucket(tb, 8)); @@ -816,7 +807,6 @@ class ReplicaManagerTest extends ReplicaTestBase { 1, Collections.singletonMap(tb, genKvRecordBatch(keyType, rowType, data1)), null, - null, future::complete); assertThat(future.get()).containsOnly(new PutKvResultForBucket(tb, 4)); // second prefix lookup in table, prefix key = (1, "a"). 
@@ -895,7 +885,6 @@ class ReplicaManagerTest extends ReplicaTestBase { 1, Collections.singletonMap(tb, genKvRecordBatch(DATA_1_WITH_KEY_AND_VALUE)), null, - null, future1::complete); assertThat(future1.get()).containsOnly(new PutKvResultForBucket(tb, 8)); @@ -1136,7 +1125,6 @@ class ReplicaManagerTest extends ReplicaTestBase { -1, Collections.singletonMap(tb, genKvRecordBatch(DATA_1_WITH_KEY_AND_VALUE)), null, - null, future::complete); assertThat(future.get()).containsOnly(new PutKvResultForBucket(tb, 8)); } @@ -1305,7 +1293,6 @@ class ReplicaManagerTest extends ReplicaTestBase { Collections.singletonList( Tuple2.of(key, value)))), null, - null, future::complete); } catch (Exception e) { throw new RuntimeException(e); @@ -1386,7 +1373,6 @@ class ReplicaManagerTest extends ReplicaTestBase { -1, entriesPerBucket, null, - null, writeResultForBuckets -> { // do nothing }); @@ -1425,7 +1411,6 @@ class ReplicaManagerTest extends ReplicaTestBase { -1, entriesPerBucket, null, - null, writeResultForBuckets -> { // do nothing }); @@ -1838,7 +1823,7 @@ class ReplicaManagerTest extends ReplicaTestBase { CompletableFuture<List<PutKvResultForBucket>> writeFuture = new CompletableFuture<>(); // put kv record batch for every bucket replicaManager.putRecordsToKv( - 300000, -1, kvRecordBatchPerBucket, null, null, writeFuture::complete); + 300000, -1, kvRecordBatchPerBucket, null, writeFuture::complete); // wait the write ack writeFuture.get(); } diff --git a/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTestBase.java b/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTestBase.java index ae7e14a87..a41e922bc 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTestBase.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTestBase.java @@ -301,6 +301,7 @@ public class ReplicaTestBase { snapshotReporter, NOPErrorHandler.INSTANCE, TestingMetricGroups.TABLET_SERVER_METRICS, + 
TestingMetricGroups.USER_METRICS, remoteLogManager, manualClock); } diff --git a/fluss-server/src/test/java/org/apache/fluss/server/replica/fetcher/ReplicaFetcherThreadTest.java b/fluss-server/src/test/java/org/apache/fluss/server/replica/fetcher/ReplicaFetcherThreadTest.java index ff242238b..e512f4ae0 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/replica/fetcher/ReplicaFetcherThreadTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/replica/fetcher/ReplicaFetcherThreadTest.java @@ -73,6 +73,7 @@ import static org.apache.fluss.record.TestData.DATA1_TABLE_DESCRIPTOR; import static org.apache.fluss.record.TestData.DATA1_TABLE_ID; import static org.apache.fluss.record.TestData.DATA1_TABLE_PATH; import static org.apache.fluss.server.coordinator.CoordinatorContext.INITIAL_COORDINATOR_EPOCH; +import static org.apache.fluss.server.metrics.group.TestingMetricGroups.USER_METRICS; import static org.apache.fluss.server.zk.data.LeaderAndIsr.INITIAL_BUCKET_EPOCH; import static org.apache.fluss.server.zk.data.LeaderAndIsr.INITIAL_LEADER_EPOCH; import static org.apache.fluss.testutils.DataTestUtils.genMemoryLogRecordsByObject; @@ -464,6 +465,7 @@ public class ReplicaFetcherThreadTest { new TestingCompletedKvSnapshotCommitter(), NOPErrorHandler.INSTANCE, serverMetricGroup, + USER_METRICS, clock); } diff --git a/website/docs/maintenance/observability/monitor-metrics.md b/website/docs/maintenance/observability/monitor-metrics.md index b31f3cd3d..d731d7708 100644 --- a/website/docs/maintenance/observability/monitor-metrics.md +++ b/website/docs/maintenance/observability/monitor-metrics.md @@ -530,6 +530,7 @@ Some metrics might not be exposed when using other JVM implementations (e.g. IBM </tr> <tr> <td rowspan="4">user</td> + <!-- not needed --> <td>bytesInCount</td> <td>The total number of bytes written to this server labeled with <code>user</code>. 
</td> <td>Counter</td> @@ -540,6 +541,7 @@ Some metrics might not be exposed when using other JVM implementations (e.g. IBM <td>Meter</td> </tr> <tr> + <!-- not needed --> <td>bytesOutCount</td> <td>The total number of bytes read from this server labeled with <code>user</code>.</td> <td>Counter</td>
