This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 149d84f89705 feat(metrics): Add table-specific metrics registry
support for multi-tenant scenarios (#18179)
149d84f89705 is described below
commit 149d84f89705f36aa1bfd44c5e379b23c844996e
Author: Prashant Wason <[email protected]>
AuthorDate: Wed Apr 1 16:17:21 2026 -0700
feat(metrics): Add table-specific metrics registry support for multi-tenant
scenarios (#18179)
In multi-tenant scenarios where multiple Hudi tables share the same process
(e.g., in a long-running Spark application), metrics from different tables need
to be isolated. Currently, the Registry interface uses a simple string key,
which can lead to metric collisions when multiple tables use the same registry
names.
This PR adds support for table-specific metric registries by introducing a
compound key format (tableName::registryName) that allows each table to have
its own isolated metrics.
Summary:
This PR ports the table-specific metrics registry functionality to enable
proper metric isolation in multi-tenant deployments.
Co-Authored-By: Claude Opus 4.6 <[email protected]>
---------
Co-authored-by: Claude Opus 4.6 <[email protected]>
---
.../org/apache/hudi/client/BaseHoodieClient.java | 5 -
.../apache/hudi/util/DistributedRegistryUtil.java | 56 +++++
.../apache/hudi/client/SparkRDDWriteClient.java | 30 +--
.../client/common/HoodieSparkEngineContext.java | 54 ++++-
.../apache/hudi/metrics/DistributedRegistry.java | 5 +
.../hudi/metrics/TestDistributedRegistry.java | 231 +++++++++++++++++++++
.../hudi/common/engine/HoodieEngineContext.java | 14 ++
.../hudi/hadoop/fs/HoodieWrapperFileSystem.java | 10 +
.../apache/hudi/common/metrics/LocalRegistry.java | 5 +
.../org/apache/hudi/common/metrics/Registry.java | 107 ++++++++--
10 files changed, 465 insertions(+), 52 deletions(-)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieClient.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieClient.java
index b0875672b5e9..a1155f439ffa 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieClient.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieClient.java
@@ -123,7 +123,6 @@ public abstract class BaseHoodieClient implements
Serializable, AutoCloseable {
this.txnManager = transactionManager;
this.timeGenerator = timeGenerator;
startEmbeddedServerView();
- initWrapperFSMetrics();
runClientInitCallbacks();
}
@@ -189,10 +188,6 @@ public abstract class BaseHoodieClient implements
Serializable, AutoCloseable {
return context;
}
- protected void initWrapperFSMetrics() {
- // no-op.
- }
-
protected HoodieTableMetaClient createMetaClient(boolean
loadActiveTimelineOnLoad) {
return HoodieTableMetaClient.builder()
.setConf(storageConf.newInstance())
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/util/DistributedRegistryUtil.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/util/DistributedRegistryUtil.java
new file mode 100644
index 000000000000..7801a12d28e1
--- /dev/null
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/util/DistributedRegistryUtil.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.util;
+
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.metrics.Registry;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.hadoop.fs.HoodieWrapperFileSystem;
+
+/**
+ * Utility class for creating and initializing distributed metric registries.
+ */
+public class DistributedRegistryUtil {
+
+ /**
+ * Creates and sets metrics registries for HoodieWrapperFileSystem.
+ * When executor metrics are enabled, this creates distributed registries
that can collect
+ * metrics from Spark executors. Otherwise, it creates local registries.
+ *
+ * @param context The engine context
+ * @param config The write configuration
+ */
+ public static void createWrapperFileSystemRegistries(HoodieEngineContext
context, HoodieWriteConfig config) {
+ if (config.isMetricsOn()) {
+ Registry registry;
+ Registry registryMeta;
+ if (config.isExecutorMetricsEnabled()) {
+ // Create and set distributed registry for HoodieWrapperFileSystem
+ registry = context.getMetricRegistry(config.getTableName(),
HoodieWrapperFileSystem.REGISTRY_NAME);
+ registryMeta = context.getMetricRegistry(config.getTableName(),
HoodieWrapperFileSystem.REGISTRY_META_NAME);
+ } else {
+ registry = Registry.getRegistryOfClass(config.getTableName(),
HoodieWrapperFileSystem.REGISTRY_NAME,
+
Registry.getRegistry(HoodieWrapperFileSystem.REGISTRY_NAME).getClass().getName());
+ registryMeta = Registry.getRegistryOfClass(config.getTableName(),
HoodieWrapperFileSystem.REGISTRY_META_NAME,
+
Registry.getRegistry(HoodieWrapperFileSystem.REGISTRY_META_NAME).getClass().getName());
+ }
+ HoodieWrapperFileSystem.setMetricsRegistry(registry, registryMeta);
+ }
+ }
+}
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
index cf01f39a01c9..f42c7f390e10 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
@@ -24,7 +24,6 @@ import
org.apache.hudi.client.embedded.EmbeddedTimelineService;
import org.apache.hudi.client.utils.SparkReleaseResources;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.HoodieEngineContext;
-import org.apache.hudi.common.metrics.Registry;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
@@ -37,20 +36,19 @@ import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.data.HoodieJavaRDD;
import org.apache.hudi.exception.HoodieException;
-import org.apache.hudi.hadoop.fs.HoodieWrapperFileSystem;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.index.HoodieSparkIndexClient;
import org.apache.hudi.index.SparkHoodieIndexFactory;
import org.apache.hudi.metadata.HoodieTableMetadataWriter;
import org.apache.hudi.metadata.MetadataPartitionType;
import org.apache.hudi.metadata.SparkMetadataWriterFactory;
-import org.apache.hudi.metrics.DistributedRegistry;
import org.apache.hudi.metrics.HoodieMetrics;
import org.apache.hudi.table.BulkInsertPartitioner;
import org.apache.hudi.table.HoodieSparkTable;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.hudi.table.upgrade.SparkUpgradeDowngradeHelper;
+import org.apache.hudi.util.DistributedRegistryUtil;
import com.codahale.metrics.Timer;
import lombok.AllArgsConstructor;
@@ -59,7 +57,6 @@ import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.JavaSparkContext;
import java.io.Serializable;
import java.util.List;
@@ -81,6 +78,7 @@ public class SparkRDDWriteClient<T> extends
public SparkRDDWriteClient(HoodieEngineContext context, HoodieWriteConfig
writeConfig,
Option<EmbeddedTimelineService> timelineService) {
super(context, writeConfig, timelineService,
SparkUpgradeDowngradeHelper.getInstance());
+ DistributedRegistryUtil.createWrapperFileSystemRegistries(context,
writeConfig);
this.tableServiceClient = new SparkRDDTableServiceClient<T>(context,
writeConfig, getTimelineServer());
checkSpeculativeExecution();
}
@@ -389,30 +387,6 @@ public class SparkRDDWriteClient<T> extends
}
}
- @Override
- protected void initWrapperFSMetrics() {
- if (config.isMetricsOn()) {
- Registry registry;
- Registry registryMeta;
- JavaSparkContext jsc = ((HoodieSparkEngineContext)
context).getJavaSparkContext();
-
- if (config.isExecutorMetricsEnabled()) {
- // Create a distributed registry for HoodieWrapperFileSystem
- registry =
Registry.getRegistry(HoodieWrapperFileSystem.class.getSimpleName(),
- DistributedRegistry.class.getName());
- ((DistributedRegistry) registry).register(jsc);
- registryMeta =
Registry.getRegistry(HoodieWrapperFileSystem.class.getSimpleName() +
"MetaFolder",
- DistributedRegistry.class.getName());
- ((DistributedRegistry) registryMeta).register(jsc);
- } else {
- registry =
Registry.getRegistry(HoodieWrapperFileSystem.class.getSimpleName());
- registryMeta =
Registry.getRegistry(HoodieWrapperFileSystem.class.getSimpleName() +
"MetaFolder");
- }
-
- HoodieWrapperFileSystem.setMetricsRegistry(registry, registryMeta);
- }
- }
-
@Override
protected void updateColumnsToIndexWithColStats(HoodieTableMetaClient
metaClient, List<String> columnsToIndex) {
new HoodieSparkIndexClient(config,
getEngineContext()).createOrUpdateColumnStatsIndexDefinition(metaClient,
columnsToIndex);
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/HoodieSparkEngineContext.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/HoodieSparkEngineContext.java
index 7e0d9db2feaf..d764dcd3cd54 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/HoodieSparkEngineContext.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/HoodieSparkEngineContext.java
@@ -47,6 +47,8 @@ import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.hadoop.fs.HadoopFSUtils;
import org.apache.hudi.keygen.KeyGenerator;
import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory;
+import org.apache.hudi.common.metrics.Registry;
+import org.apache.hudi.metrics.DistributedRegistry;
import lombok.Getter;
import org.apache.hadoop.conf.Configuration;
@@ -69,6 +71,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -86,6 +89,12 @@ public class HoodieSparkEngineContext extends
HoodieEngineContext {
private final SQLContext sqlContext;
private final Map<HoodieDataCacheKey, List<Integer>> cachedRddIds = new
HashMap<>();
+ /**
+ * Map of all distributed registries created via getMetricRegistry().
+ * This map is passed to Spark executors to make the registries available
there.
+ */
+ private static final Map<String, Registry> DISTRIBUTED_REGISTRY_MAP = new
ConcurrentHashMap<>();
+
public HoodieSparkEngineContext(JavaSparkContext jsc) {
this(jsc, SQLContext.getOrCreate(jsc.sc()));
}
@@ -128,12 +137,18 @@ public class HoodieSparkEngineContext extends
HoodieEngineContext {
@Override
public <I, O> List<O> map(List<I> data, SerializableFunction<I, O> func, int
parallelism) {
- return javaSparkContext.parallelize(data,
parallelism).map(func::apply).collect();
+ final Map<String, Registry> registries = DISTRIBUTED_REGISTRY_MAP;
+ return javaSparkContext.parallelize(data, parallelism).map(i -> {
+ setRegistries(registries);
+ return func.apply(i);
+ }).collect();
}
@Override
public <I, K, V> List<V> mapToPairAndReduceByKey(List<I> data,
SerializablePairFunction<I, K, V> mapToPairFunc, SerializableBiFunction<V, V,
V> reduceFunc, int parallelism) {
+ final Map<String, Registry> registries = DISTRIBUTED_REGISTRY_MAP;
return javaSparkContext.parallelize(data, parallelism).mapToPair(input -> {
+ setRegistries(registries);
Pair<K, V> pair = mapToPairFunc.call(input);
return new Tuple2<>(pair.getLeft(), pair.getRight());
}).reduceByKey(reduceFunc::apply).map(Tuple2::_2).collect();
@@ -143,11 +158,13 @@ public class HoodieSparkEngineContext extends
HoodieEngineContext {
public <I, K, V> Stream<ImmutablePair<K, V>>
mapPartitionsToPairAndReduceByKey(
Stream<I> data, SerializablePairFlatMapFunction<Iterator<I>, K, V>
flatMapToPairFunc,
SerializableBiFunction<V, V, V> reduceFunc, int parallelism) {
+ final Map<String, Registry> registries = DISTRIBUTED_REGISTRY_MAP;
return javaSparkContext.parallelize(data.collect(Collectors.toList()),
parallelism)
- .mapPartitionsToPair((PairFlatMapFunction<Iterator<I>, K, V>) iterator
->
- flatMapToPairFunc.call(iterator)
- .map(e -> new Tuple2<>(e.getKey(), e.getValue())).iterator()
- )
+ .mapPartitionsToPair((PairFlatMapFunction<Iterator<I>, K, V>) iterator
-> {
+ setRegistries(registries);
+ return flatMapToPairFunc.call(iterator)
+ .map(e -> new Tuple2<>(e.getKey(), e.getValue())).iterator();
+ })
.reduceByKey(reduceFunc::apply, parallelism)
.map(e -> new ImmutablePair<>(e._1, e._2))
.collect().stream();
@@ -162,7 +179,11 @@ public class HoodieSparkEngineContext extends
HoodieEngineContext {
@Override
public <I, O> List<O> flatMap(List<I> data, SerializableFunction<I,
Stream<O>> func, int parallelism) {
- return javaSparkContext.parallelize(data, parallelism).flatMap(x ->
func.apply(x).iterator()).collect();
+ final Map<String, Registry> registries = DISTRIBUTED_REGISTRY_MAP;
+ return javaSparkContext.parallelize(data, parallelism).flatMap(x -> {
+ setRegistries(registries);
+ return func.apply(x).iterator();
+ }).collect();
}
@Override
@@ -172,13 +193,16 @@ public class HoodieSparkEngineContext extends
HoodieEngineContext {
@Override
public <I, K, V> Map<K, V> mapToPair(List<I> data,
SerializablePairFunction<I, K, V> func, Integer parallelism) {
+ final Map<String, Registry> registries = DISTRIBUTED_REGISTRY_MAP;
if (Objects.nonNull(parallelism)) {
return javaSparkContext.parallelize(data, parallelism).mapToPair(input
-> {
+ setRegistries(registries);
Pair<K, V> pair = func.call(input);
return new Tuple2(pair.getLeft(), pair.getRight());
}).collectAsMap();
} else {
return javaSparkContext.parallelize(data).mapToPair(input -> {
+ setRegistries(registries);
Pair<K, V> pair = func.call(input);
return new Tuple2(pair.getLeft(), pair.getRight());
}).collectAsMap();
@@ -254,6 +278,24 @@ public class HoodieSparkEngineContext extends
HoodieEngineContext {
return javaSparkContext.sc().applicationId();
}
+ @Override
+ public Registry getMetricRegistry(String tableName, String registryName) {
+ final String prefixedName = tableName.isEmpty() ? registryName : tableName
+ "." + registryName;
+ return DISTRIBUTED_REGISTRY_MAP.computeIfAbsent(prefixedName, key -> {
+ Registry registry = Registry.getRegistryOfClass(tableName, registryName,
DistributedRegistry.class.getName());
+ ((DistributedRegistry) registry).register(javaSparkContext);
+ return registry;
+ });
+ }
+
+ /**
+ * Register the distributed registries on Spark executors.
+ * This is called within Spark operations to make the registries available
on executors.
+ */
+ private static void setRegistries(Map<String, Registry> registries) {
+ Registry.setRegistries(registries.values());
+ }
+
@Override
public <I, O> O aggregate(HoodieData<I> data, O zeroValue,
Functions.Function2<O, I, O> seqOp, Functions.Function2<O, O, O> combOp) {
Function2<O, I, O> seqOpFunc = seqOp::apply;
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metrics/DistributedRegistry.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metrics/DistributedRegistry.java
index 6cd7e8a274ef..4da400d8869a 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metrics/DistributedRegistry.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metrics/DistributedRegistry.java
@@ -40,6 +40,11 @@ public class DistributedRegistry extends
AccumulatorV2<Map<String, Long>, Map<St
this.name = name;
}
+ @Override
+ public String getName() {
+ return name;
+ }
+
public void register(JavaSparkContext jsc) {
if (!isRegistered()) {
jsc.sc().register(this);
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/metrics/TestDistributedRegistry.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/metrics/TestDistributedRegistry.java
new file mode 100644
index 000000000000..c64e11a27059
--- /dev/null
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/metrics/TestDistributedRegistry.java
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.metrics;
+
+import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.metrics.Registry;
+import org.apache.hudi.testutils.HoodieClientTestUtils;
+
+import org.apache.spark.api.java.JavaSparkContext;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+/**
+ * Tests for {@link DistributedRegistry}.
+ */
+public class TestDistributedRegistry {
+ private static final String METRIC_1 = "metric1";
+ private static final String METRIC_2 = "metric2";
+ private static final String REGISTRY_NAME = "testDistributedRegistry";
+
+ private static JavaSparkContext jsc;
+ private static HoodieSparkEngineContext engineContext;
+
+ @BeforeAll
+ public static void setUp() {
+ jsc = new
JavaSparkContext(HoodieClientTestUtils.getSparkConfForTest(TestDistributedRegistry.class.getSimpleName()));
+ engineContext = new HoodieSparkEngineContext(jsc);
+ }
+
+ @AfterAll
+ public static void tearDown() {
+ if (jsc != null) {
+ jsc.stop();
+ jsc = null;
+ }
+ }
+
+ @Test
+ public void testRegisterAndName() {
+ String registryName = REGISTRY_NAME + "_testRegisterAndName";
+ Registry registry = engineContext.getMetricRegistry("", registryName);
+ assertThat(registry, instanceOf(DistributedRegistry.class));
+
+ // Then: registry name must match
+ Assertions.assertEquals(registryName, registry.getName());
+
+ // Then: there shouldn't be any metrics
+ Assertions.assertTrue(registry.getAllCounts().isEmpty());
+ }
+
+ @Test
+ public void testAddSetIncrementMetricsSingleThread() {
+ // Given: distributed registry is created but not registered to spark
context
+ String registryName = REGISTRY_NAME + "_testAddSetIncrement";
+ DistributedRegistry registry = new DistributedRegistry(registryName);
+
+ // When: metrics and values are added
+ registry.add(METRIC_1, 1);
+ registry.add(METRIC_1, 3);
+ registry.add(METRIC_2, 5);
+ registry.add(METRIC_2, 7);
+
+ // Then: there should be 2 metrics counts
+ Map<String, Long> counts = registry.getAllCounts();
+ Assertions.assertEquals(2, counts.size());
+
+ // Then: metric1 should exist, value 4 is expected
+ Assertions.assertTrue(counts.containsKey(METRIC_1));
+ Assertions.assertEquals(4, counts.get(METRIC_1));
+
+ // Then: metric2 should exist, value 12 is expected
+ Assertions.assertTrue(counts.containsKey(METRIC_2));
+ Assertions.assertEquals(12, counts.get(METRIC_2));
+
+ // When: metric1 and 2 are incremented
+ registry.increment(METRIC_1);
+ registry.increment(METRIC_2);
+
+ // Then: expected for metric1 is 5, metric2 is 13
+ counts = registry.getAllCounts();
+ Assertions.assertEquals(5, counts.get(METRIC_1));
+ Assertions.assertEquals(13, counts.get(METRIC_2));
+
+ // When: metric1 is set to 11 and metric2 is set to 13
+ registry.set(METRIC_1, 11);
+ registry.set(METRIC_2, 13);
+
+ // Then: expected value for metric1 and 2 are 11,13 respectively
+ counts = registry.getAllCounts();
+ Assertions.assertEquals(11, counts.get(METRIC_1));
+ Assertions.assertEquals(13, counts.get(METRIC_2));
+ }
+
+ @Test
+ public void testIncrementMetricsParallel() {
+ // Given: distributed registry is created and registered to spark context
+ String registryName = REGISTRY_NAME + "_testIncrementParallel";
+ Registry registry = engineContext.getMetricRegistry("", registryName);
+
+ // Given: list of metrics to be added in parallel in spark executor.
+ List<String> info = new ArrayList<>();
+
+ // Given: metric 1 will have 1000 metrics
+ int numMetric1 = 1000;
+ for (int i = 0; i < numMetric1; i++) {
+ info.add(METRIC_1);
+ }
+
+ // Given: metric 2 will have 1500 metrics
+ int numMetric2 = 1500;
+ for (int i = 0; i < numMetric2; i++) {
+ info.add(METRIC_2);
+ }
+
+ // When: spark executors run on 2500 metrics
+ engineContext.map(info, metricName -> {
+ registry.increment(metricName);
+ return null;
+ }, 100);
+
+ // Then: there should be two metrics
+ Map<String, Long> metricCounts = registry.getAllCounts();
+ Assertions.assertEquals(2, metricCounts.size());
+
+ // Then: metric 1 should exist and have value 1000
+ Assertions.assertTrue(metricCounts.containsKey(METRIC_1));
+ Assertions.assertEquals(numMetric1, metricCounts.get(METRIC_1));
+
+ // Then: metric 2 should exist and have value 1500
+ Assertions.assertTrue(metricCounts.containsKey(METRIC_2));
+ Assertions.assertEquals(numMetric2, metricCounts.get(METRIC_2));
+ }
+
+ @Test
+ public void testAddMetricsParallel() {
+ // Given: distributed registry is created and registered to spark context
+ String registryName = REGISTRY_NAME + "_testAddParallel";
+ Registry registry = engineContext.getMetricRegistry("", registryName);
+
+ // Given: list of metric values to be added in parallel
+ List<Long> values = new ArrayList<>();
+ int numValues = 1000;
+ long expectedSum = 0;
+ for (int i = 0; i < numValues; i++) {
+ values.add((long) i);
+ expectedSum += i;
+ }
+
+ // When: spark executors add all values to metric1
+ final long finalExpectedSum = expectedSum;
+ engineContext.map(values, value -> {
+ registry.add(METRIC_1, value);
+ return null;
+ }, 100);
+
+ // Then: metric1 should have the sum of all values
+ Map<String, Long> metricCounts = registry.getAllCounts();
+ Assertions.assertEquals(1, metricCounts.size());
+ Assertions.assertEquals(finalExpectedSum, metricCounts.get(METRIC_1));
+ }
+
+ @Test
+ public void testClear() {
+ // Given: distributed registry with some metrics
+ String registryName = REGISTRY_NAME + "_testClear";
+ DistributedRegistry registry = new DistributedRegistry(registryName);
+ registry.add(METRIC_1, 10);
+ registry.add(METRIC_2, 20);
+
+ // Verify metrics exist
+ Assertions.assertEquals(2, registry.getAllCounts().size());
+
+ // When: clear is called
+ registry.clear();
+
+ // Then: all metrics should be cleared
+ Assertions.assertTrue(registry.getAllCounts().isEmpty());
+ }
+
+ @Test
+ public void testGetAllCountsWithPrefix() {
+ // Given: distributed registry with some metrics
+ String registryName = REGISTRY_NAME + "_testPrefix";
+ DistributedRegistry registry = new DistributedRegistry(registryName);
+ registry.add(METRIC_1, 10);
+ registry.add(METRIC_2, 20);
+
+ // When: getAllCounts is called with prefix
+ Map<String, Long> countsWithPrefix = registry.getAllCounts(true);
+
+ // Then: metric names should be prefixed with registry name
+ Assertions.assertEquals(2, countsWithPrefix.size());
+ Assertions.assertTrue(countsWithPrefix.containsKey(registryName + "." +
METRIC_1));
+ Assertions.assertTrue(countsWithPrefix.containsKey(registryName + "." +
METRIC_2));
+ Assertions.assertEquals(10, countsWithPrefix.get(registryName + "." +
METRIC_1));
+ Assertions.assertEquals(20, countsWithPrefix.get(registryName + "." +
METRIC_2));
+
+ // When: getAllCounts is called without prefix
+ Map<String, Long> countsWithoutPrefix = registry.getAllCounts(false);
+
+ // Then: metric names should not be prefixed
+ Assertions.assertEquals(2, countsWithoutPrefix.size());
+ Assertions.assertTrue(countsWithoutPrefix.containsKey(METRIC_1));
+ Assertions.assertTrue(countsWithoutPrefix.containsKey(METRIC_2));
+ }
+}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieEngineContext.java
b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieEngineContext.java
index 2fce591b60e9..cf2893e1249c 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieEngineContext.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieEngineContext.java
@@ -36,6 +36,8 @@ import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.ClosableSortingIterator;
import org.apache.hudi.common.util.collection.ImmutablePair;
import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.common.metrics.LocalRegistry;
+import org.apache.hudi.common.metrics.Registry;
import org.apache.hudi.keygen.KeyGenerator;
import org.apache.hudi.storage.StorageConfiguration;
@@ -131,6 +133,18 @@ public abstract class HoodieEngineContext {
return "Unknown";
}
+ /**
+ * Return a metric registry for the given table and registry name. This is
used for tracking metrics.
+ * The default implementation returns a LocalRegistry. Engine-specific
implementations (like Spark) should override
+ * this to return a DistributedRegistry for tracking metrics across
executors.
+ *
+ * @param tableName Name of the table for which the registry is needed
+ * @param registryName Name of the registry
+ */
+ public Registry getMetricRegistry(String tableName, String registryName) {
+ return Registry.getRegistryOfClass(tableName, registryName,
LocalRegistry.class.getName());
+ }
+
/**
* Aggregate the elements of each partition, and then the results for all
the partitions, using given combine functions and a neutral "zero value".
*
diff --git
a/hudi-hadoop-common/src/main/java/org/apache/hudi/hadoop/fs/HoodieWrapperFileSystem.java
b/hudi-hadoop-common/src/main/java/org/apache/hudi/hadoop/fs/HoodieWrapperFileSystem.java
index 72e989ee9846..24674ee725a9 100644
---
a/hudi-hadoop-common/src/main/java/org/apache/hudi/hadoop/fs/HoodieWrapperFileSystem.java
+++
b/hudi-hadoop-common/src/main/java/org/apache/hudi/hadoop/fs/HoodieWrapperFileSystem.java
@@ -81,6 +81,16 @@ public class HoodieWrapperFileSystem extends FileSystem {
private static final String METAFOLDER_NAME = ".hoodie";
+ /**
+ * Registry name for file system metrics.
+ */
+ public static final String REGISTRY_NAME =
HoodieWrapperFileSystem.class.getSimpleName();
+
+ /**
+ * Registry name for metadata folder file system metrics.
+ */
+ public static final String REGISTRY_META_NAME =
HoodieWrapperFileSystem.class.getSimpleName() + "MetaFolder";
+
/**
* Names for metrics.
*/
diff --git
a/hudi-io/src/main/java/org/apache/hudi/common/metrics/LocalRegistry.java
b/hudi-io/src/main/java/org/apache/hudi/common/metrics/LocalRegistry.java
index 330068f6d655..e0929ff2f91c 100644
--- a/hudi-io/src/main/java/org/apache/hudi/common/metrics/LocalRegistry.java
+++ b/hudi-io/src/main/java/org/apache/hudi/common/metrics/LocalRegistry.java
@@ -33,6 +33,11 @@ public class LocalRegistry implements Registry {
this.name = name;
}
+ @Override
+ public String getName() {
+ return name;
+ }
+
@Override
public void clear() {
counters.clear();
diff --git a/hudi-io/src/main/java/org/apache/hudi/common/metrics/Registry.java
b/hudi-io/src/main/java/org/apache/hudi/common/metrics/Registry.java
index bb25c143ddeb..7373481c44aa 100644
--- a/hudi-io/src/main/java/org/apache/hudi/common/metrics/Registry.java
+++ b/hudi-io/src/main/java/org/apache/hudi/common/metrics/Registry.java
@@ -18,45 +18,95 @@
package org.apache.hudi.common.metrics;
+import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.io.Serializable;
+import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
- * Interface which defines a lightweight Metrics Registry to track Hudi events.
+ * Interface which defines a lightweight Metrics Registry to track Hudi
metrics.
+ *
+ * Registries can be used to track related metrics under a common name - e.g.
metrics for Metadata Table.
*/
public interface Registry extends Serializable {
+ Logger LOG = LoggerFactory.getLogger(Registry.class);
+
+ /**
+ * Separator used for compound registry keys (tableName + registryName).
+ */
+ String KEY_SEPARATOR = "::";
+ /**
+ * Map of all registries that have been created.
+ * The key is a compound of table name and registry name in format
"tableName::registryName".
+ * For non-table specific registries (i.e. common for all tables) the
tableName would be empty.
+ */
ConcurrentHashMap<String, Registry> REGISTRY_MAP = new ConcurrentHashMap<>();
/**
- * Get (or create) the registry for a provided name.
- *
- * This function creates a {@code LocalRegistry}.
+ * Creates a compound key from tableName and registryName.
+ */
+ static String makeKey(String tableName, String registryName) {
+ return tableName + KEY_SEPARATOR + registryName;
+ }
+
+ /**
+ * Extracts the registry name from a compound key.
+ */
+ static String getRegistryNameFromKey(String key) {
+ int separatorIndex = key.lastIndexOf(KEY_SEPARATOR);
+ return separatorIndex >= 0 ? key.substring(separatorIndex +
KEY_SEPARATOR.length()) : key;
+ }
+
+ /**
+ * Get (or create) the registry with the provided name.
+ * This function creates a {@code LocalRegistry}. Only one instance of a
registry with a given name will be created.
*
* @param registryName Name of the registry
*/
static Registry getRegistry(String registryName) {
- return getRegistry(registryName, LocalRegistry.class.getName());
+ return getRegistryOfClass("", registryName, LocalRegistry.class.getName());
}
/**
* Get (or create) the registry for a provided name and given class.
+ * This is for backward compatibility with existing code.
*
* @param registryName Name of the registry.
* @param clazz The fully qualified name of the registry class to create.
*/
static Registry getRegistry(String registryName, String clazz) {
- synchronized (Registry.class) {
- if (!REGISTRY_MAP.containsKey(registryName)) {
- Registry registry = (Registry)ReflectionUtils.loadClass(clazz,
registryName);
- REGISTRY_MAP.put(registryName, registry);
- }
- return REGISTRY_MAP.get(registryName);
+ return getRegistryOfClass("", registryName, clazz);
+ }
+
+ /**
+ * Get (or create) the registry for a provided table and given class.
+ *
+ * @param tableName Name of the table (empty string for
singleton/process-wide registries).
+ * @param registryName Name of the registry.
+ * @param clazz The fully qualified name of the registry class to create.
+ */
+ static Registry getRegistryOfClass(String tableName, String registryName,
String clazz) {
+ String key = makeKey(tableName, registryName);
+ Registry registry = REGISTRY_MAP.computeIfAbsent(key, k -> {
+ String registryFullName = tableName.isEmpty() ? registryName : tableName
+ "." + registryName;
+ Registry r = (Registry) ReflectionUtils.loadClass(clazz,
registryFullName);
+ LOG.info("Created a new registry " + r);
+ return r;
+ });
+
+ if (!registry.getClass().getName().equals(clazz)) {
+ LOG.error("Registry with name " + registryName + " already exists with a
different class " + registry.getClass().getName()
+ + " than the requested class " + clazz);
}
+ return registry;
}
/**
@@ -67,10 +117,27 @@ public interface Registry extends Serializable {
* @return {@link Map} of metrics name and value
*/
static Map<String, Long> getAllMetrics(boolean flush, boolean
prefixWithRegistryName) {
+ return getAllMetrics(flush, prefixWithRegistryName, Option.empty());
+ }
+
+ /**
+ * Get all registered metrics.
+ *
+ * If a Registry did not have a prefix in its name, the commonPrefix is
pre-pended to its name.
+ *
+ * @param flush clear all metrics after this operation.
+ * @param prefixWithRegistryName prefix each metric name with the registry
name.
+ * @param commonPrefix prefix to use if the registry name does not have a
prefix itself.
+ * @return {@link Map} of metrics name and value
+ */
+ static Map<String, Long> getAllMetrics(boolean flush, boolean
prefixWithRegistryName, Option<String> commonPrefix) {
synchronized (Registry.class) {
HashMap<String, Long> allMetrics = new HashMap<>();
- REGISTRY_MAP.forEach((registryName, registry) -> {
- allMetrics.putAll(registry.getAllCounts(prefixWithRegistryName));
+ REGISTRY_MAP.forEach((key, registry) -> {
+ final String registryName = getRegistryNameFromKey(key);
+ final String prefix = (prefixWithRegistryName &&
commonPrefix.isPresent() && !registryName.contains("."))
+ ? commonPrefix.get() + "." : "";
+ registry.getAllCounts(prefixWithRegistryName).forEach((metricKey,
value) -> allMetrics.put(prefix + metricKey, value));
if (flush) {
registry.clear();
}
@@ -79,6 +146,20 @@ public interface Registry extends Serializable {
}
}
+ /**
+ * Set all registries if they are not already registered.
+ */
+ static void setRegistries(Collection<Registry> registries) {
+ for (Registry registry : registries) {
+ REGISTRY_MAP.putIfAbsent(makeKey("", registry.getName()), registry);
+ }
+ }
+
+ /**
+ * Returns the name of this registry.
+ */
+ String getName();
+
/**
* Clear all metrics.
*/