This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 149d84f89705 feat(metrics): Add table-specific metrics registry support for multi-tenant scenarios (#18179)
149d84f89705 is described below

commit 149d84f89705f36aa1bfd44c5e379b23c844996e
Author: Prashant Wason <[email protected]>
AuthorDate: Wed Apr 1 16:17:21 2026 -0700

    feat(metrics): Add table-specific metrics registry support for multi-tenant scenarios (#18179)
    
    In multi-tenant scenarios where multiple Hudi tables share the same process (e.g., in a long-running Spark application), metrics from different tables need to be isolated. Currently, the Registry interface uses a simple string key, which can lead to metric collisions when multiple tables use the same registry names.
    
    This PR adds support for table-specific metric registries by introducing a compound key format (`tableName::registryName`) that gives each table its own isolated metrics.
    
    Summary:
    This PR ports the table-specific metrics registry functionality to enable 
proper metric isolation in multi-tenant deployments.
    
    
    Co-Authored-By: Claude Opus 4.6 <[email protected]>
    
    ---------
    
    Co-authored-by: Claude Opus 4.6 <[email protected]>
---
 .../org/apache/hudi/client/BaseHoodieClient.java   |   5 -
 .../apache/hudi/util/DistributedRegistryUtil.java  |  56 +++++
 .../apache/hudi/client/SparkRDDWriteClient.java    |  30 +--
 .../client/common/HoodieSparkEngineContext.java    |  54 ++++-
 .../apache/hudi/metrics/DistributedRegistry.java   |   5 +
 .../hudi/metrics/TestDistributedRegistry.java      | 231 +++++++++++++++++++++
 .../hudi/common/engine/HoodieEngineContext.java    |  14 ++
 .../hudi/hadoop/fs/HoodieWrapperFileSystem.java    |  10 +
 .../apache/hudi/common/metrics/LocalRegistry.java  |   5 +
 .../org/apache/hudi/common/metrics/Registry.java   | 107 ++++++++--
 10 files changed, 465 insertions(+), 52 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieClient.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieClient.java
index b0875672b5e9..a1155f439ffa 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieClient.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieClient.java
@@ -123,7 +123,6 @@ public abstract class BaseHoodieClient implements 
Serializable, AutoCloseable {
     this.txnManager = transactionManager;
     this.timeGenerator = timeGenerator;
     startEmbeddedServerView();
-    initWrapperFSMetrics();
     runClientInitCallbacks();
   }
 
@@ -189,10 +188,6 @@ public abstract class BaseHoodieClient implements 
Serializable, AutoCloseable {
     return context;
   }
 
-  protected void initWrapperFSMetrics() {
-    // no-op.
-  }
-
   protected HoodieTableMetaClient createMetaClient(boolean 
loadActiveTimelineOnLoad) {
     return HoodieTableMetaClient.builder()
         .setConf(storageConf.newInstance())
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/util/DistributedRegistryUtil.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/util/DistributedRegistryUtil.java
new file mode 100644
index 000000000000..7801a12d28e1
--- /dev/null
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/util/DistributedRegistryUtil.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.util;
+
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.metrics.Registry;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.hadoop.fs.HoodieWrapperFileSystem;
+
+/**
+ * Utility class for creating and initializing the metric registries used by {@link HoodieWrapperFileSystem}.
+ */
+public class DistributedRegistryUtil {
+
+  private DistributedRegistryUtil() {
+    // Static utility class; not meant to be instantiated.
+  }
+
+  /**
+   * Creates the table-scoped metrics registries for HoodieWrapperFileSystem and installs them via
+   * {@link HoodieWrapperFileSystem#setMetricsRegistry}. Does nothing when metrics are disabled.
+   *
+   * <p>When executor metrics are enabled, the registries come from
+   * {@link HoodieEngineContext#getMetricRegistry(String, String)}, so engines such as Spark can hand out
+   * registries that also collect metrics from executors. Otherwise, each table-scoped registry is created
+   * with the same class as the global (table-less) registry of the same name.
+   *
+   * @param context The engine context
+   * @param config The write configuration
+   */
+  public static void createWrapperFileSystemRegistries(HoodieEngineContext context, HoodieWriteConfig config) {
+    if (!config.isMetricsOn()) {
+      return;
+    }
+    final String tableName = config.getTableName();
+    final Registry registry;
+    final Registry registryMeta;
+    if (config.isExecutorMetricsEnabled()) {
+      registry = context.getMetricRegistry(tableName, HoodieWrapperFileSystem.REGISTRY_NAME);
+      registryMeta = context.getMetricRegistry(tableName, HoodieWrapperFileSystem.REGISTRY_META_NAME);
+    } else {
+      registry = getTableRegistryMatchingGlobal(tableName, HoodieWrapperFileSystem.REGISTRY_NAME);
+      registryMeta = getTableRegistryMatchingGlobal(tableName, HoodieWrapperFileSystem.REGISTRY_META_NAME);
+    }
+    HoodieWrapperFileSystem.setMetricsRegistry(registry, registryMeta);
+  }
+
+  /**
+   * Returns the registry for {@code tableName}/{@code registryName}, using the class of the global registry
+   * named {@code registryName}. Note that {@link Registry#getRegistry(String)} is called, and may create
+   * that global registry, just to learn its class.
+   */
+  private static Registry getTableRegistryMatchingGlobal(String tableName, String registryName) {
+    final String registryClass = Registry.getRegistry(registryName).getClass().getName();
+    return Registry.getRegistryOfClass(tableName, registryName, registryClass);
+  }
+}
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
index cf01f39a01c9..f42c7f390e10 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
@@ -24,7 +24,6 @@ import 
org.apache.hudi.client.embedded.EmbeddedTimelineService;
 import org.apache.hudi.client.utils.SparkReleaseResources;
 import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.engine.HoodieEngineContext;
-import org.apache.hudi.common.metrics.Registry;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
@@ -37,20 +36,19 @@ import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.data.HoodieJavaRDD;
 import org.apache.hudi.exception.HoodieException;
-import org.apache.hudi.hadoop.fs.HoodieWrapperFileSystem;
 import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.index.HoodieSparkIndexClient;
 import org.apache.hudi.index.SparkHoodieIndexFactory;
 import org.apache.hudi.metadata.HoodieTableMetadataWriter;
 import org.apache.hudi.metadata.MetadataPartitionType;
 import org.apache.hudi.metadata.SparkMetadataWriterFactory;
-import org.apache.hudi.metrics.DistributedRegistry;
 import org.apache.hudi.metrics.HoodieMetrics;
 import org.apache.hudi.table.BulkInsertPartitioner;
 import org.apache.hudi.table.HoodieSparkTable;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.action.HoodieWriteMetadata;
 import org.apache.hudi.table.upgrade.SparkUpgradeDowngradeHelper;
+import org.apache.hudi.util.DistributedRegistryUtil;
 
 import com.codahale.metrics.Timer;
 import lombok.AllArgsConstructor;
@@ -59,7 +57,6 @@ import lombok.Setter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.spark.SparkContext;
 import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.JavaSparkContext;
 
 import java.io.Serializable;
 import java.util.List;
@@ -81,6 +78,7 @@ public class SparkRDDWriteClient<T> extends
   public SparkRDDWriteClient(HoodieEngineContext context, HoodieWriteConfig 
writeConfig,
                              Option<EmbeddedTimelineService> timelineService) {
     super(context, writeConfig, timelineService, 
SparkUpgradeDowngradeHelper.getInstance());
+    DistributedRegistryUtil.createWrapperFileSystemRegistries(context, 
writeConfig);
     this.tableServiceClient = new SparkRDDTableServiceClient<T>(context, 
writeConfig, getTimelineServer());
     checkSpeculativeExecution();
   }
@@ -389,30 +387,6 @@ public class SparkRDDWriteClient<T> extends
     }
   }
 
-  @Override
-  protected void initWrapperFSMetrics() {
-    if (config.isMetricsOn()) {
-      Registry registry;
-      Registry registryMeta;
-      JavaSparkContext jsc = ((HoodieSparkEngineContext) 
context).getJavaSparkContext();
-
-      if (config.isExecutorMetricsEnabled()) {
-        // Create a distributed registry for HoodieWrapperFileSystem
-        registry = 
Registry.getRegistry(HoodieWrapperFileSystem.class.getSimpleName(),
-            DistributedRegistry.class.getName());
-        ((DistributedRegistry) registry).register(jsc);
-        registryMeta = 
Registry.getRegistry(HoodieWrapperFileSystem.class.getSimpleName() + 
"MetaFolder",
-            DistributedRegistry.class.getName());
-        ((DistributedRegistry) registryMeta).register(jsc);
-      } else {
-        registry = 
Registry.getRegistry(HoodieWrapperFileSystem.class.getSimpleName());
-        registryMeta = 
Registry.getRegistry(HoodieWrapperFileSystem.class.getSimpleName() + 
"MetaFolder");
-      }
-
-      HoodieWrapperFileSystem.setMetricsRegistry(registry, registryMeta);
-    }
-  }
-
   @Override
   protected void updateColumnsToIndexWithColStats(HoodieTableMetaClient 
metaClient, List<String> columnsToIndex) {
     new HoodieSparkIndexClient(config, 
getEngineContext()).createOrUpdateColumnStatsIndexDefinition(metaClient, 
columnsToIndex);
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/HoodieSparkEngineContext.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/HoodieSparkEngineContext.java
index 7e0d9db2feaf..d764dcd3cd54 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/HoodieSparkEngineContext.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/HoodieSparkEngineContext.java
@@ -47,6 +47,8 @@ import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.hadoop.fs.HadoopFSUtils;
 import org.apache.hudi.keygen.KeyGenerator;
 import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory;
+import org.apache.hudi.common.metrics.Registry;
+import org.apache.hudi.metrics.DistributedRegistry;
 
 import lombok.Getter;
 import org.apache.hadoop.conf.Configuration;
@@ -69,6 +71,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
@@ -86,6 +89,12 @@ public class HoodieSparkEngineContext extends 
HoodieEngineContext {
   private final SQLContext sqlContext;
   private final Map<HoodieDataCacheKey, List<Integer>> cachedRddIds = new 
HashMap<>();
 
+  /**
+   * Distributed registries handed out by {@link #getMetricRegistry(String, String)}.
+   * The map is static, so every HoodieSparkEngineContext in this JVM shares it. The Spark operations in this
+   * class copy it into a local variable that their closures capture, which ships it to the executors.
+   */
+  private static final Map<String, Registry> DISTRIBUTED_REGISTRY_MAP = new ConcurrentHashMap<String, Registry>();
+
   public HoodieSparkEngineContext(JavaSparkContext jsc) {
     this(jsc, SQLContext.getOrCreate(jsc.sc()));
   }
@@ -128,12 +137,18 @@ public class HoodieSparkEngineContext extends 
HoodieEngineContext {
 
   @Override
   public <I, O> List<O> map(List<I> data, SerializableFunction<I, O> func, int 
parallelism) {
-    return javaSparkContext.parallelize(data, 
parallelism).map(func::apply).collect();
+    final Map<String, Registry> registries = DISTRIBUTED_REGISTRY_MAP;
+    return javaSparkContext.parallelize(data, parallelism).map(i -> {
+      setRegistries(registries);
+      return func.apply(i);
+    }).collect();
   }
 
   @Override
   public <I, K, V> List<V> mapToPairAndReduceByKey(List<I> data, 
SerializablePairFunction<I, K, V> mapToPairFunc, SerializableBiFunction<V, V, 
V> reduceFunc, int parallelism) {
+    final Map<String, Registry> registries = DISTRIBUTED_REGISTRY_MAP;
     return javaSparkContext.parallelize(data, parallelism).mapToPair(input -> {
+      setRegistries(registries);
       Pair<K, V> pair = mapToPairFunc.call(input);
       return new Tuple2<>(pair.getLeft(), pair.getRight());
     }).reduceByKey(reduceFunc::apply).map(Tuple2::_2).collect();
@@ -143,11 +158,13 @@ public class HoodieSparkEngineContext extends 
HoodieEngineContext {
   public <I, K, V> Stream<ImmutablePair<K, V>> 
mapPartitionsToPairAndReduceByKey(
       Stream<I> data, SerializablePairFlatMapFunction<Iterator<I>, K, V> 
flatMapToPairFunc,
       SerializableBiFunction<V, V, V> reduceFunc, int parallelism) {
+    final Map<String, Registry> registries = DISTRIBUTED_REGISTRY_MAP;
     return javaSparkContext.parallelize(data.collect(Collectors.toList()), 
parallelism)
-        .mapPartitionsToPair((PairFlatMapFunction<Iterator<I>, K, V>) iterator 
->
-            flatMapToPairFunc.call(iterator)
-                .map(e -> new Tuple2<>(e.getKey(), e.getValue())).iterator()
-        )
+        .mapPartitionsToPair((PairFlatMapFunction<Iterator<I>, K, V>) iterator 
-> {
+          setRegistries(registries);
+          return flatMapToPairFunc.call(iterator)
+              .map(e -> new Tuple2<>(e.getKey(), e.getValue())).iterator();
+        })
         .reduceByKey(reduceFunc::apply, parallelism)
         .map(e -> new ImmutablePair<>(e._1, e._2))
         .collect().stream();
@@ -162,7 +179,11 @@ public class HoodieSparkEngineContext extends 
HoodieEngineContext {
 
   @Override
   public <I, O> List<O> flatMap(List<I> data, SerializableFunction<I, 
Stream<O>> func, int parallelism) {
-    return javaSparkContext.parallelize(data, parallelism).flatMap(x -> 
func.apply(x).iterator()).collect();
+    final Map<String, Registry> registries = DISTRIBUTED_REGISTRY_MAP;
+    return javaSparkContext.parallelize(data, parallelism).flatMap(x -> {
+      setRegistries(registries);
+      return func.apply(x).iterator();
+    }).collect();
   }
 
   @Override
@@ -172,13 +193,16 @@ public class HoodieSparkEngineContext extends 
HoodieEngineContext {
 
   @Override
   public <I, K, V> Map<K, V> mapToPair(List<I> data, 
SerializablePairFunction<I, K, V> func, Integer parallelism) {
+    final Map<String, Registry> registries = DISTRIBUTED_REGISTRY_MAP;
     if (Objects.nonNull(parallelism)) {
       return javaSparkContext.parallelize(data, parallelism).mapToPair(input 
-> {
+        setRegistries(registries);
         Pair<K, V> pair = func.call(input);
         return new Tuple2(pair.getLeft(), pair.getRight());
       }).collectAsMap();
     } else {
       return javaSparkContext.parallelize(data).mapToPair(input -> {
+        setRegistries(registries);
         Pair<K, V> pair = func.call(input);
         return new Tuple2(pair.getLeft(), pair.getRight());
       }).collectAsMap();
@@ -254,6 +278,24 @@ public class HoodieSparkEngineContext extends 
HoodieEngineContext {
     return javaSparkContext.sc().applicationId();
   }
 
+  /**
+   * Returns the {@link DistributedRegistry} for the given table and registry name. On first request it is
+   * created through {@link Registry#getRegistryOfClass} and registered with this context's Spark context.
+   *
+   * <p>Registries are cached under {@link Registry#makeKey(String, String)}, the same
+   * {@code tableName::registryName} format used by {@link Registry}. Distinct (table, registry) pairs therefore
+   * never share a cache entry, which an ad-hoc {@code "."}-joined key could not guarantee.
+   *
+   * @param tableName    name of the table; empty for registries shared by all tables
+   * @param registryName name of the registry
+   * @return the distributed registry for the pair
+   */
+  @Override
+  public Registry getMetricRegistry(String tableName, String registryName) {
+    final String registryKey = Registry.makeKey(tableName, registryName);
+    return DISTRIBUTED_REGISTRY_MAP.computeIfAbsent(registryKey, key -> {
+      Registry registry = Registry.getRegistryOfClass(tableName, registryName, DistributedRegistry.class.getName());
+      // Register exactly once (inside computeIfAbsent) so the accumulator is bound to this Spark context.
+      ((DistributedRegistry) registry).register(javaSparkContext);
+      return registry;
+    });
+  }
+
+  /**
+   * Register the distributed registries on Spark executors.
+   * This is called within Spark operations to make the registries available 
on executors.
+   */
+  private static void setRegistries(Map<String, Registry> registries) {
+    Registry.setRegistries(registries.values());
+  }
+
   @Override
   public <I, O> O aggregate(HoodieData<I> data, O zeroValue, 
Functions.Function2<O, I, O> seqOp, Functions.Function2<O, O, O> combOp) {
     Function2<O, I, O> seqOpFunc = seqOp::apply;
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metrics/DistributedRegistry.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metrics/DistributedRegistry.java
index 6cd7e8a274ef..4da400d8869a 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metrics/DistributedRegistry.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metrics/DistributedRegistry.java
@@ -40,6 +40,11 @@ public class DistributedRegistry extends 
AccumulatorV2<Map<String, Long>, Map<St
     this.name = name;
   }
 
+  /**
+   * Returns the name this registry was constructed with.
+   */
+  @Override
+  public String getName() {
+    return this.name;
+  }
+
   public void register(JavaSparkContext jsc) {
     if (!isRegistered()) {
       jsc.sc().register(this);
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/metrics/TestDistributedRegistry.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/metrics/TestDistributedRegistry.java
new file mode 100644
index 000000000000..c64e11a27059
--- /dev/null
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/metrics/TestDistributedRegistry.java
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.metrics;
+
+import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.metrics.Registry;
+import org.apache.hudi.testutils.HoodieClientTestUtils;
+
+import org.apache.spark.api.java.JavaSparkContext;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+/**
+ * Tests for {@link DistributedRegistry}.
+ *
+ * <p>Registries obtained via {@link HoodieSparkEngineContext#getMetricRegistry(String, String)} are registered
+ * with a local Spark context. The parallel tests check that updates made inside Spark tasks are visible on the
+ * driver.
+ */
+public class TestDistributedRegistry {
+  private static final String METRIC_1 = "metric1";
+  private static final String METRIC_2 = "metric2";
+  private static final String REGISTRY_NAME = "testDistributedRegistry";
+
+  private static JavaSparkContext jsc;
+  private static HoodieSparkEngineContext engineContext;
+
+  @BeforeAll
+  public static void setUp() {
+    jsc = new JavaSparkContext(HoodieClientTestUtils.getSparkConfForTest(TestDistributedRegistry.class.getSimpleName()));
+    engineContext = new HoodieSparkEngineContext(jsc);
+  }
+
+  @AfterAll
+  public static void tearDown() {
+    // Drop the engine context as well so nothing can reach the stopped SparkContext through it.
+    engineContext = null;
+    if (jsc != null) {
+      jsc.stop();
+      jsc = null;
+    }
+  }
+
+  @Test
+  public void testRegisterAndName() {
+    String registryName = REGISTRY_NAME + "_testRegisterAndName";
+    Registry registry = engineContext.getMetricRegistry("", registryName);
+    assertThat(registry, instanceOf(DistributedRegistry.class));
+
+    // Then: registry name must match
+    Assertions.assertEquals(registryName, registry.getName());
+
+    // Then: there shouldn't be any metrics
+    Assertions.assertTrue(registry.getAllCounts().isEmpty());
+  }
+
+  @Test
+  public void testTableScopedRegistriesAreIsolated() {
+    // Given: the same registry name requested for two different tables
+    String registryName = REGISTRY_NAME + "_testTableIsolation";
+    Registry registryA = engineContext.getMetricRegistry("tableA", registryName);
+    Registry registryB = engineContext.getMetricRegistry("tableB", registryName);
+
+    // Then: each table gets its own registry, and repeated lookups return the cached instance
+    Assertions.assertNotSame(registryA, registryB);
+    Assertions.assertSame(registryA, engineContext.getMetricRegistry("tableA", registryName));
+
+    // When: only table A's registry is updated
+    registryA.increment(METRIC_1);
+
+    // Then: table B's registry is unaffected
+    Assertions.assertEquals(1, registryA.getAllCounts().get(METRIC_1));
+    Assertions.assertTrue(registryB.getAllCounts().isEmpty());
+  }
+
+  @Test
+  public void testAddSetIncrementMetricsSingleThread() {
+    // Given: distributed registry is created but not registered to spark context
+    String registryName = REGISTRY_NAME + "_testAddSetIncrement";
+    DistributedRegistry registry = new DistributedRegistry(registryName);
+
+    // When: metrics and values are added
+    registry.add(METRIC_1, 1);
+    registry.add(METRIC_1, 3);
+    registry.add(METRIC_2, 5);
+    registry.add(METRIC_2, 7);
+
+    // Then: there should be 2 metrics counts
+    Map<String, Long> counts = registry.getAllCounts();
+    Assertions.assertEquals(2, counts.size());
+
+    // Then: metric1 should exist, value 4 is expected
+    Assertions.assertTrue(counts.containsKey(METRIC_1));
+    Assertions.assertEquals(4, counts.get(METRIC_1));
+
+    // Then: metric2 should exist, value 12 is expected
+    Assertions.assertTrue(counts.containsKey(METRIC_2));
+    Assertions.assertEquals(12, counts.get(METRIC_2));
+
+    // When: metric1 and 2 are incremented
+    registry.increment(METRIC_1);
+    registry.increment(METRIC_2);
+
+    // Then: expected for metric1 is 5, metric2 is 13
+    counts = registry.getAllCounts();
+    Assertions.assertEquals(5, counts.get(METRIC_1));
+    Assertions.assertEquals(13, counts.get(METRIC_2));
+
+    // When: metric1 is set to 11 and metric2 is set to 13
+    registry.set(METRIC_1, 11);
+    registry.set(METRIC_2, 13);
+
+    // Then: expected value for metric1 and 2 are 11,13 respectively
+    counts = registry.getAllCounts();
+    Assertions.assertEquals(11, counts.get(METRIC_1));
+    Assertions.assertEquals(13, counts.get(METRIC_2));
+  }
+
+  @Test
+  public void testIncrementMetricsParallel() {
+    // Given: distributed registry is created and registered to spark context
+    String registryName = REGISTRY_NAME + "_testIncrementParallel";
+    Registry registry = engineContext.getMetricRegistry("", registryName);
+
+    // Given: list of metrics to be added in parallel in spark executor.
+    List<String> info = new ArrayList<>();
+
+    // Given: metric 1 will have 1000 metrics
+    int numMetric1 = 1000;
+    for (int i = 0; i < numMetric1; i++) {
+      info.add(METRIC_1);
+    }
+
+    // Given: metric 2 will have 1500 metrics
+    int numMetric2 = 1500;
+    for (int i = 0; i < numMetric2; i++) {
+      info.add(METRIC_2);
+    }
+
+    // When: spark executors run on 2500 metrics
+    engineContext.map(info, metricName -> {
+      registry.increment(metricName);
+      return null;
+    }, 100);
+
+    // Then: there should be two metrics
+    Map<String, Long> metricCounts = registry.getAllCounts();
+    Assertions.assertEquals(2, metricCounts.size());
+
+    // Then: metric 1 should exist and have value 1000
+    Assertions.assertTrue(metricCounts.containsKey(METRIC_1));
+    Assertions.assertEquals(numMetric1, metricCounts.get(METRIC_1));
+
+    // Then: metric 2 should exist and have value 1500
+    Assertions.assertTrue(metricCounts.containsKey(METRIC_2));
+    Assertions.assertEquals(numMetric2, metricCounts.get(METRIC_2));
+  }
+
+  @Test
+  public void testAddMetricsParallel() {
+    // Given: distributed registry is created and registered to spark context
+    String registryName = REGISTRY_NAME + "_testAddParallel";
+    Registry registry = engineContext.getMetricRegistry("", registryName);
+
+    // Given: list of metric values to be added in parallel
+    List<Long> values = new ArrayList<>();
+    int numValues = 1000;
+    long expectedSum = 0;
+    for (int i = 0; i < numValues; i++) {
+      values.add((long) i);
+      expectedSum += i;
+    }
+
+    // When: spark executors add all values to metric1
+    engineContext.map(values, value -> {
+      registry.add(METRIC_1, value);
+      return null;
+    }, 100);
+
+    // Then: metric1 should have the sum of all values
+    Map<String, Long> metricCounts = registry.getAllCounts();
+    Assertions.assertEquals(1, metricCounts.size());
+    Assertions.assertEquals(expectedSum, metricCounts.get(METRIC_1));
+  }
+
+  @Test
+  public void testClear() {
+    // Given: distributed registry with some metrics
+    String registryName = REGISTRY_NAME + "_testClear";
+    DistributedRegistry registry = new DistributedRegistry(registryName);
+    registry.add(METRIC_1, 10);
+    registry.add(METRIC_2, 20);
+
+    // Verify metrics exist
+    Assertions.assertEquals(2, registry.getAllCounts().size());
+
+    // When: clear is called
+    registry.clear();
+
+    // Then: all metrics should be cleared
+    Assertions.assertTrue(registry.getAllCounts().isEmpty());
+  }
+
+  @Test
+  public void testGetAllCountsWithPrefix() {
+    // Given: distributed registry with some metrics
+    String registryName = REGISTRY_NAME + "_testPrefix";
+    DistributedRegistry registry = new DistributedRegistry(registryName);
+    registry.add(METRIC_1, 10);
+    registry.add(METRIC_2, 20);
+
+    // When: getAllCounts is called with prefix
+    Map<String, Long> countsWithPrefix = registry.getAllCounts(true);
+
+    // Then: metric names should be prefixed with registry name
+    Assertions.assertEquals(2, countsWithPrefix.size());
+    Assertions.assertTrue(countsWithPrefix.containsKey(registryName + "." + METRIC_1));
+    Assertions.assertTrue(countsWithPrefix.containsKey(registryName + "." + METRIC_2));
+    Assertions.assertEquals(10, countsWithPrefix.get(registryName + "." + METRIC_1));
+    Assertions.assertEquals(20, countsWithPrefix.get(registryName + "." + METRIC_2));
+
+    // When: getAllCounts is called without prefix
+    Map<String, Long> countsWithoutPrefix = registry.getAllCounts(false);
+
+    // Then: metric names should not be prefixed
+    Assertions.assertEquals(2, countsWithoutPrefix.size());
+    Assertions.assertTrue(countsWithoutPrefix.containsKey(METRIC_1));
+    Assertions.assertTrue(countsWithoutPrefix.containsKey(METRIC_2));
+  }
+}
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieEngineContext.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieEngineContext.java
index 2fce591b60e9..cf2893e1249c 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieEngineContext.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieEngineContext.java
@@ -36,6 +36,8 @@ import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.ClosableSortingIterator;
 import org.apache.hudi.common.util.collection.ImmutablePair;
 import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.common.metrics.LocalRegistry;
+import org.apache.hudi.common.metrics.Registry;
 import org.apache.hudi.keygen.KeyGenerator;
 import org.apache.hudi.storage.StorageConfiguration;
 
@@ -131,6 +133,18 @@ public abstract class HoodieEngineContext {
     return "Unknown";
   }
 
+  /**
+   * Returns the metric registry for the given table and registry name, used for tracking metrics.
+   *
+   * <p>This base implementation always asks {@link Registry#getRegistryOfClass} for a {@link LocalRegistry}.
+   * Engine-specific contexts (like Spark) should override it to return a DistributedRegistry that tracks
+   * metrics across executors.
+   *
+   * @param tableName    name of the table for which the registry is needed
+   * @param registryName name of the registry
+   * @return the registry for {@code tableName} and {@code registryName}
+   */
+  public Registry getMetricRegistry(String tableName, String registryName) {
+    final String localRegistryClass = LocalRegistry.class.getName();
+    return Registry.getRegistryOfClass(tableName, registryName, localRegistryClass);
+  }
+
   /**
    * Aggregate the elements of each partition, and then the results for all 
the partitions, using given combine functions and a neutral "zero value".
    *
diff --git 
a/hudi-hadoop-common/src/main/java/org/apache/hudi/hadoop/fs/HoodieWrapperFileSystem.java
 
b/hudi-hadoop-common/src/main/java/org/apache/hudi/hadoop/fs/HoodieWrapperFileSystem.java
index 72e989ee9846..24674ee725a9 100644
--- 
a/hudi-hadoop-common/src/main/java/org/apache/hudi/hadoop/fs/HoodieWrapperFileSystem.java
+++ 
b/hudi-hadoop-common/src/main/java/org/apache/hudi/hadoop/fs/HoodieWrapperFileSystem.java
@@ -81,6 +81,16 @@ public class HoodieWrapperFileSystem extends FileSystem {
 
   private static final String METAFOLDER_NAME = ".hoodie";
 
+  /**
+   * Name of the registry holding file system metrics (the simple class name).
+   */
+  public static final String REGISTRY_NAME = HoodieWrapperFileSystem.class.getSimpleName();
+
+  /**
+   * Name of the registry holding metadata-folder file system metrics:
+   * {@link #REGISTRY_NAME} followed by {@code "MetaFolder"}.
+   */
+  public static final String REGISTRY_META_NAME = REGISTRY_NAME + "MetaFolder";
+
   /**
    * Names for metrics.
    */
diff --git 
a/hudi-io/src/main/java/org/apache/hudi/common/metrics/LocalRegistry.java 
b/hudi-io/src/main/java/org/apache/hudi/common/metrics/LocalRegistry.java
index 330068f6d655..e0929ff2f91c 100644
--- a/hudi-io/src/main/java/org/apache/hudi/common/metrics/LocalRegistry.java
+++ b/hudi-io/src/main/java/org/apache/hudi/common/metrics/LocalRegistry.java
@@ -33,6 +33,11 @@ public class LocalRegistry implements Registry {
     this.name = name;
   }
 
+  /**
+   * Returns the name this registry was constructed with.
+   */
+  @Override
+  public String getName() {
+    return this.name;
+  }
+
   @Override
   public void clear() {
     counters.clear();
diff --git a/hudi-io/src/main/java/org/apache/hudi/common/metrics/Registry.java 
b/hudi-io/src/main/java/org/apache/hudi/common/metrics/Registry.java
index bb25c143ddeb..7373481c44aa 100644
--- a/hudi-io/src/main/java/org/apache/hudi/common/metrics/Registry.java
+++ b/hudi-io/src/main/java/org/apache/hudi/common/metrics/Registry.java
@@ -18,45 +18,95 @@
 
 package org.apache.hudi.common.metrics;
 
+import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ReflectionUtils;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.io.Serializable;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
 /**
- * Interface which defines a lightweight Metrics Registry to track Hudi events.
+ * Interface which defines a lightweight Metrics Registry to track Hudi metrics.
+ *
+ * Registries can be used to track related metrics under a common name - e.g. metrics for Metadata Table.
  */
 public interface Registry extends Serializable {
+  Logger LOG = LoggerFactory.getLogger(Registry.class);
+
+  /**
+   * Separator used for compound registry keys (tableName + registryName).
+   */
+  String KEY_SEPARATOR = "::";
 
+  /**
+   * Map of all registries that have been created.
+   * The key is a compound of table name and registry name in format "tableName::registryName".
+   * For non-table specific registries (i.e. common for all tables) the tableName would be empty.
+   */
   ConcurrentHashMap<String, Registry> REGISTRY_MAP = new ConcurrentHashMap<>();
 
   /**
-   * Get (or create) the registry for a provided name.
-   *
-   * This function creates a {@code LocalRegistry}.
+   * Creates a compound key from tableName and registryName.
+   */
+  static String makeKey(String tableName, String registryName) {
+    return tableName + KEY_SEPARATOR + registryName;
+  }
+
+  /**
+   * Extracts the registry name from a compound key.
+   */
+  static String getRegistryNameFromKey(String key) {
+    int separatorIndex = key.lastIndexOf(KEY_SEPARATOR);
+    return separatorIndex >= 0 ? key.substring(separatorIndex + KEY_SEPARATOR.length()) : key;
+  }
+
+  /**
+   * Get (or create) the registry with the provided name.
+   * This function creates a {@code LocalRegistry}. Only one instance of a registry with a given name will be created.
    *
    * @param registryName Name of the registry
    */
   static Registry getRegistry(String registryName) {
-    return getRegistry(registryName, LocalRegistry.class.getName());
+    return getRegistryOfClass("", registryName, LocalRegistry.class.getName());
   }
 
   /**
    * Get (or create) the registry for a provided name and given class.
+   * This is for backward compatibility with existing code.
    *
    * @param registryName Name of the registry.
    * @param clazz The fully qualified name of the registry class to create.
    */
   static Registry getRegistry(String registryName, String clazz) {
-    synchronized (Registry.class) {
-      if (!REGISTRY_MAP.containsKey(registryName)) {
-        Registry registry = (Registry)ReflectionUtils.loadClass(clazz, registryName);
-        REGISTRY_MAP.put(registryName, registry);
-      }
-      return REGISTRY_MAP.get(registryName);
+    return getRegistryOfClass("", registryName, clazz);
+  }
+
+  /**
+   * Get (or create) the registry for a provided table and given class.
+   *
+   * @param tableName Name of the table (empty string for singleton/process-wide registries).
+   * @param registryName Name of the registry.
+   * @param clazz The fully qualified name of the registry class to create.
+   */
+  static Registry getRegistryOfClass(String tableName, String registryName, String clazz) {
+    String key = makeKey(tableName, registryName);
+    Registry registry = REGISTRY_MAP.computeIfAbsent(key, k -> {
+      String registryFullName = tableName.isEmpty() ? registryName : tableName + "." + registryName;
+      Registry r = (Registry) ReflectionUtils.loadClass(clazz, registryFullName);
+      LOG.info("Created a new registry " + r);
+      return r;
+    });
+
+    if (!registry.getClass().getName().equals(clazz)) {
+      LOG.error("Registry with name " + registryName + " already exists with a different class " + registry.getClass().getName()
+          + " than the requested class " + clazz);
     }
+    return registry;
   }
 
   /**
@@ -67,10 +117,27 @@ public interface Registry extends Serializable {
    * @return {@link Map} of metrics name and value
    */
   static Map<String, Long> getAllMetrics(boolean flush, boolean prefixWithRegistryName) {
+    return getAllMetrics(flush, prefixWithRegistryName, Option.empty());
+  }
+
+  /**
+   * Get all registered metrics.
+   *
+   * If a Registry did not have a prefix in its name, the commonPrefix is pre-pended to its name.
+   *
+   * @param flush clear all metrics after this operation.
+   * @param prefixWithRegistryName prefix each metric name with the registry name.
+   * @param commonPrefix prefix to use if the registry name does not have a prefix itself.
+   * @return {@link Map} of metrics name and value
+   */
+  static Map<String, Long> getAllMetrics(boolean flush, boolean prefixWithRegistryName, Option<String> commonPrefix) {
     synchronized (Registry.class) {
       HashMap<String, Long> allMetrics = new HashMap<>();
-      REGISTRY_MAP.forEach((registryName, registry) -> {
-        allMetrics.putAll(registry.getAllCounts(prefixWithRegistryName));
+      REGISTRY_MAP.forEach((key, registry) -> {
+        final String registryName = getRegistryNameFromKey(key);
+        final String prefix = (prefixWithRegistryName && commonPrefix.isPresent() && !registryName.contains("."))
+            ? commonPrefix.get() + "." : "";
+        registry.getAllCounts(prefixWithRegistryName).forEach((metricKey, value) -> allMetrics.put(prefix + metricKey, value));
         if (flush) {
           registry.clear();
         }
@@ -79,6 +146,20 @@ public interface Registry extends Serializable {
     }
   }
 
+  /**
+   * Set all registries if they are not already registered.
+   */
+  static void setRegistries(Collection<Registry> registries) {
+    for (Registry registry : registries) {
+      REGISTRY_MAP.putIfAbsent(makeKey("", registry.getName()), registry);
+    }
+  }
+
+  /**
+   * Returns the name of this registry.
+   */
+  String getName();
+
   /**
    * Clear all metrics.
    */


Reply via email to