This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch ci-2080
in repository https://gitbox.apache.org/repos/asf/fluss.git

commit 5535697d404820a0265a014c76e884352b846df5
Author: Jark Wu <[email protected]>
AuthorDate: Fri Dec 19 19:19:05 2025 +0800

    WIP
---
 .../org/apache/fluss/config/ConfigOptions.java     |   8 -
 .../java/org/apache/fluss/metrics/MetricNames.java |   4 +-
 .../fluss/metrics/groups/AbstractMetricGroup.java  |   8 -
 .../apache/fluss/server/entity/UserContext.java    |   4 +-
 .../apache/fluss/server/metrics/MetricManager.java | 127 -----------
 .../fluss/server/metrics/ServerMetricUtils.java    |   6 +-
 .../apache/fluss/server/metrics/UserMetrics.java   | 231 +++++++++++++++++++++
 ...tricGroup.java => AbstractUserMetricGroup.java} |  50 +++--
 .../server/metrics/group/TableMetricGroup.java     |  80 +------
 .../metrics/group/TabletServerMetricGroup.java     |  52 +----
 .../server/metrics/group/UserMetricGroup.java      |  54 +----
 .../metrics/group/UserPerTableMetricGroup.java     |  56 +++++
 .../fluss/server/replica/ReplicaManager.java       |  33 +--
 .../server/replica/delay/DelayedFetchLog.java      |   6 +-
 .../apache/fluss/server/tablet/TabletServer.java   |  20 +-
 .../apache/fluss/server/tablet/TabletService.java  |   1 -
 .../fluss/server/metrics/MetricManagerTest.java    |  86 --------
 .../fluss/server/metrics/UserMetricsTest.java      | 126 +++++++++++
 .../server/metrics/group/TestingMetricGroups.java  |  38 +++-
 .../fluss/server/replica/ReplicaManagerTest.java   |  17 +-
 .../fluss/server/replica/ReplicaTestBase.java      |   1 +
 .../replica/fetcher/ReplicaFetcherThreadTest.java  |   2 +
 .../maintenance/observability/monitor-metrics.md   |   2 +
 23 files changed, 536 insertions(+), 476 deletions(-)

diff --git 
a/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java 
b/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java
index 778394155..81984015c 100644
--- a/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java
+++ b/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java
@@ -1695,14 +1695,6 @@ public class ConfigOptions {
                                     + "the CoordinatorServer) it is advisable 
to use a port range "
                                     + "like 9250-9260.");
 
-    public static final ConfigOption<Duration> 
METRICS_MANAGER_INACTIVE_EXPIRATION_TIME =
-            key("metrics.manager.inactive-expiration-time")
-                    .durationType()
-                    .defaultValue(Duration.ofHours(1))
-                    .withDescription(
-                            "The time to wait an inactive metric to be 
expired."
-                                    + "This is not effective for permanent 
metric but for temporary metric. Mostly user level metric.");
-
     // ------------------------------------------------------------------------
     //  ConfigOptions for jmx reporter
     // ------------------------------------------------------------------------
diff --git 
a/fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java 
b/fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java
index 75c6acc05..b1326d46b 100644
--- a/fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java
+++ b/fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java
@@ -77,8 +77,8 @@ public class MetricNames {
     // 
--------------------------------------------------------------------------------------------
     // metrics for user
     // 
--------------------------------------------------------------------------------------------
-    public static final String BYTES_IN_COUNT = "bytesInCount";
-    public static final String BYTES_OUT_COUNT = "bytesOutCount";
+    public static final String BYTES_IN = "bytesIn";
+    public static final String BYTES_OUT = "bytesOut";
 
     // 
--------------------------------------------------------------------------------------------
     // metrics for table
diff --git 
a/fluss-common/src/main/java/org/apache/fluss/metrics/groups/AbstractMetricGroup.java
 
b/fluss-common/src/main/java/org/apache/fluss/metrics/groups/AbstractMetricGroup.java
index 4c9980c69..81ffd5769 100644
--- 
a/fluss-common/src/main/java/org/apache/fluss/metrics/groups/AbstractMetricGroup.java
+++ 
b/fluss-common/src/main/java/org/apache/fluss/metrics/groups/AbstractMetricGroup.java
@@ -73,9 +73,6 @@ public abstract class AbstractMetricGroup implements 
MetricGroup {
     /** The registry that this metrics group belongs to. */
     protected final MetricRegistry registry;
 
-    /** The last record time for the group. */
-    protected volatile long lastRecordTime;
-
     /** All metrics that are directly contained in this group. */
     private final Map<String, Metric> metrics = new HashMap<>();
 
@@ -104,7 +101,6 @@ public abstract class AbstractMetricGroup implements 
MetricGroup {
         this.scopeComponents = checkNotNull(scope);
         this.parent = parent;
         this.logicalScopeStrings = new String[registry.getNumberReporters()];
-        this.lastRecordTime = System.currentTimeMillis();
     }
 
     @Override
@@ -199,10 +195,6 @@ public abstract class AbstractMetricGroup implements 
MetricGroup {
      */
     protected abstract String getGroupName(CharacterFilter filter);
 
-    public long getLastRecordTime() {
-        return lastRecordTime;
-    }
-
     // ------------------------------------------------------------------------
     //  Closing
     // ------------------------------------------------------------------------
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/entity/UserContext.java 
b/fluss-server/src/main/java/org/apache/fluss/server/entity/UserContext.java
index 617cb1ad8..1a70bcda4 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/entity/UserContext.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/entity/UserContext.java
@@ -20,12 +20,14 @@ package org.apache.fluss.server.entity;
 
 import org.apache.fluss.security.acl.FlussPrincipal;
 
+import static org.apache.fluss.utils.Preconditions.checkNotNull;
+
 /** The context information of user who writes or reads table. */
 public class UserContext {
     private final FlussPrincipal principal;
 
     public UserContext(FlussPrincipal principal) {
-        this.principal = principal;
+        this.principal = checkNotNull(principal);
     }
 
     public FlussPrincipal getPrincipal() {
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/metrics/MetricManager.java 
b/fluss-server/src/main/java/org/apache/fluss/server/metrics/MetricManager.java
deleted file mode 100644
index ca7785d9e..000000000
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/metrics/MetricManager.java
+++ /dev/null
@@ -1,127 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.fluss.server.metrics;
-
-import org.apache.fluss.annotation.VisibleForTesting;
-import org.apache.fluss.config.ConfigOptions;
-import org.apache.fluss.config.Configuration;
-import org.apache.fluss.metrics.groups.AbstractMetricGroup;
-import org.apache.fluss.utils.MapUtils;
-import org.apache.fluss.utils.concurrent.ExecutorThreadFactory;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Map;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.function.Function;
-
-/**
- * Designed to manage and clean up metrics which will be expired. To be more 
specific, we may have
- * three kind of metrics: 1. Never expired metrics, such as "tableCount". It's 
value will change but
- * the metric itself is persistent. 2. Expired but autonomous metrics, such as 
"bytesInPerSecond"
- * for a table. Once the table was dropped, the metric will be deleted. So we 
don't need to worry
- * about memory leak. 3. Expired and free metrics, mostly as a user level 
metric, such as
- * "bytesInCount" or "bytesOutCount" for a user. Users may start and stop to 
write/read a table
- * anytime so that we can't predict when to free the metric. In such scene, to 
avoid memory leak, we
- * design MetricManager to clean up them periodically if a metric is inactive 
and reach an
- * expiration time.
- */
-public class MetricManager implements AutoCloseable {
-    private static final Logger LOG = 
LoggerFactory.getLogger(MetricManager.class);
-
-    private final long inactiveMetricExpirationTimeMs;
-    private final ScheduledExecutorService metricsScheduler;
-    private final ConcurrentMap<String, AbstractMetricGroup> metrics =
-            MapUtils.newConcurrentHashMap();
-    private final AtomicBoolean isClosed = new AtomicBoolean(false);
-
-    public MetricManager(Configuration configuration) {
-        this.inactiveMetricExpirationTimeMs =
-                configuration
-                        
.get(ConfigOptions.METRICS_MANAGER_INACTIVE_EXPIRATION_TIME)
-                        .toMillis();
-
-        this.metricsScheduler =
-                Executors.newScheduledThreadPool(
-                        1, new 
ExecutorThreadFactory("periodic-metric-cleanup-manager"));
-        this.metricsScheduler.scheduleAtFixedRate(
-                new ExpiredMetricCleanupTask(), 30, 30, TimeUnit.SECONDS);
-    }
-
-    @SuppressWarnings("unchecked")
-    public <T extends AbstractMetricGroup> T getOrCreateMetric(
-            String key, Function<String, T> mappingFunction) {
-        checkNotClosed();
-
-        return (T) metrics.computeIfAbsent(key, mappingFunction);
-    }
-
-    @VisibleForTesting
-    public AbstractMetricGroup getMetric(String key) {
-        return metrics.get(key);
-    }
-
-    public void removeMetric(String name) {
-        checkNotClosed();
-
-        AbstractMetricGroup metricGroup = metrics.remove(name);
-        if (metricGroup != null) {
-            metricGroup.close();
-        }
-    }
-
-    public boolean hasExpired(String metricName) {
-        return (System.currentTimeMillis() - 
metrics.get(metricName).getLastRecordTime())
-                > this.inactiveMetricExpirationTimeMs;
-    }
-
-    private void checkNotClosed() {
-        if (isClosed.get()) {
-            throw new IllegalStateException("MetricManager is already 
closed.");
-        }
-    }
-
-    @Override
-    public void close() throws Exception {
-        if (isClosed.compareAndSet(false, true)) {
-            metricsScheduler.shutdownNow();
-        }
-    }
-
-    class ExpiredMetricCleanupTask implements Runnable {
-        @Override
-        public void run() {
-            for (Map.Entry<String, AbstractMetricGroup> metricEntry : 
metrics.entrySet()) {
-                String metricName = metricEntry.getKey();
-                AbstractMetricGroup metricGroup = metricEntry.getValue();
-                synchronized (metricGroup) {
-                    if (hasExpired(metricName)) {
-                        LOG.info("Removing expired metric {}", metricName);
-                        removeMetric(metricName);
-                        LOG.info("Remove expired metric {} success", 
metricName);
-                    }
-                }
-            }
-        }
-    }
-}
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/metrics/ServerMetricUtils.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/metrics/ServerMetricUtils.java
index 9f0f5469d..770a90001 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/metrics/ServerMetricUtils.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/metrics/ServerMetricUtils.java
@@ -84,11 +84,9 @@ public class ServerMetricUtils {
             String clusterId,
             @Nullable String rack,
             String hostname,
-            int serverId,
-            MetricManager metricManager) {
+            int serverId) {
         TabletServerMetricGroup tabletServerMetricGroup =
-                new TabletServerMetricGroup(
-                        registry, clusterId, rack, hostname, serverId, 
metricManager);
+                new TabletServerMetricGroup(registry, clusterId, rack, 
hostname, serverId);
         createAndInitializeStatusMetricGroup(tabletServerMetricGroup);
         return tabletServerMetricGroup;
     }
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/metrics/UserMetrics.java 
b/fluss-server/src/main/java/org/apache/fluss/server/metrics/UserMetrics.java
new file mode 100644
index 000000000..af925b58e
--- /dev/null
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/metrics/UserMetrics.java
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.metrics;
+
+import org.apache.fluss.annotation.VisibleForTesting;
+import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.metrics.groups.AbstractMetricGroup;
+import org.apache.fluss.metrics.registry.MetricRegistry;
+import org.apache.fluss.security.acl.FlussPrincipal;
+import org.apache.fluss.server.entity.UserContext;
+import org.apache.fluss.server.metrics.group.AbstractUserMetricGroup;
+import org.apache.fluss.server.metrics.group.TabletServerMetricGroup;
+import org.apache.fluss.server.metrics.group.UserMetricGroup;
+import org.apache.fluss.server.metrics.group.UserPerTableMetricGroup;
+import org.apache.fluss.utils.MapUtils;
+import org.apache.fluss.utils.concurrent.Scheduler;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Designed to manage and cleanup user-level metrics (will be used for future 
quota management).
+ *
+ * <p>To be more specific, Fluss server maintains three kind of metrics:
+ *
+ * <ul>
+ *   <li>1. Server-level metrics which is never expired or dropped, such as 
"tableCount".
+ *   <li>2. Table-level metrics which is dropped when the table is deleted, 
such as
+ *       "bytesInPerSecond" for a table.
+ *   <li>3. User-level metrics which will be expired and dropped after a 
period of inactivity, such
+ *       as "bytesInRate" for a user.
+ * </ul>
+ *
+ * This class mainly manages the user-level metrics. There are many a lot of 
users to read/write
+ * tables, but most of them are idle after a period of time. To avoid memory 
leak or GC overhead, we
+ * need to clean up those inactive user-level metrics periodically.
+ */
+public class UserMetrics implements AutoCloseable {
+    private static final Logger LOG = 
LoggerFactory.getLogger(UserMetrics.class);
+    private static final long INACTIVE_METRIC_EXPIRATION_TIME_MS = 3600_000L; 
// 1 hour
+    private static final long METRICS_CLEANUP_INTERVAL_MS = 30_000L; // 30s
+
+    private final long inactiveMetricExpirationTimeMs;
+    private final long cleanupIntervalMs;
+
+    private final MetricRegistry metricRegistry;
+    private final TabletServerMetricGroup parentMetricGroup;
+    private final ScheduledFuture<?> schedule;
+
+    private final ConcurrentMap<MetricKey, AbstractUserMetricGroup> metrics =
+            MapUtils.newConcurrentHashMap();
+    private final AtomicBoolean isClosed = new AtomicBoolean(false);
+
+    public UserMetrics(
+            Scheduler cleanupScheduler,
+            MetricRegistry metricRegistry,
+            TabletServerMetricGroup parentMetricGroup) {
+        this(
+                INACTIVE_METRIC_EXPIRATION_TIME_MS,
+                METRICS_CLEANUP_INTERVAL_MS,
+                cleanupScheduler,
+                metricRegistry,
+                parentMetricGroup);
+    }
+
+    @VisibleForTesting
+    UserMetrics(
+            long inactiveMetricExpirationTimeMs,
+            long cleanupIntervalMs,
+            Scheduler cleanupScheduler,
+            MetricRegistry metricRegistry,
+            TabletServerMetricGroup parentMetricGroup) {
+        this.inactiveMetricExpirationTimeMs = inactiveMetricExpirationTimeMs;
+        this.cleanupIntervalMs = cleanupIntervalMs;
+        this.metricRegistry = metricRegistry;
+        this.parentMetricGroup = parentMetricGroup;
+        this.schedule =
+                cleanupScheduler.schedule(
+                        "user-metrics-expired-cleanup-task",
+                        new ExpiredMetricCleanupTask(),
+                        cleanupIntervalMs,
+                        cleanupIntervalMs);
+    }
+
+    protected AbstractUserMetricGroup getOrCreateMetric(MetricKey metricKey) {
+        return metrics.computeIfAbsent(
+                metricKey,
+                key -> {
+                    if (metricKey.tablePath != null) {
+                        return new UserPerTableMetricGroup(
+                                metricRegistry,
+                                key.userName,
+                                key.tablePath,
+                                inactiveMetricExpirationTimeMs,
+                                parentMetricGroup);
+                    } else {
+                        return new UserMetricGroup(
+                                metricRegistry,
+                                key.userName,
+                                inactiveMetricExpirationTimeMs,
+                                parentMetricGroup);
+                    }
+                });
+    }
+
+    /** Increments the number of bytes written by a user on a specific table. 
*/
+    public void incBytesIn(@Nullable UserContext userContext, TablePath 
tablePath, long numBytes) {
+        incBytes(userContext, tablePath, numBytes, true);
+    }
+
+    /** Increments the number of bytes read by a user on a specific table. */
+    public void incBytesOut(@Nullable UserContext userContext, TablePath 
tablePath, long numBytes) {
+        incBytes(userContext, tablePath, numBytes, false);
+    }
+
+    private void incBytes(
+            @Nullable UserContext userContext,
+            TablePath tablePath,
+            long numBytes,
+            boolean bytesIn) {
+        if (userContext == null
+                || userContext.getPrincipal() == FlussPrincipal.ANY
+                || userContext.getPrincipal() == 
FlussPrincipal.WILD_CARD_PRINCIPAL
+                || userContext.getPrincipal() == FlussPrincipal.ANONYMOUS) {
+            // Ignore null or anonymous or wildcard users
+            return;
+        }
+        String userName = userContext.getPrincipal().getName();
+        AbstractUserMetricGroup user = getOrCreateMetric(new 
MetricKey(userName, null));
+        AbstractUserMetricGroup perTable = getOrCreateMetric(new 
MetricKey(userName, tablePath));
+        if (bytesIn) {
+            user.incBytesIn(numBytes);
+            perTable.incBytesIn(numBytes);
+        } else {
+            user.incBytesOut(numBytes);
+            perTable.incBytesOut(numBytes);
+        }
+    }
+
+    @VisibleForTesting
+    int numMetrics() {
+        return metrics.size();
+    }
+
+    @Override
+    public void close() throws Exception {
+        if (isClosed.compareAndSet(false, true)) {
+            schedule.cancel(true);
+            for (AbstractMetricGroup metricGroup : metrics.values()) {
+                metricGroup.close();
+            }
+            metrics.clear();
+        }
+    }
+
+    /** A periodic task to clean up expired user metrics. */
+    private class ExpiredMetricCleanupTask implements Runnable {
+
+        @Override
+        public void run() {
+            for (Map.Entry<MetricKey, AbstractUserMetricGroup> metricEntry : 
metrics.entrySet()) {
+                MetricKey metricName = metricEntry.getKey();
+                AbstractUserMetricGroup userMetric = metricEntry.getValue();
+                synchronized (userMetric) {
+                    if (userMetric.hasExpired()) {
+                        LOG.debug("Removing expired user metric [{}]", 
metricName);
+                        metrics.remove(metricName);
+                        userMetric.close();
+                    }
+                }
+            }
+        }
+    }
+
+    protected static class MetricKey {
+        final String userName;
+        @Nullable final TablePath tablePath;
+
+        MetricKey(String userName, @Nullable TablePath tablePath) {
+            this.userName = userName;
+            this.tablePath = tablePath;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            MetricKey metricKey = (MetricKey) o;
+            return Objects.equals(userName, metricKey.userName)
+                    && Objects.equals(tablePath, metricKey.tablePath);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(userName, tablePath);
+        }
+
+        @Override
+        public String toString() {
+            if (tablePath == null) {
+                return userName;
+            } else {
+                return userName + ":" + tablePath;
+            }
+        }
+    }
+}
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/UserMetricGroup.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/AbstractUserMetricGroup.java
similarity index 65%
copy from 
fluss-server/src/main/java/org/apache/fluss/server/metrics/group/UserMetricGroup.java
copy to 
fluss-server/src/main/java/org/apache/fluss/server/metrics/group/AbstractUserMetricGroup.java
index 7995cbc50..b7745076a 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/UserMetricGroup.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/AbstractUserMetricGroup.java
@@ -20,8 +20,6 @@ package org.apache.fluss.server.metrics.group;
 import org.apache.fluss.annotation.VisibleForTesting;
 import org.apache.fluss.metrics.CharacterFilter;
 import org.apache.fluss.metrics.Counter;
-import org.apache.fluss.metrics.MeterView;
-import org.apache.fluss.metrics.MetricNames;
 import org.apache.fluss.metrics.ThreadSafeSimpleCounter;
 import org.apache.fluss.metrics.groups.AbstractMetricGroup;
 import org.apache.fluss.metrics.registry.MetricRegistry;
@@ -29,33 +27,33 @@ import org.apache.fluss.metrics.registry.MetricRegistry;
 import java.util.Map;
 
 import static org.apache.fluss.metrics.utils.MetricGroupUtils.makeScope;
+import static org.apache.fluss.utils.Preconditions.checkNotNull;
 
-/** Metrics for the users in server with {@link TabletServerMetricGroup} as 
parent group. */
-public class UserMetricGroup extends AbstractMetricGroup {
+/** Abstract metric the users in server and tracks the expiration. */
+public abstract class AbstractUserMetricGroup extends AbstractMetricGroup {
     private static final String NAME = "user";
 
     private final String principalName;
+    private final long inactiveMetricExpirationTimeMs;
     protected final Counter bytesIn;
     protected final Counter bytesOut;
 
-    public UserMetricGroup(
+    /** The last record time for the group. */
+    protected volatile long lastRecordTime;
+
+    public AbstractUserMetricGroup(
             MetricRegistry registry,
             String principalName,
+            long inactiveMetricExpirationTimeMs,
             TabletServerMetricGroup tabletServerMetricGroup) {
         super(registry, makeScope(tabletServerMetricGroup, principalName), 
tabletServerMetricGroup);
-        this.principalName = principalName;
-
-        bytesIn = new ThreadSafeSimpleCounter();
-        bytesOut = new ThreadSafeSimpleCounter();
-        counter(MetricNames.BYTES_IN_COUNT, bytesIn);
-        meter(MetricNames.BYTES_IN_RATE, new MeterView(bytesIn));
-        counter(MetricNames.BYTES_OUT_COUNT, bytesOut);
-        meter(MetricNames.BYTES_OUT_RATE, new MeterView(bytesOut));
-    }
+        this.principalName = checkNotNull(principalName);
+        this.inactiveMetricExpirationTimeMs = inactiveMetricExpirationTimeMs;
 
-    @VisibleForTesting
-    public String getPrincipalName() {
-        return principalName;
+        this.bytesIn = new ThreadSafeSimpleCounter();
+        this.bytesOut = new ThreadSafeSimpleCounter();
+
+        this.lastRecordTime = System.currentTimeMillis();
     }
 
     @Override
@@ -63,18 +61,28 @@ public class UserMetricGroup extends AbstractMetricGroup {
         return NAME;
     }
 
+    @VisibleForTesting
+    public String getPrincipalName() {
+        return principalName;
+    }
+
     @Override
     protected void putVariables(Map<String, String> variables) {
         variables.put("user", principalName);
     }
 
-    public void incBytesIn(long n) {
+    public void incBytesIn(long numBytes) {
         this.lastRecordTime = System.currentTimeMillis();
-        bytesIn.inc(n);
+        bytesIn.inc(numBytes);
     }
 
-    public void incBytesOut(long n) {
+    public void incBytesOut(long numBytes) {
         this.lastRecordTime = System.currentTimeMillis();
-        bytesOut.inc(n);
+        bytesOut.inc(numBytes);
+    }
+
+    /** Return true if the metric is eligible for removal due to inactivity. 
false otherwise. */
+    public boolean hasExpired() {
+        return (System.currentTimeMillis() - lastRecordTime) > 
this.inactiveMetricExpirationTimeMs;
     }
 }
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/TableMetricGroup.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/TableMetricGroup.java
index c4338ef31..7620bcbfd 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/TableMetricGroup.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/TableMetricGroup.java
@@ -27,16 +27,11 @@ import org.apache.fluss.metrics.NoOpCounter;
 import org.apache.fluss.metrics.ThreadSafeSimpleCounter;
 import org.apache.fluss.metrics.groups.AbstractMetricGroup;
 import org.apache.fluss.metrics.registry.MetricRegistry;
-import org.apache.fluss.metrics.utils.MetricGroupUtils;
-import org.apache.fluss.security.acl.FlussPrincipal;
-import org.apache.fluss.server.entity.UserContext;
-import org.apache.fluss.server.metrics.MetricManager;
 
 import javax.annotation.Nullable;
 
 import java.util.HashMap;
 import java.util.Map;
-import java.util.Optional;
 
 import static org.apache.fluss.metrics.utils.MetricGroupUtils.makeScope;
 
@@ -48,8 +43,6 @@ public class TableMetricGroup extends AbstractMetricGroup {
 
     private final Map<TableBucket, BucketMetricGroup> buckets = new 
HashMap<>();
 
-    private final MetricManager metricManager;
-
     private final TablePath tablePath;
 
     // server-level metrics
@@ -65,15 +58,14 @@ public class TableMetricGroup extends AbstractMetricGroup {
             MetricRegistry registry,
             TablePath tablePath,
             boolean isKvTable,
-            TabletServerMetricGroup serverMetricGroup,
-            MetricManager metricManager) {
+            TabletServerMetricGroup serverMetricGroup) {
         super(
                 registry,
                 makeScope(serverMetricGroup, tablePath.getDatabaseName(), 
tablePath.getTableName()),
                 serverMetricGroup);
         this.serverMetrics = serverMetricGroup;
         this.tablePath = tablePath;
-        this.metricManager = metricManager;
+
         // if is kv table, create kv metrics
         if (isKvTable) {
             kvMetrics = new KvMetricGroup(this);
@@ -102,28 +94,14 @@ public class TableMetricGroup extends AbstractMetricGroup {
         serverMetrics.messageIn().inc(n);
     }
 
-    public void incLogBytesIn(long n, @Nullable UserContext userContext) {
+    public void incLogBytesIn(long n) {
         logMetrics.bytesIn.inc(n);
-        serverMetrics.incBytesIn(n, userContext);
-
-        // table user level metric, only consider log table, exclude CDC log.
-        Optional.ofNullable(userContext)
-                .map(UserContext::getPrincipal)
-                .map(FlussPrincipal::getName)
-                .filter(name -> !name.isEmpty())
-                .ifPresent(name -> 
getOrCreateUserMetricGroup(name).incBytesIn(n));
+        serverMetrics.bytesIn().inc(n);
     }
 
-    public void incLogBytesOut(long n, @Nullable UserContext userContext) {
+    public void incLogBytesOut(long n) {
         logMetrics.bytesOut.inc(n);
-        serverMetrics.incBytesOut(n, userContext);
-
-        // user level metric
-        Optional.ofNullable(userContext)
-                .map(UserContext::getPrincipal)
-                .map(FlussPrincipal::getName)
-                .filter(name -> !name.isEmpty())
-                .ifPresent(name -> 
getOrCreateUserMetricGroup(name).incBytesOut(n));
+        serverMetrics.bytesOut().inc(n);
     }
 
     public Counter totalFetchLogRequests() {
@@ -244,52 +222,6 @@ public class TableMetricGroup extends AbstractMetricGroup {
         }
     }
 
-    // ------------------------------------------------------------------------
-    //  user groups
-    // ------------------------------------------------------------------------
-    private TableUserMetricGroup getOrCreateUserMetricGroup(String 
principalName) {
-        String uniqueName = MetricGroupUtils.getScopeName(this, principalName);
-        return metricManager.getOrCreateMetric(
-                uniqueName, name -> new TableUserMetricGroup(this, 
principalName));
-    }
-
-    private static class TableUserMetricGroup extends AbstractMetricGroup {
-        private final String principalName;
-        protected final Counter bytesIn;
-        protected final Counter bytesOut;
-
-        private TableUserMetricGroup(TableMetricGroup tableMetricGroup, String 
principalName) {
-            super(
-                    tableMetricGroup.registry,
-                    makeScope(tableMetricGroup, principalName),
-                    tableMetricGroup);
-            this.principalName = principalName;
-
-            bytesIn = counter(MetricNames.BYTES_IN_COUNT, new 
ThreadSafeSimpleCounter());
-            bytesOut = counter(MetricNames.BYTES_OUT_COUNT, new 
ThreadSafeSimpleCounter());
-        }
-
-        @Override
-        protected String getGroupName(CharacterFilter filter) {
-            return "user";
-        }
-
-        @Override
-        protected void putVariables(Map<String, String> variables) {
-            variables.put("user", principalName);
-        }
-
-        public void incBytesIn(long n) {
-            this.lastRecordTime = System.currentTimeMillis();
-            bytesIn.inc(n);
-        }
-
-        public void incBytesOut(long n) {
-            this.lastRecordTime = System.currentTimeMillis();
-            bytesOut.inc(n);
-        }
-    }
-
     // ------------------------------------------------------------------------
     //  bucket groups
     // ------------------------------------------------------------------------
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/TabletServerMetricGroup.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/TabletServerMetricGroup.java
index 8aa71b524..59730b69f 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/TabletServerMetricGroup.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/TabletServerMetricGroup.java
@@ -30,16 +30,9 @@ import org.apache.fluss.metrics.SimpleCounter;
 import org.apache.fluss.metrics.ThreadSafeSimpleCounter;
 import org.apache.fluss.metrics.groups.AbstractMetricGroup;
 import org.apache.fluss.metrics.registry.MetricRegistry;
-import org.apache.fluss.metrics.utils.MetricGroupUtils;
-import org.apache.fluss.security.acl.FlussPrincipal;
-import org.apache.fluss.server.entity.UserContext;
-import org.apache.fluss.server.metrics.MetricManager;
 import org.apache.fluss.utils.MapUtils;
 
-import javax.annotation.Nullable;
-
 import java.util.Map;
-import java.util.Optional;
 
 /** The metric group for tablet server. */
 public class TabletServerMetricGroup extends AbstractMetricGroup {
@@ -50,8 +43,6 @@ public class TabletServerMetricGroup extends 
AbstractMetricGroup {
     private final Map<TablePath, TableMetricGroup> metricGroupByTable =
             MapUtils.newConcurrentHashMap();
 
-    private final MetricManager metricManager;
-
     protected final String clusterId;
     protected final String rack;
     protected final String hostname;
@@ -85,18 +76,12 @@ public class TabletServerMetricGroup extends 
AbstractMetricGroup {
     private final Counter failedIsrUpdates;
 
     public TabletServerMetricGroup(
-            MetricRegistry registry,
-            String clusterId,
-            String rack,
-            String hostname,
-            int serverId,
-            MetricManager metricManager) {
+            MetricRegistry registry, String clusterId, String rack, String 
hostname, int serverId) {
         super(registry, new String[] {clusterId, hostname, NAME}, null);
         this.clusterId = clusterId;
         this.rack = rack;
         this.hostname = hostname;
         this.serverId = serverId;
-        this.metricManager = metricManager;
 
         replicationBytesIn = new ThreadSafeSimpleCounter();
         meter(MetricNames.REPLICATION_IN_RATE, new 
MeterView(replicationBytesIn));
@@ -200,28 +185,6 @@ public class TabletServerMetricGroup extends 
AbstractMetricGroup {
         return bytesOut;
     }
 
-    public void incBytesIn(long n, @Nullable UserContext userContext) {
-        bytesIn.inc(n);
-
-        // user level metric
-        Optional.ofNullable(userContext)
-                .map(UserContext::getPrincipal)
-                .map(FlussPrincipal::getName)
-                .filter(name -> !name.isEmpty())
-                .ifPresent(name -> 
getOrCreateUserMetricGroup(name).incBytesIn(n));
-    }
-
-    public void incBytesOut(long n, @Nullable UserContext userContext) {
-        bytesOut.inc(n);
-
-        // user level metric
-        Optional.ofNullable(userContext)
-                .map(UserContext::getPrincipal)
-                .map(FlussPrincipal::getName)
-                .filter(name -> !name.isEmpty())
-                .ifPresent(name -> 
getOrCreateUserMetricGroup(name).incBytesOut(n));
-    }
-
     public Counter logFlushCount() {
         return logFlushCount;
     }
@@ -267,9 +230,7 @@ public class TabletServerMetricGroup extends 
AbstractMetricGroup {
         TableMetricGroup tableMetricGroup =
                 metricGroupByTable.computeIfAbsent(
                         tablePath,
-                        table ->
-                                new TableMetricGroup(
-                                        registry, tablePath, isKvTable, this, 
metricManager));
+                        table -> new TableMetricGroup(registry, tablePath, 
isKvTable, this));
         return 
tableMetricGroup.addBucketMetricGroup(physicalTablePath.getPartitionName(), 
bucket);
     }
 
@@ -288,13 +249,4 @@ public class TabletServerMetricGroup extends 
AbstractMetricGroup {
             }
         }
     }
-
-    // ------------------------------------------------------------------------
-    //  user metric groups
-    // ------------------------------------------------------------------------
-    public UserMetricGroup getOrCreateUserMetricGroup(String principalName) {
-        String uniqueName = MetricGroupUtils.getScopeName(this, principalName);
-        return metricManager.getOrCreateMetric(
-                uniqueName, name -> new UserMetricGroup(registry, 
principalName, this));
-    }
 }
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/UserMetricGroup.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/UserMetricGroup.java
index 7995cbc50..a23f11a3f 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/UserMetricGroup.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/UserMetricGroup.java
@@ -17,64 +17,24 @@
 
 package org.apache.fluss.server.metrics.group;
 
-import org.apache.fluss.annotation.VisibleForTesting;
-import org.apache.fluss.metrics.CharacterFilter;
-import org.apache.fluss.metrics.Counter;
 import org.apache.fluss.metrics.MeterView;
 import org.apache.fluss.metrics.MetricNames;
-import org.apache.fluss.metrics.ThreadSafeSimpleCounter;
-import org.apache.fluss.metrics.groups.AbstractMetricGroup;
 import org.apache.fluss.metrics.registry.MetricRegistry;
 
-import java.util.Map;
-
-import static org.apache.fluss.metrics.utils.MetricGroupUtils.makeScope;
-
-/** Metrics for the users in server with {@link TabletServerMetricGroup} as 
parent group. */
-public class UserMetricGroup extends AbstractMetricGroup {
-    private static final String NAME = "user";
-
-    private final String principalName;
-    protected final Counter bytesIn;
-    protected final Counter bytesOut;
+/**
+ * Metrics for the overall user level in server with {@link 
TabletServerMetricGroup} as parent
+ * group.
+ */
+public class UserMetricGroup extends AbstractUserMetricGroup {
 
     public UserMetricGroup(
             MetricRegistry registry,
             String principalName,
+            long inactiveMetricExpirationTimeMs,
             TabletServerMetricGroup tabletServerMetricGroup) {
-        super(registry, makeScope(tabletServerMetricGroup, principalName), 
tabletServerMetricGroup);
-        this.principalName = principalName;
+        super(registry, principalName, inactiveMetricExpirationTimeMs, 
tabletServerMetricGroup);
 
-        bytesIn = new ThreadSafeSimpleCounter();
-        bytesOut = new ThreadSafeSimpleCounter();
-        counter(MetricNames.BYTES_IN_COUNT, bytesIn);
         meter(MetricNames.BYTES_IN_RATE, new MeterView(bytesIn));
-        counter(MetricNames.BYTES_OUT_COUNT, bytesOut);
         meter(MetricNames.BYTES_OUT_RATE, new MeterView(bytesOut));
     }
-
-    @VisibleForTesting
-    public String getPrincipalName() {
-        return principalName;
-    }
-
-    @Override
-    protected String getGroupName(CharacterFilter filter) {
-        return NAME;
-    }
-
-    @Override
-    protected void putVariables(Map<String, String> variables) {
-        variables.put("user", principalName);
-    }
-
-    public void incBytesIn(long n) {
-        this.lastRecordTime = System.currentTimeMillis();
-        bytesIn.inc(n);
-    }
-
-    public void incBytesOut(long n) {
-        this.lastRecordTime = System.currentTimeMillis();
-        bytesOut.inc(n);
-    }
 }
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/UserPerTableMetricGroup.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/UserPerTableMetricGroup.java
new file mode 100644
index 000000000..a43e63f9e
--- /dev/null
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/UserPerTableMetricGroup.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.metrics.group;
+
+import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.metrics.MetricNames;
+import org.apache.fluss.metrics.registry.MetricRegistry;
+
+import java.util.Map;
+
+import static org.apache.fluss.utils.Preconditions.checkNotNull;
+
+/**
+ * Metrics for the users per table in server with {@link 
TabletServerMetricGroup} as parent group.
+ */
+public class UserPerTableMetricGroup extends AbstractUserMetricGroup {
+
+    private final TablePath tablePath;
+
+    public UserPerTableMetricGroup(
+            MetricRegistry registry,
+            String principalName,
+            TablePath tablePath,
+            long inactiveMetricExpirationTimeMs,
+            TabletServerMetricGroup tabletServerMetricGroup) {
+        super(registry, principalName, inactiveMetricExpirationTimeMs, 
tabletServerMetricGroup);
+        this.tablePath = checkNotNull(tablePath);
+
+        // only track counters for per-table user metrics for billing purposes,
+        // the corresponding rates are tracked at the overall user level
+        counter(MetricNames.BYTES_IN, bytesIn);
+        counter(MetricNames.BYTES_OUT, bytesOut);
+    }
+
+    @Override
+    protected void putVariables(Map<String, String> variables) {
+        super.putVariables(variables);
+        variables.put("database", tablePath.getDatabaseName());
+        variables.put("table", tablePath.getTableName());
+    }
+}
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java
index 28d627d1f..53f8338af 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java
@@ -85,6 +85,7 @@ import 
org.apache.fluss.server.log.checkpoint.OffsetCheckpointFile;
 import org.apache.fluss.server.log.remote.RemoteLogManager;
 import org.apache.fluss.server.metadata.ClusterMetadata;
 import org.apache.fluss.server.metadata.TabletServerMetadataCache;
+import org.apache.fluss.server.metrics.UserMetrics;
 import org.apache.fluss.server.metrics.group.BucketMetricGroup;
 import org.apache.fluss.server.metrics.group.TableMetricGroup;
 import org.apache.fluss.server.metrics.group.TabletServerMetricGroup;
@@ -185,6 +186,7 @@ public class ReplicaManager {
 
     // for metrics
     private final TabletServerMetricGroup serverMetricGroup;
+    private final UserMetrics userMetrics;
     private final String internalListenerName;
 
     private final Clock clock;
@@ -202,6 +204,7 @@ public class ReplicaManager {
             CompletedKvSnapshotCommitter completedKvSnapshotCommitter,
             FatalErrorHandler fatalErrorHandler,
             TabletServerMetricGroup serverMetricGroup,
+            UserMetrics userMetrics,
             Clock clock)
             throws IOException {
         this(
@@ -217,6 +220,7 @@ public class ReplicaManager {
                 completedKvSnapshotCommitter,
                 fatalErrorHandler,
                 serverMetricGroup,
+                userMetrics,
                 new RemoteLogManager(conf, zkClient, coordinatorGateway, 
clock),
                 clock);
     }
@@ -235,6 +239,7 @@ public class ReplicaManager {
             CompletedKvSnapshotCommitter completedKvSnapshotCommitter,
             FatalErrorHandler fatalErrorHandler,
             TabletServerMetricGroup serverMetricGroup,
+            UserMetrics userMetrics,
             RemoteLogManager remoteLogManager,
             Clock clock)
             throws IOException {
@@ -280,6 +285,7 @@ public class ReplicaManager {
                         zkClient, completedKvSnapshotCommitter, 
kvSnapshotResource, conf);
         this.remoteLogManager = remoteLogManager;
         this.serverMetricGroup = serverMetricGroup;
+        this.userMetrics = userMetrics;
         this.clock = clock;
         registerMetrics();
     }
@@ -501,7 +507,6 @@ public class ReplicaManager {
             int requiredAcks,
             Map<TableBucket, KvRecordBatch> entriesPerBucket,
             @Nullable int[] targetColumns,
-            @Nullable UserContext userContext,
             Consumer<List<PutKvResultForBucket>> responseCallback) {
         if (isRequiredAcksInvalid(requiredAcks)) {
             throw new InvalidRequiredAcksException("Invalid required acks: " + 
requiredAcks);
@@ -509,7 +514,7 @@ public class ReplicaManager {
 
         long startTime = System.currentTimeMillis();
         Map<TableBucket, PutKvResultForBucket> kvPutResult =
-                putToLocalKv(entriesPerBucket, targetColumns, requiredAcks, 
userContext);
+                putToLocalKv(entriesPerBucket, targetColumns, requiredAcks);
         LOG.debug(
                 "Put records to local kv storage and wait generate cdc log in 
{} ms",
                 System.currentTimeMillis() - startTime);
@@ -935,18 +940,21 @@ public class ReplicaManager {
     private Map<TableBucket, ProduceLogResultForBucket> appendToLocalLog(
             Map<TableBucket, MemoryLogRecords> entriesPerBucket,
             int requiredAcks,
-            UserContext userContext) {
+            @Nullable UserContext userContext) {
         Map<TableBucket, ProduceLogResultForBucket> resultForBucketMap = new 
HashMap<>();
         for (Map.Entry<TableBucket, MemoryLogRecords> entry : 
entriesPerBucket.entrySet()) {
             TableBucket tb = entry.getKey();
+            MemoryLogRecords records = entry.getValue();
             TableMetricGroup tableMetrics = null;
             try {
                 Replica replica = getReplicaOrException(tb);
                 tableMetrics = replica.tableMetrics();
                 tableMetrics.totalProduceLogRequests().inc();
+                // record user metrics before appending to log,
+                // so that if appending fails, we still account the bytes.
+                userMetrics.incBytesIn(userContext, replica.getTablePath(), 
records.sizeInBytes());
                 LOG.trace("Append records to local log tablet for table bucket 
{}", tb);
-                LogAppendInfo appendInfo =
-                        replica.appendRecordsToLeader(entry.getValue(), 
requiredAcks);
+                LogAppendInfo appendInfo = 
replica.appendRecordsToLeader(records, requiredAcks);
 
                 long baseOffset = appendInfo.firstOffset();
                 LOG.trace(
@@ -958,7 +966,7 @@ public class ReplicaManager {
                 resultForBucketMap.put(
                         tb,
                         new ProduceLogResultForBucket(tb, baseOffset, 
appendInfo.lastOffset() + 1));
-                tableMetrics.incLogBytesIn(appendInfo.validBytes(), 
userContext);
+                tableMetrics.incLogBytesIn(appendInfo.validBytes());
                 tableMetrics.incLogMessageIn(appendInfo.numMessages());
             } catch (Exception e) {
                 if (isUnexpectedException(e)) {
@@ -981,8 +989,7 @@ public class ReplicaManager {
     private Map<TableBucket, PutKvResultForBucket> putToLocalKv(
             Map<TableBucket, KvRecordBatch> entriesPerBucket,
             @Nullable int[] targetColumns,
-            int requiredAcks,
-            UserContext userContext) {
+            int requiredAcks) {
         Map<TableBucket, PutKvResultForBucket> putResultForBucketMap = new 
HashMap<>();
         for (Map.Entry<TableBucket, KvRecordBatch> entry : 
entriesPerBucket.entrySet()) {
             TableBucket tb = entry.getKey();
@@ -1005,9 +1012,8 @@ public class ReplicaManager {
                 // metric for kv
                 tableMetrics.incKvMessageIn(entry.getValue().getRecordCount());
                 tableMetrics.incKvBytesIn(entry.getValue().sizeInBytes());
-                // metric for cdc log of kv.
-                // We set "userContext" to null to avoid cdc log attributed to 
any user.
-                tableMetrics.incLogBytesIn(appendInfo.validBytes(), null);
+                // metric for cdc log of kv
+                tableMetrics.incLogBytesIn(appendInfo.validBytes());
                 tableMetrics.incLogMessageIn(appendInfo.numMessages());
             } catch (Exception e) {
                 if (isUnexpectedException(e)) {
@@ -1063,7 +1069,7 @@ public class ReplicaManager {
     public Map<TableBucket, LogReadResult> readFromLog(
             FetchParams fetchParams,
             Map<TableBucket, FetchReqInfo> bucketFetchInfo,
-            UserContext userContext) {
+            @Nullable UserContext userContext) {
         Map<TableBucket, LogReadResult> logReadResult = new HashMap<>();
         boolean isFromFollower = fetchParams.isFromFollower();
         int limitBytes = fetchParams.maxFetchBytes();
@@ -1121,7 +1127,8 @@ public class ReplicaManager {
                 if (isFromFollower) {
                     
serverMetricGroup.replicationBytesOut().inc(recordBatchSize);
                 } else {
-                    tableMetrics.incLogBytesOut(recordBatchSize, userContext);
+                    tableMetrics.incLogBytesOut(recordBatchSize);
+                    userMetrics.incBytesOut(userContext, 
replica.getTablePath(), recordBatchSize);
                 }
             } catch (Exception e) {
                 if (isUnexpectedException(e)) {
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/replica/delay/DelayedFetchLog.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/replica/delay/DelayedFetchLog.java
index 32394b13f..e17b7e1cc 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/replica/delay/DelayedFetchLog.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/replica/delay/DelayedFetchLog.java
@@ -41,6 +41,8 @@ import 
org.apache.fluss.server.replica.ReplicaManager.LogReadResult;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
@@ -60,7 +62,7 @@ public class DelayedFetchLog extends DelayedOperation {
     private final Map<TableBucket, FetchBucketStatus> fetchBucketStatusMap;
     private final Consumer<Map<TableBucket, FetchLogResultForBucket>> 
responseCallback;
     private final TabletServerMetricGroup serverMetricGroup;
-    private final UserContext userContext;
+    @Nullable private final UserContext userContext;
 
     public DelayedFetchLog(
             FetchParams params,
@@ -68,7 +70,7 @@ public class DelayedFetchLog extends DelayedOperation {
             Map<TableBucket, FetchBucketStatus> fetchBucketStatusMap,
             Consumer<Map<TableBucket, FetchLogResultForBucket>> 
responseCallback,
             TabletServerMetricGroup serverMetricGroup,
-            UserContext userContext) {
+            @Nullable UserContext userContext) {
         super(params.maxWaitMs());
         this.params = params;
         this.replicaManager = replicaManager;
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletServer.java 
b/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletServer.java
index 6de0762da..a014633a9 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletServer.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletServer.java
@@ -45,8 +45,8 @@ import 
org.apache.fluss.server.kv.snapshot.DefaultCompletedKvSnapshotCommitter;
 import org.apache.fluss.server.log.LogManager;
 import org.apache.fluss.server.log.remote.RemoteLogManager;
 import org.apache.fluss.server.metadata.TabletServerMetadataCache;
-import org.apache.fluss.server.metrics.MetricManager;
 import org.apache.fluss.server.metrics.ServerMetricUtils;
+import org.apache.fluss.server.metrics.UserMetrics;
 import org.apache.fluss.server.metrics.group.TabletServerMetricGroup;
 import org.apache.fluss.server.replica.ReplicaManager;
 import org.apache.fluss.server.zk.ZooKeeperClient;
@@ -124,7 +124,7 @@ public class TabletServer extends ServerBase {
     private MetricRegistry metricRegistry;
 
     @GuardedBy("lock")
-    private MetricManager metricManager;
+    private UserMetrics userMetrics;
 
     @GuardedBy("lock")
     private TabletServerMetricGroup tabletServerMetricGroup;
@@ -190,8 +190,10 @@ public class TabletServer extends ServerBase {
 
             List<Endpoint> endpoints = Endpoint.loadBindEndpoints(conf, 
ServerType.TABLET_SERVER);
 
+            this.scheduler = new FlussScheduler(conf.get(BACKGROUND_THREADS));
+            scheduler.startup();
+
             // for metrics
-            this.metricManager = new MetricManager(conf);
             this.metricRegistry = MetricRegistry.create(conf, pluginManager);
             this.tabletServerMetricGroup =
                     ServerMetricUtils.createTabletServerGroup(
@@ -199,8 +201,8 @@ public class TabletServer extends ServerBase {
                             ServerMetricUtils.validateAndGetClusterId(conf),
                             rack,
                             endpoints.get(0).getHost(),
-                            serverId,
-                            metricManager);
+                            serverId);
+            this.userMetrics = new UserMetrics(scheduler, metricRegistry, 
tabletServerMetricGroup);
 
             this.zkClient = ZooKeeperUtils.startZookeeperClient(conf, this);
 
@@ -214,9 +216,6 @@ public class TabletServer extends ServerBase {
 
             this.metadataCache = new 
TabletServerMetadataCache(metadataManager);
 
-            this.scheduler = new FlussScheduler(conf.get(BACKGROUND_THREADS));
-            scheduler.startup();
-
             this.logManager =
                     LogManager.create(conf, zkClient, scheduler, clock, 
tabletServerMetricGroup);
             logManager.startup();
@@ -255,6 +254,7 @@ public class TabletServer extends ServerBase {
                                     rpcClient, metadataCache, 
interListenerName),
                             this,
                             tabletServerMetricGroup,
+                            userMetrics,
                             clock);
             replicaManager.startup();
 
@@ -399,8 +399,8 @@ public class TabletServer extends ServerBase {
                     clientMetricGroup.close();
                 }
 
-                if (metricManager != null) {
-                    metricManager.close();
+                if (userMetrics != null) {
+                    userMetrics.close();
                 }
 
                 // We must shut down the scheduler early because otherwise, 
the scheduler could
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletService.java 
b/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletService.java
index b9c88e36d..891ddb826 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletService.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletService.java
@@ -227,7 +227,6 @@ public final class TabletService extends RpcServiceBase 
implements TabletServerG
                 request.getAcks(),
                 putKvData,
                 getTargetColumns(request),
-                new UserContext(currentSession().getPrincipal()),
                 bucketResponse -> 
response.complete(makePutKvResponse(bucketResponse)));
         return response;
     }
diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/metrics/MetricManagerTest.java
 
b/fluss-server/src/test/java/org/apache/fluss/server/metrics/MetricManagerTest.java
deleted file mode 100644
index 81aebda2c..000000000
--- 
a/fluss-server/src/test/java/org/apache/fluss/server/metrics/MetricManagerTest.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.fluss.server.metrics;
-
-import org.apache.fluss.config.ConfigOptions;
-import org.apache.fluss.config.Configuration;
-import org.apache.fluss.metrics.registry.NOPMetricRegistry;
-import org.apache.fluss.server.metrics.group.TestingMetricGroups;
-import org.apache.fluss.server.metrics.group.UserMetricGroup;
-
-import org.junit.jupiter.api.Test;
-
-import java.time.Duration;
-
-import static org.apache.fluss.metrics.utils.MetricGroupUtils.getScopeName;
-import static 
org.apache.fluss.server.metrics.group.TestingMetricGroups.TABLET_SERVER_METRICS;
-import static org.assertj.core.api.Assertions.assertThat;
-
-/** Test for {@link MetricManager}. */
-public class MetricManagerTest {
-    @Test
-    void testMetricManager() throws Exception {
-        Configuration conf = new Configuration();
-        // set expiration time to 2s in test
-        conf.set(ConfigOptions.METRICS_MANAGER_INACTIVE_EXPIRATION_TIME, 
Duration.ofSeconds(2));
-        MetricManager metricManager = new MetricManager(conf);
-
-        UserMetricGroup userMetricGroup = TestingMetricGroups.USER_METRICS;
-
-        MetricManager.ExpiredMetricCleanupTask metricCleanupTask =
-                metricManager.new ExpiredMetricCleanupTask();
-
-        // fresh lastRecordTime
-        userMetricGroup.incBytesIn(100);
-        String metricName =
-                getScopeName(userMetricGroup.getParent(), 
userMetricGroup.getPrincipalName());
-        metricManager.getOrCreateMetric(metricName, name -> 
TestingMetricGroups.USER_METRICS);
-
-        // metric successful created
-        
assertThat(metricManager.getMetric(metricName)).isEqualTo(TestingMetricGroups.USER_METRICS);
-
-        // sleep 1s and fresh it, metric should not be removed
-        Thread.sleep(1000);
-        userMetricGroup.incBytesIn(100);
-        metricCleanupTask.run();
-        assertThat(metricManager.getMetric(metricName)).isNotNull();
-        assertThat(userMetricGroup.isClosed()).isFalse();
-
-        // sleep 3s and metric should already be removed
-        Thread.sleep(3000);
-        metricCleanupTask.run();
-
-        // metric successful removed
-        assertThat(metricManager.getMetric(metricName)).isNull();
-        assertThat(userMetricGroup.isClosed()).isTrue();
-
-        // the metric is created after removing.
-        UserMetricGroup newCreated =
-                metricManager.getOrCreateMetric(
-                        metricName,
-                        name ->
-                                new UserMetricGroup(
-                                        NOPMetricRegistry.INSTANCE,
-                                        "user_abc",
-                                        TABLET_SERVER_METRICS));
-        newCreated.incBytesIn(100);
-
-        assertThat(metricManager.getMetric(metricName)).isEqualTo(newCreated);
-        assertThat(newCreated.isClosed()).isFalse();
-    }
-}
diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/metrics/UserMetricsTest.java
 
b/fluss-server/src/test/java/org/apache/fluss/server/metrics/UserMetricsTest.java
new file mode 100644
index 000000000..c302eb331
--- /dev/null
+++ 
b/fluss-server/src/test/java/org/apache/fluss/server/metrics/UserMetricsTest.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.metrics;
+
+import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.metrics.registry.NOPMetricRegistry;
+import org.apache.fluss.security.acl.FlussPrincipal;
+import org.apache.fluss.server.entity.UserContext;
+import org.apache.fluss.server.metrics.group.AbstractUserMetricGroup;
+import org.apache.fluss.utils.concurrent.FlussScheduler;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.time.Duration;
+
+import static 
org.apache.fluss.server.metrics.group.TestingMetricGroups.TABLET_SERVER_METRICS;
+import static org.apache.fluss.testutils.common.CommonTestUtils.retry;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link UserMetrics}. */
+public class UserMetricsTest {
+
+    private FlussScheduler scheduler;
+
+    @BeforeEach
+    void before() {
+        scheduler = new FlussScheduler(2, false);
+        scheduler.startup();
+    }
+
+    @AfterEach
+    void after() throws InterruptedException {
+        scheduler.shutdown();
+    }
+
+    @Test
+    void testMetricExpiration() {
+        // set expiration time to 2s in test, and 10ms check interval
+        UserMetrics userMetrics =
+                new UserMetrics(
+                        Duration.ofSeconds(2).toMillis(),
+                        100L,
+                        scheduler,
+                        NOPMetricRegistry.INSTANCE,
+                        TABLET_SERVER_METRICS);
+
+        String user1 = "user1";
+        String user2 = "user2";
+        UserContext uc1 = new UserContext(new FlussPrincipal(user1, "USER"));
+        UserContext uc2 = new UserContext(new FlussPrincipal(user2, "USER"));
+
+        TablePath t1 = TablePath.of("db1", "table1");
+        TablePath t2 = TablePath.of("db1", "table2");
+
+        userMetrics.incBytesIn(null, t1, 100);
+        userMetrics.incBytesOut(null, t2, 200);
+        userMetrics.incBytesOut(new UserContext(FlussPrincipal.ANY), t1, 200);
+        userMetrics.incBytesIn(new UserContext(FlussPrincipal.ANONYMOUS), t1, 
300);
+        userMetrics.incBytesOut(new 
UserContext(FlussPrincipal.WILD_CARD_PRINCIPAL), t2, 100);
+
+        // ignore null/anonymous/wildcard user
+        assertThat(userMetrics.numMetrics()).isEqualTo(0);
+
+        userMetrics.incBytesIn(uc1, t1, 100);
+        userMetrics.incBytesOut(uc1, t2, 100);
+        userMetrics.incBytesOut(uc2, t2, 200);
+
+        // u1, u1+t1, u1+t2, u2, u2+t2
+        assertThat(userMetrics.numMetrics()).isEqualTo(5);
+
+        scheduler.schedule(
+                "updating-u1t1",
+                () -> {
+                    userMetrics.incBytesIn(uc1, t1, 50);
+                },
+                0L,
+                50L);
+
+        AbstractUserMetricGroup u1t1 =
+                userMetrics.getOrCreateMetric(new UserMetrics.MetricKey(user1, 
t1));
+        AbstractUserMetricGroup u1t2 =
+                userMetrics.getOrCreateMetric(new UserMetrics.MetricKey(user1, 
t2));
+        AbstractUserMetricGroup u2t2 =
+                userMetrics.getOrCreateMetric(new UserMetrics.MetricKey(user2, 
t2));
+        AbstractUserMetricGroup u1 =
+                userMetrics.getOrCreateMetric(new UserMetrics.MetricKey(user1, 
null));
+        AbstractUserMetricGroup u2 =
+                userMetrics.getOrCreateMetric(new UserMetrics.MetricKey(user2, 
null));
+
+        // wait enough time for metrics expired
+        retry(
+                Duration.ofSeconds(10),
+                () -> {
+                    // only u1 and u1+t1 are retained.
+                    assertThat(userMetrics.numMetrics()).isEqualTo(2);
+                });
+
+        assertThat(u1t2.isClosed()).isTrue();
+        assertThat(u2t2.isClosed()).isTrue();
+        assertThat(u2.isClosed()).isTrue();
+
+        assertThat(u1.isClosed()).isFalse();
+        assertThat(u1t1.isClosed()).isFalse();
+        assertThat(userMetrics.getOrCreateMetric(new 
UserMetrics.MetricKey(user1, null)))
+                .isSameAs(u1);
+        assertThat(userMetrics.getOrCreateMetric(new 
UserMetrics.MetricKey(user1, t1)))
+                .isSameAs(u1t1);
+    }
+}
diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/metrics/group/TestingMetricGroups.java
 
b/fluss-server/src/test/java/org/apache/fluss/server/metrics/group/TestingMetricGroups.java
index 0e5a5fd09..b2e93f2bb 100644
--- 
a/fluss-server/src/test/java/org/apache/fluss/server/metrics/group/TestingMetricGroups.java
+++ 
b/fluss-server/src/test/java/org/apache/fluss/server/metrics/group/TestingMetricGroups.java
@@ -19,13 +19,17 @@ package org.apache.fluss.server.metrics.group;
 
 import org.apache.fluss.metadata.TablePath;
 import org.apache.fluss.metrics.registry.NOPMetricRegistry;
+import org.apache.fluss.server.metrics.UserMetrics;
+import org.apache.fluss.testutils.common.ScheduledTask;
+import org.apache.fluss.utils.concurrent.Scheduler;
+
+import java.util.concurrent.ScheduledFuture;
 
 /** Utilities for various metric groups for testing. */
 public class TestingMetricGroups {
 
     public static final TabletServerMetricGroup TABLET_SERVER_METRICS =
-            new TabletServerMetricGroup(
-                    NOPMetricRegistry.INSTANCE, "fluss", "host", "rack", 0, 
null);
+            new TabletServerMetricGroup(NOPMetricRegistry.INSTANCE, "fluss", 
"host", "rack", 0);
 
     public static final CoordinatorMetricGroup COORDINATOR_METRICS =
             new CoordinatorMetricGroup(NOPMetricRegistry.INSTANCE, "cluster1", 
"host", "0");
@@ -35,12 +39,34 @@ public class TestingMetricGroups {
                     NOPMetricRegistry.INSTANCE,
                     TablePath.of("mydb", "mytable"),
                     false,
-                    TABLET_SERVER_METRICS,
-                    null);
+                    TABLET_SERVER_METRICS);
 
     public static final BucketMetricGroup BUCKET_METRICS =
             new BucketMetricGroup(NOPMetricRegistry.INSTANCE, null, 0, 
TABLE_METRICS);
 
-    public static final UserMetricGroup USER_METRICS =
-            new UserMetricGroup(NOPMetricRegistry.INSTANCE, "user_abc", 
TABLET_SERVER_METRICS);
+    public static final UserMetrics USER_METRICS =
+            new UserMetrics(
+                    new TestingScheduler(), NOPMetricRegistry.INSTANCE, 
TABLET_SERVER_METRICS);
+
+    // 
------------------------------------------------------------------------------------------
+
+    private static class TestingScheduler implements Scheduler {
+        @Override
+        public void startup() {
+            // no-op
+        }
+
+        @Override
+        public void shutdown() throws InterruptedException {
+            // no-op
+        }
+
+        @Override
+        public ScheduledFuture<?> schedule(
+                String name, Runnable task, long delayMs, long periodMs) {
+            // Directly run the task for testing purpose.
+            task.run();
+            return new ScheduledTask<>(() -> null, delayMs, periodMs);
+        }
+    }
 }
diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaManagerTest.java
 
b/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaManagerTest.java
index fe4eaebf8..7db23130f 100644
--- 
a/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaManagerTest.java
+++ 
b/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaManagerTest.java
@@ -474,7 +474,6 @@ class ReplicaManagerTest extends ReplicaTestBase {
                 1,
                 Collections.singletonMap(tb, 
genKvRecordBatch(DATA_1_WITH_KEY_AND_VALUE)),
                 null,
-                null,
                 future::complete);
         assertThat(future.get()).containsOnly(new PutKvResultForBucket(tb, 8));
 
@@ -487,7 +486,6 @@ class ReplicaManagerTest extends ReplicaTestBase {
                                         Collections.singletonMap(
                                                 tb, 
genKvRecordBatch(DATA_1_WITH_KEY_AND_VALUE)),
                                         null,
-                                        null,
                                         (result) -> {
                                             // do nothing.
                                         }))
@@ -502,7 +500,6 @@ class ReplicaManagerTest extends ReplicaTestBase {
                 1,
                 Collections.singletonMap(unknownTb, 
genKvRecordBatch(DATA_1_WITH_KEY_AND_VALUE)),
                 null,
-                null,
                 future::complete);
         assertThat(future.get())
                 .containsOnly(
@@ -537,7 +534,6 @@ class ReplicaManagerTest extends ReplicaTestBase {
                         genKvRecordBatchWithWriterId(
                                 data1, DATA1_KEY_TYPE, DATA1_ROW_TYPE, 100L, 
0)),
                 null,
-                null,
                 future::complete);
         assertThat(future.get()).containsOnly(new PutKvResultForBucket(tb, 5));
 
@@ -582,7 +578,6 @@ class ReplicaManagerTest extends ReplicaTestBase {
                         genKvRecordBatchWithWriterId(
                                 data2, DATA1_KEY_TYPE, DATA1_ROW_TYPE, 100L, 
3)),
                 null,
-                null,
                 future::complete);
         PutKvResultForBucket putKvResultForBucket = future.get().get(0);
         assertThat(putKvResultForBucket.getErrorCode())
@@ -623,7 +618,6 @@ class ReplicaManagerTest extends ReplicaTestBase {
                         genKvRecordBatchWithWriterId(
                                 data3, DATA1_KEY_TYPE, DATA1_ROW_TYPE, 100L, 
1)),
                 null,
-                null,
                 future::complete);
         assertThat(future.get()).containsOnly(new PutKvResultForBucket(tb, 8));
 
@@ -672,7 +666,6 @@ class ReplicaManagerTest extends ReplicaTestBase {
                     1,
                     Collections.singletonMap(tb, genKvRecordBatch(deleteList)),
                     null,
-                    null,
                     future::complete);
             assertThat(future.get()).containsOnly(new PutKvResultForBucket(tb, 
i + 1));
         }
@@ -684,7 +677,6 @@ class ReplicaManagerTest extends ReplicaTestBase {
                 1,
                 Collections.singletonMap(tb, 
genKvRecordBatch(DATA_1_WITH_KEY_AND_VALUE)),
                 null,
-                null,
                 future::complete);
         assertThat(future.get()).containsOnly(new PutKvResultForBucket(tb, 
18));
 
@@ -745,7 +737,6 @@ class ReplicaManagerTest extends ReplicaTestBase {
                 1,
                 Collections.singletonMap(tb, 
genKvRecordBatch(DATA_1_WITH_KEY_AND_VALUE)),
                 null,
-                null,
                 future1::complete);
         assertThat(future1.get()).containsOnly(new PutKvResultForBucket(tb, 
8));
 
@@ -816,7 +807,6 @@ class ReplicaManagerTest extends ReplicaTestBase {
                 1,
                 Collections.singletonMap(tb, genKvRecordBatch(keyType, 
rowType, data1)),
                 null,
-                null,
                 future::complete);
         assertThat(future.get()).containsOnly(new PutKvResultForBucket(tb, 4));
         // second prefix lookup in table, prefix key = (1, "a").
@@ -895,7 +885,6 @@ class ReplicaManagerTest extends ReplicaTestBase {
                 1,
                 Collections.singletonMap(tb, 
genKvRecordBatch(DATA_1_WITH_KEY_AND_VALUE)),
                 null,
-                null,
                 future1::complete);
         assertThat(future1.get()).containsOnly(new PutKvResultForBucket(tb, 
8));
 
@@ -1136,7 +1125,6 @@ class ReplicaManagerTest extends ReplicaTestBase {
                 -1,
                 Collections.singletonMap(tb, 
genKvRecordBatch(DATA_1_WITH_KEY_AND_VALUE)),
                 null,
-                null,
                 future::complete);
         assertThat(future.get()).containsOnly(new PutKvResultForBucket(tb, 8));
     }
@@ -1305,7 +1293,6 @@ class ReplicaManagerTest extends ReplicaTestBase {
                                                             
Collections.singletonList(
                                                                     
Tuple2.of(key, value)))),
                                             null,
-                                            null,
                                             future::complete);
                                 } catch (Exception e) {
                                     throw new RuntimeException(e);
@@ -1386,7 +1373,6 @@ class ReplicaManagerTest extends ReplicaTestBase {
                 -1,
                 entriesPerBucket,
                 null,
-                null,
                 writeResultForBuckets -> {
                     // do nothing
                 });
@@ -1425,7 +1411,6 @@ class ReplicaManagerTest extends ReplicaTestBase {
                 -1,
                 entriesPerBucket,
                 null,
-                null,
                 writeResultForBuckets -> {
                     // do nothing
                 });
@@ -1838,7 +1823,7 @@ class ReplicaManagerTest extends ReplicaTestBase {
         CompletableFuture<List<PutKvResultForBucket>> writeFuture = new 
CompletableFuture<>();
         // put kv record batch for every bucket
         replicaManager.putRecordsToKv(
-                300000, -1, kvRecordBatchPerBucket, null, null, 
writeFuture::complete);
+                300000, -1, kvRecordBatchPerBucket, null, 
writeFuture::complete);
         // wait the write ack
         writeFuture.get();
     }
diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTestBase.java
 
b/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTestBase.java
index ae7e14a87..a41e922bc 100644
--- 
a/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTestBase.java
+++ 
b/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTestBase.java
@@ -301,6 +301,7 @@ public class ReplicaTestBase {
                 snapshotReporter,
                 NOPErrorHandler.INSTANCE,
                 TestingMetricGroups.TABLET_SERVER_METRICS,
+                TestingMetricGroups.USER_METRICS,
                 remoteLogManager,
                 manualClock);
     }
diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/replica/fetcher/ReplicaFetcherThreadTest.java
 
b/fluss-server/src/test/java/org/apache/fluss/server/replica/fetcher/ReplicaFetcherThreadTest.java
index ff242238b..e512f4ae0 100644
--- 
a/fluss-server/src/test/java/org/apache/fluss/server/replica/fetcher/ReplicaFetcherThreadTest.java
+++ 
b/fluss-server/src/test/java/org/apache/fluss/server/replica/fetcher/ReplicaFetcherThreadTest.java
@@ -73,6 +73,7 @@ import static 
org.apache.fluss.record.TestData.DATA1_TABLE_DESCRIPTOR;
 import static org.apache.fluss.record.TestData.DATA1_TABLE_ID;
 import static org.apache.fluss.record.TestData.DATA1_TABLE_PATH;
 import static 
org.apache.fluss.server.coordinator.CoordinatorContext.INITIAL_COORDINATOR_EPOCH;
+import static 
org.apache.fluss.server.metrics.group.TestingMetricGroups.USER_METRICS;
 import static 
org.apache.fluss.server.zk.data.LeaderAndIsr.INITIAL_BUCKET_EPOCH;
 import static 
org.apache.fluss.server.zk.data.LeaderAndIsr.INITIAL_LEADER_EPOCH;
 import static 
org.apache.fluss.testutils.DataTestUtils.genMemoryLogRecordsByObject;
@@ -464,6 +465,7 @@ public class ReplicaFetcherThreadTest {
                     new TestingCompletedKvSnapshotCommitter(),
                     NOPErrorHandler.INSTANCE,
                     serverMetricGroup,
+                    USER_METRICS,
                     clock);
         }
 
diff --git a/website/docs/maintenance/observability/monitor-metrics.md 
b/website/docs/maintenance/observability/monitor-metrics.md
index b31f3cd3d..d731d7708 100644
--- a/website/docs/maintenance/observability/monitor-metrics.md
+++ b/website/docs/maintenance/observability/monitor-metrics.md
@@ -530,6 +530,7 @@ Some metrics might not be exposed when using other JVM 
implementations (e.g. IBM
     </tr>
     <tr>
       <td rowspan="4">user</td>
+      // 不需要
       <td>bytesInCount</td>
       <td>The total number of bytes written to this server labeled with 
<code>user</code>. </td>
       <td>Counter</td>
@@ -540,6 +541,7 @@ Some metrics might not be exposed when using other JVM 
implementations (e.g. IBM
       <td>Meter</td>
     </tr>
      <tr>
+         // 不需要
       <td>bytesOutCount</td>
       <td>The total number of bytes read from this server labeled with 
<code>user</code>.</td>
       <td>Counter</td>

Reply via email to