This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new c7c636c2d18 [HUDI-7731] Fix usage of new Configuration() in production code (#11191)
c7c636c2d18 is described below

commit c7c636c2d18673a41aa0e656b6c7746808d4a001
Author: Jon Vexler <jbvex...@gmail.com>
AuthorDate: Fri May 10 20:47:33 2024 -0400

    [HUDI-7731] Fix usage of new Configuration() in production code (#11191)
    
    Co-authored-by: Jonathan Vexler <=>
---
 .../main/java/org/apache/hudi/client/BaseHoodieClient.java  |  2 +-
 .../apache/hudi/client/timeline/HoodieTimelineArchiver.java |  2 +-
 .../apache/hudi/client/transaction/lock/LockManager.java    |  2 +-
 .../client/transaction/lock/metrics/HoodieLockMetrics.java  |  5 +++--
 .../main/java/org/apache/hudi/metrics/HoodieMetrics.java    |  5 +++--
 .../table/action/compact/RunCompactionActionExecutor.java   |  2 +-
 .../hudi/table/action/index/RunIndexActionExecutor.java     |  2 +-
 .../org/apache/hudi/metrics/TestHoodieConsoleMetrics.java   |  5 ++++-
 .../org/apache/hudi/metrics/TestHoodieGraphiteMetrics.java  |  5 ++++-
 .../java/org/apache/hudi/metrics/TestHoodieJmxMetrics.java  |  5 ++++-
 .../java/org/apache/hudi/metrics/TestHoodieMetrics.java     |  5 ++++-
 .../hudi/metrics/datadog/TestDatadogMetricsReporter.java    |  9 ++++++---
 .../test/java/org/apache/hudi/metrics/m3/TestM3Metrics.java | 10 +++++++---
 .../hudi/metrics/prometheus/TestPrometheusReporter.java     |  7 +++++--
 .../hudi/metrics/prometheus/TestPushGateWayReporter.java    | 13 ++++++++-----
 .../hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java |  2 +-
 .../hudi/metadata/JavaHoodieBackedTableMetadataWriter.java  |  2 +-
 .../apache/hudi/client/TestJavaHoodieBackedMetadata.java    |  2 +-
 .../hudi/client/validator/SparkPreCommitValidator.java      |  2 +-
 .../hudi/metadata/SparkHoodieBackedTableMetadataWriter.java |  2 +-
 .../hudi/client/functional/TestHoodieBackedMetadata.java    |  2 +-
 .../java/org/apache/hudi/io/TestHoodieTimelineArchiver.java |  2 +-
 .../apache/hudi/common/table/log/HoodieLogFormatWriter.java |  2 +-
 .../hudi/common/table/log/block/HoodieAvroDataBlock.java    |  3 ++-
 .../hudi/common/table/log/block/HoodieCommandBlock.java     |  3 ++-
 .../hudi/common/table/log/block/HoodieCorruptBlock.java     |  3 ++-
 .../apache/hudi/common/table/log/block/HoodieDataBlock.java |  7 ++++---
 .../hudi/common/table/log/block/HoodieDeleteBlock.java      |  3 ++-
 .../hudi/common/table/log/block/HoodieHFileDataBlock.java   |  4 ++--
 .../apache/hudi/common/table/log/block/HoodieLogBlock.java  |  2 +-
 .../hudi/common/table/log/block/HoodieParquetDataBlock.java |  7 ++-----
 .../java/org/apache/hudi/metadata/BaseTableMetadata.java    |  3 ++-
 .../org/apache/hudi/metadata/HoodieMetadataMetrics.java     |  5 +++--
 .../src/main/java/org/apache/hudi/metrics/Metrics.java      | 12 +++++++-----
 .../apache/hudi/common/functional/TestHoodieLogFormat.java  |  2 +-
 .../hudi/common/table/log/block/TestHoodieDeleteBlock.java  |  3 ++-
 .../procedures/RepairOverwriteHoodiePropsProcedure.scala    |  2 +-
 .../marker/MarkerBasedEarlyConflictDetectionRunnable.java   |  6 ++----
 .../utilities/deltastreamer/HoodieDeltaStreamerMetrics.java |  9 +++++----
 .../hudi/utilities/ingestion/HoodieIngestionMetrics.java    | 10 +++++++---
 .../hudi/utilities/streamer/HoodieStreamerMetrics.java      | 11 ++++++-----
 .../java/org/apache/hudi/utilities/streamer/StreamSync.java |  8 ++++++--
 42 files changed, 120 insertions(+), 78 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieClient.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieClient.java
index fe964db6862..f982a0e4e22 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieClient.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieClient.java
@@ -102,7 +102,7 @@ public abstract class BaseHoodieClient implements 
Serializable, AutoCloseable {
     this.heartbeatClient = new HoodieHeartbeatClient(storage, this.basePath,
         clientConfig.getHoodieClientHeartbeatIntervalInMs(),
         clientConfig.getHoodieClientHeartbeatTolerableMisses());
-    this.metrics = new HoodieMetrics(config);
+    this.metrics = new HoodieMetrics(config, context.getStorageConf());
     this.txnManager = new TransactionManager(config, storage);
     this.timeGenerator = TimeGenerators.getTimeGenerator(
         config.getTimeGeneratorConfig(), 
HadoopFSUtils.getStorageConf(hadoopConf));
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/HoodieTimelineArchiver.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/HoodieTimelineArchiver.java
index 175ac5607f4..f4ab6c76e13 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/HoodieTimelineArchiver.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/HoodieTimelineArchiver.java
@@ -86,7 +86,7 @@ public class HoodieTimelineArchiver<T extends 
HoodieAvroPayload, I, K, O> {
     Pair<Integer, Integer> minAndMaxInstants = 
getMinAndMaxInstantsToKeep(table, metaClient);
     this.minInstantsToKeep = minAndMaxInstants.getLeft();
     this.maxInstantsToKeep = minAndMaxInstants.getRight();
-    this.metrics = new HoodieMetrics(config);
+    this.metrics = new HoodieMetrics(config, table.getStorageConf());
   }
 
   public int archiveIfRequired(HoodieEngineContext context) throws IOException 
{
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/LockManager.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/LockManager.java
index c8a07d09684..f2920a7da50 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/LockManager.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/LockManager.java
@@ -69,7 +69,7 @@ public class LockManager implements Serializable, 
AutoCloseable {
         
Integer.parseInt(HoodieLockConfig.LOCK_ACQUIRE_CLIENT_NUM_RETRIES.defaultValue()));
     maxWaitTimeInMs = 
lockConfiguration.getConfig().getLong(LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY,
         
Long.parseLong(HoodieLockConfig.LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS.defaultValue()));
-    metrics = new HoodieLockMetrics(writeConfig);
+    metrics = new HoodieLockMetrics(writeConfig, storageConf);
     lockRetryHelper = new RetryHelper<>(maxWaitTimeInMs, maxRetries, 
maxWaitTimeInMs,
         Arrays.asList(HoodieLockException.class, InterruptedException.class), 
"acquire lock");
   }
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/metrics/HoodieLockMetrics.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/metrics/HoodieLockMetrics.java
index bbf3d6876d8..7a793de5392 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/metrics/HoodieLockMetrics.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/metrics/HoodieLockMetrics.java
@@ -26,6 +26,7 @@ import com.codahale.metrics.Timer;
 import org.apache.hudi.common.util.HoodieTimer;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.metrics.Metrics;
+import org.apache.hudi.storage.StorageConfiguration;
 
 import java.util.concurrent.TimeUnit;
 
@@ -49,12 +50,12 @@ public class HoodieLockMetrics {
   private static final Object REGISTRY_LOCK = new Object();
   private Metrics metrics;
 
-  public HoodieLockMetrics(HoodieWriteConfig writeConfig) {
+  public HoodieLockMetrics(HoodieWriteConfig writeConfig, 
StorageConfiguration<?> storageConf) {
     this.isMetricsEnabled = writeConfig.isLockingMetricsEnabled();
     this.writeConfig = writeConfig;
 
     if (isMetricsEnabled) {
-      metrics = Metrics.getInstance(writeConfig.getMetricsConfig());
+      metrics = Metrics.getInstance(writeConfig.getMetricsConfig(), 
storageConf);
       MetricRegistry registry = metrics.getRegistry();
 
       lockAttempts = 
registry.counter(getMetricsName(LOCK_ACQUIRE_ATTEMPTS_COUNTER_NAME));
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/HoodieMetrics.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/HoodieMetrics.java
index 9f1ef7a44c5..d8c60d5f660 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/HoodieMetrics.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/HoodieMetrics.java
@@ -25,6 +25,7 @@ import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.VisibleForTesting;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.storage.StorageConfiguration;
 
 import com.codahale.metrics.Counter;
 import com.codahale.metrics.Timer;
@@ -106,11 +107,11 @@ public class HoodieMetrics {
   private Counter compactionRequestedCounter = null;
   private Counter compactionCompletedCounter = null;
 
-  public HoodieMetrics(HoodieWriteConfig config) {
+  public HoodieMetrics(HoodieWriteConfig config, StorageConfiguration<?> 
storageConf) {
     this.config = config;
     this.tableName = config.getTableName();
     if (config.isMetricsOn()) {
-      metrics = Metrics.getInstance(config.getMetricsConfig());
+      metrics = Metrics.getInstance(config.getMetricsConfig(), storageConf);
       this.rollbackTimerName = getMetricsName(TIMER_ACTION, 
HoodieTimeline.ROLLBACK_ACTION);
       this.cleanTimerName = getMetricsName(TIMER_ACTION, 
HoodieTimeline.CLEAN_ACTION);
       this.archiveTimerName = getMetricsName(TIMER_ACTION, ARCHIVE_ACTION);
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/RunCompactionActionExecutor.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/RunCompactionActionExecutor.java
index 055cdb5910b..55e8ce7d23f 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/RunCompactionActionExecutor.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/RunCompactionActionExecutor.java
@@ -73,7 +73,7 @@ public class RunCompactionActionExecutor<T> extends
     this.operationType = operationType;
     checkArgument(operationType == WriteOperationType.COMPACT || operationType 
== WriteOperationType.LOG_COMPACT,
         "Only COMPACT and LOG_COMPACT is supported");
-    metrics = new HoodieMetrics(config);
+    metrics = new HoodieMetrics(config, context.getStorageConf());
   }
 
   @Override
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/RunIndexActionExecutor.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/RunIndexActionExecutor.java
index aed06d95867..ab2096b2ede 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/RunIndexActionExecutor.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/RunIndexActionExecutor.java
@@ -101,7 +101,7 @@ public class RunIndexActionExecutor<T, I, K, O> extends 
BaseActionExecutor<T, I,
     super(context, config, table, instantTime);
     this.txnManager = new TransactionManager(config, 
table.getMetaClient().getStorage());
     if (config.getMetadataConfig().isMetricsEnabled()) {
-      this.metrics = Option.of(new 
HoodieMetadataMetrics(config.getMetricsConfig()));
+      this.metrics = Option.of(new 
HoodieMetadataMetrics(config.getMetricsConfig(), context.getStorageConf()));
     } else {
       this.metrics = Option.empty();
     }
diff --git 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/TestHoodieConsoleMetrics.java
 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/TestHoodieConsoleMetrics.java
index 43748e96833..4e938ef1cef 100644
--- 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/TestHoodieConsoleMetrics.java
+++ 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/TestHoodieConsoleMetrics.java
@@ -18,8 +18,10 @@
 
 package org.apache.hudi.metrics;
 
+import org.apache.hudi.common.testutils.HoodieTestUtils;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.config.metrics.HoodieMetricsConfig;
+import org.apache.hudi.storage.StorageConfiguration;
 
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
@@ -40,6 +42,7 @@ public class TestHoodieConsoleMetrics {
   HoodieWriteConfig writeConfig;
   @Mock
   HoodieMetricsConfig metricsConfig;
+  StorageConfiguration storageConf = HoodieTestUtils.getDefaultStorageConf();
   HoodieMetrics hoodieMetrics;
   Metrics metrics;
 
@@ -49,7 +52,7 @@ public class TestHoodieConsoleMetrics {
     when(writeConfig.isMetricsOn()).thenReturn(true);
     
when(metricsConfig.getMetricsReporterType()).thenReturn(MetricsReporterType.CONSOLE);
     when(metricsConfig.getBasePath()).thenReturn("s3://test" + 
UUID.randomUUID());
-    hoodieMetrics = new HoodieMetrics(writeConfig);
+    hoodieMetrics = new HoodieMetrics(writeConfig, storageConf);
     metrics = hoodieMetrics.getMetrics();
   }
 
diff --git 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/TestHoodieGraphiteMetrics.java
 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/TestHoodieGraphiteMetrics.java
index 63a6704b02f..cf488405660 100644
--- 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/TestHoodieGraphiteMetrics.java
+++ 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/TestHoodieGraphiteMetrics.java
@@ -18,9 +18,11 @@
 
 package org.apache.hudi.metrics;
 
+import org.apache.hudi.common.testutils.HoodieTestUtils;
 import org.apache.hudi.common.testutils.NetworkTestUtils;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.config.metrics.HoodieMetricsConfig;
+import org.apache.hudi.storage.StorageConfiguration;
 
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Test;
@@ -43,6 +45,7 @@ public class TestHoodieGraphiteMetrics {
   HoodieWriteConfig writeConfig;
   @Mock
   HoodieMetricsConfig metricsConfig;
+  StorageConfiguration storageConf = HoodieTestUtils.getDefaultStorageConf();
   HoodieMetrics hoodieMetrics;
   Metrics metrics;
 
@@ -60,7 +63,7 @@ public class TestHoodieGraphiteMetrics {
     
when(metricsConfig.getGraphiteServerPort()).thenReturn(NetworkTestUtils.nextFreePort());
     when(metricsConfig.getGraphiteReportPeriodSeconds()).thenReturn(30);
     when(metricsConfig.getBasePath()).thenReturn("s3://test" + 
UUID.randomUUID());
-    hoodieMetrics = new HoodieMetrics(writeConfig);
+    hoodieMetrics = new HoodieMetrics(writeConfig, storageConf);
     metrics = hoodieMetrics.getMetrics();
     metrics.registerGauge("graphite_metric", 123L);
     assertEquals("123", metrics.getRegistry().getGauges()
diff --git 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/TestHoodieJmxMetrics.java
 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/TestHoodieJmxMetrics.java
index 3b776c104cd..9daebd08661 100644
--- 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/TestHoodieJmxMetrics.java
+++ 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/TestHoodieJmxMetrics.java
@@ -18,9 +18,11 @@
 
 package org.apache.hudi.metrics;
 
+import org.apache.hudi.common.testutils.HoodieTestUtils;
 import org.apache.hudi.common.testutils.NetworkTestUtils;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.config.metrics.HoodieMetricsConfig;
+import org.apache.hudi.storage.StorageConfiguration;
 
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
@@ -44,6 +46,7 @@ public class TestHoodieJmxMetrics {
   HoodieWriteConfig writeConfig;
   @Mock
   HoodieMetricsConfig metricsConfig;
+  StorageConfiguration storageConf = HoodieTestUtils.getDefaultStorageConf();
   HoodieMetrics hoodieMetrics;
   Metrics metrics;
 
@@ -55,7 +58,7 @@ public class TestHoodieJmxMetrics {
     when(metricsConfig.getJmxHost()).thenReturn("localhost");
     
when(metricsConfig.getJmxPort()).thenReturn(String.valueOf(NetworkTestUtils.nextFreePort()));
     when(metricsConfig.getBasePath()).thenReturn("s3://test" + 
UUID.randomUUID());
-    hoodieMetrics = new HoodieMetrics(writeConfig);
+    hoodieMetrics = new HoodieMetrics(writeConfig, storageConf);
     metrics = hoodieMetrics.getMetrics();
   }
 
diff --git 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/TestHoodieMetrics.java
 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/TestHoodieMetrics.java
index 7b1b918535b..73b9646d577 100755
--- 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/TestHoodieMetrics.java
+++ 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/TestHoodieMetrics.java
@@ -19,11 +19,13 @@
 package org.apache.hudi.metrics;
 
 import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.testutils.HoodieTestUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.config.metrics.HoodieMetricsConfig;
 import org.apache.hudi.index.HoodieIndex;
+import org.apache.hudi.storage.StorageConfiguration;
 
 import com.codahale.metrics.Timer;
 import org.junit.jupiter.api.AfterEach;
@@ -49,6 +51,7 @@ public class TestHoodieMetrics {
   HoodieWriteConfig writeConfig;
   @Mock
   HoodieMetricsConfig metricsConfig;
+  StorageConfiguration storageConf = HoodieTestUtils.getDefaultStorageConf();
   HoodieMetrics hoodieMetrics;
   Metrics metrics;
 
@@ -58,7 +61,7 @@ public class TestHoodieMetrics {
     when(writeConfig.isMetricsOn()).thenReturn(true);
     
when(metricsConfig.getMetricsReporterType()).thenReturn(MetricsReporterType.INMEMORY);
     when(metricsConfig.getBasePath()).thenReturn("s3://test" + 
UUID.randomUUID());
-    hoodieMetrics = new HoodieMetrics(writeConfig);
+    hoodieMetrics = new HoodieMetrics(writeConfig, storageConf);
     metrics = hoodieMetrics.getMetrics();
   }
 
diff --git 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/datadog/TestDatadogMetricsReporter.java
 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/datadog/TestDatadogMetricsReporter.java
index 55637a241e2..9a7b82b4485 100644
--- 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/datadog/TestDatadogMetricsReporter.java
+++ 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/datadog/TestDatadogMetricsReporter.java
@@ -24,6 +24,7 @@ import org.apache.hudi.metrics.HoodieMetrics;
 import org.apache.hudi.metrics.Metrics;
 import org.apache.hudi.metrics.MetricsReporterType;
 import org.apache.hudi.metrics.datadog.DatadogHttpClient.ApiSite;
+import org.apache.hudi.storage.StorageConfiguration;
 
 import com.codahale.metrics.MetricRegistry;
 import org.junit.jupiter.api.AfterEach;
@@ -47,6 +48,8 @@ public class TestDatadogMetricsReporter {
   HoodieWriteConfig writeConfig;
   @Mock
   HoodieMetricsConfig metricsConfig;
+  @Mock
+  StorageConfiguration storageConf;
   HoodieMetrics hoodieMetrics;
   Metrics metrics;
 
@@ -70,7 +73,7 @@ public class TestDatadogMetricsReporter {
     when(metricsConfig.getBasePath()).thenReturn("s3://test" + 
UUID.randomUUID());
 
     Throwable t = assertThrows(IllegalStateException.class, () -> {
-      hoodieMetrics = new HoodieMetrics(writeConfig);
+      hoodieMetrics = new HoodieMetrics(writeConfig, storageConf);
       metrics = hoodieMetrics.getMetrics();
     });
     assertEquals("Datadog cannot be initialized: API key is null or empty.", 
t.getMessage());
@@ -86,7 +89,7 @@ public class TestDatadogMetricsReporter {
     when(metricsConfig.getDatadogMetricPrefix()).thenReturn("");
     when(metricsConfig.getBasePath()).thenReturn("s3://test" + 
UUID.randomUUID());
     Throwable t = assertThrows(IllegalStateException.class, () -> {
-      hoodieMetrics = new HoodieMetrics(writeConfig);
+      hoodieMetrics = new HoodieMetrics(writeConfig, storageConf);
       metrics = hoodieMetrics.getMetrics();
     });
     assertEquals("Datadog cannot be initialized: Metric prefix is null or 
empty.", t.getMessage());
@@ -108,7 +111,7 @@ public class TestDatadogMetricsReporter {
     when(metricsConfig.getMetricReporterMetricsNamePrefix()).thenReturn("");
     when(metricsConfig.getBasePath()).thenReturn("s3://test" + 
UUID.randomUUID());
     assertDoesNotThrow(() -> {
-      hoodieMetrics = new HoodieMetrics(writeConfig);
+      hoodieMetrics = new HoodieMetrics(writeConfig, storageConf);
       metrics = hoodieMetrics.getMetrics();
     });
   }
diff --git 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/m3/TestM3Metrics.java
 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/m3/TestM3Metrics.java
index 65c4b1d4aba..954619f6174 100644
--- 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/m3/TestM3Metrics.java
+++ 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/m3/TestM3Metrics.java
@@ -29,6 +29,8 @@ import org.apache.hudi.config.metrics.HoodieMetricsConfig;
 import org.apache.hudi.metrics.HoodieMetrics;
 import org.apache.hudi.metrics.Metrics;
 import org.apache.hudi.metrics.MetricsReporterType;
+import org.apache.hudi.storage.StorageConfiguration;
+
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
@@ -42,6 +44,8 @@ public class TestM3Metrics {
   HoodieWriteConfig writeConfig;
   @Mock
   HoodieMetricsConfig metricsConfig;
+  @Mock
+  StorageConfiguration storageConf;
   HoodieMetrics hoodieMetrics;
   Metrics metrics;
 
@@ -62,7 +66,7 @@ public class TestM3Metrics {
     when(metricsConfig.getM3Service()).thenReturn("hoodie");
     when(metricsConfig.getM3Tags()).thenReturn("tag1=value1,tag2=value2");
     when(metricsConfig.getMetricReporterMetricsNamePrefix()).thenReturn("");
-    hoodieMetrics = new HoodieMetrics(writeConfig);
+    hoodieMetrics = new HoodieMetrics(writeConfig, storageConf);
     metrics = hoodieMetrics.getMetrics();
     metrics.registerGauge("metric1", 123L);
     assertEquals("123", 
metrics.getRegistry().getGauges().get("metric1").getValue().toString());
@@ -80,7 +84,7 @@ public class TestM3Metrics {
     when(metricsConfig.getM3Service()).thenReturn("hoodie");
     when(metricsConfig.getM3Tags()).thenReturn("");
     when(metricsConfig.getMetricReporterMetricsNamePrefix()).thenReturn("");
-    hoodieMetrics = new HoodieMetrics(writeConfig);
+    hoodieMetrics = new HoodieMetrics(writeConfig, storageConf);
     metrics = hoodieMetrics.getMetrics();
     metrics.registerGauge("metric1", 123L);
     assertEquals("123", 
metrics.getRegistry().getGauges().get("metric1").getValue().toString());
@@ -94,7 +98,7 @@ public class TestM3Metrics {
     when(writeConfig.isMetricsOn()).thenReturn(true);
     when(metricsConfig.getMetricReporterMetricsNamePrefix()).thenReturn("");
     assertThrows(RuntimeException.class, () -> {
-      hoodieMetrics = new HoodieMetrics(writeConfig);
+      hoodieMetrics = new HoodieMetrics(writeConfig, storageConf);
     });
   }
 }
diff --git 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/prometheus/TestPrometheusReporter.java
 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/prometheus/TestPrometheusReporter.java
index 9ad2b8388a2..d95614a577a 100644
--- 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/prometheus/TestPrometheusReporter.java
+++ 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/prometheus/TestPrometheusReporter.java
@@ -18,11 +18,13 @@
 
 package org.apache.hudi.metrics.prometheus;
 
+import org.apache.hudi.common.testutils.HoodieTestUtils;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.config.metrics.HoodieMetricsConfig;
 import org.apache.hudi.metrics.HoodieMetrics;
 import org.apache.hudi.metrics.Metrics;
 import org.apache.hudi.metrics.MetricsReporterType;
+import org.apache.hudi.storage.StorageConfiguration;
 
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Test;
@@ -42,6 +44,7 @@ public class TestPrometheusReporter {
   HoodieWriteConfig writeConfig;
   @Mock
   HoodieMetricsConfig metricsConfig;
+  StorageConfiguration storageConf = HoodieTestUtils.getDefaultStorageConf();
   HoodieMetrics hoodieMetrics;
   Metrics metrics;
 
@@ -60,8 +63,8 @@ public class TestPrometheusReporter {
     when(metricsConfig.getPrometheusPort()).thenReturn(9090);
     when(metricsConfig.getBasePath()).thenReturn("s3://test" + 
UUID.randomUUID());
     assertDoesNotThrow(() -> {
-      new HoodieMetrics(writeConfig);
-      hoodieMetrics = new HoodieMetrics(writeConfig);
+      new HoodieMetrics(writeConfig, storageConf);
+      hoodieMetrics = new HoodieMetrics(writeConfig, storageConf);
       metrics = hoodieMetrics.getMetrics();
     });
   }
diff --git 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/prometheus/TestPushGateWayReporter.java
 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/prometheus/TestPushGateWayReporter.java
index aa1c3f06b6f..c2c7695932d 100644
--- 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/prometheus/TestPushGateWayReporter.java
+++ 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/prometheus/TestPushGateWayReporter.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.metrics.prometheus;
 
+import org.apache.hudi.common.testutils.HoodieTestUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.config.metrics.HoodieMetricsConfig;
@@ -25,6 +26,7 @@ import org.apache.hudi.metrics.HoodieMetrics;
 import org.apache.hudi.metrics.MetricUtils;
 import org.apache.hudi.metrics.Metrics;
 import org.apache.hudi.metrics.MetricsReporterType;
+import org.apache.hudi.storage.StorageConfiguration;
 
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Test;
@@ -34,15 +36,15 @@ import org.junit.jupiter.params.provider.ValueSource;
 import org.mockito.Mock;
 import org.mockito.junit.jupiter.MockitoExtension;
 
-import java.util.ArrayList;
-import java.util.Map;
-import java.util.UUID;
 import java.io.IOException;
 import java.net.URISyntaxException;
 import java.net.URL;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
+import java.util.UUID;
 
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -60,6 +62,7 @@ public class TestPushGateWayReporter {
   HoodieWriteConfig writeConfig;
   @Mock
   HoodieMetricsConfig metricsConfig;
+  StorageConfiguration storageConf = HoodieTestUtils.getDefaultStorageConf();
 
   HoodieMetrics hoodieMetrics;
   Metrics metrics;
@@ -78,7 +81,7 @@ public class TestPushGateWayReporter {
     configureDefaultReporter();
 
     assertDoesNotThrow(() -> {
-      hoodieMetrics = new HoodieMetrics(writeConfig);
+      hoodieMetrics = new HoodieMetrics(writeConfig, storageConf);
       metrics = hoodieMetrics.getMetrics();
     });
 
@@ -103,7 +106,7 @@ public class TestPushGateWayReporter {
       
when(metricsConfig.getMetricReporterFileBasedConfigs()).thenReturn(propPrometheusPath
 + "," + propDatadogPath);
     }
 
-    hoodieMetrics = new HoodieMetrics(writeConfig);
+    hoodieMetrics = new HoodieMetrics(writeConfig, storageConf);
     metrics = hoodieMetrics.getMetrics();
 
     Map<String, Long> metricsMap = new HashMap<>();
diff --git 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
index 8c994a463d1..5bf73306efb 100644
--- 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
+++ 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
@@ -89,7 +89,7 @@ public class FlinkHoodieBackedTableMetadataWriter extends 
HoodieBackedTableMetad
   protected void initRegistry() {
     if (metadataWriteConfig.isMetricsOn()) {
       // should support executor metrics
-      this.metrics = Option.of(new 
HoodieMetadataMetrics(metadataWriteConfig.getMetricsConfig()));
+      this.metrics = Option.of(new 
HoodieMetadataMetrics(metadataWriteConfig.getMetricsConfig(), storageConf));
     } else {
       this.metrics = Option.empty();
     }
diff --git 
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/metadata/JavaHoodieBackedTableMetadataWriter.java
 
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/metadata/JavaHoodieBackedTableMetadataWriter.java
index 52ccf509b7a..2f6e859e72c 100644
--- 
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/metadata/JavaHoodieBackedTableMetadataWriter.java
+++ 
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/metadata/JavaHoodieBackedTableMetadataWriter.java
@@ -79,7 +79,7 @@ public class JavaHoodieBackedTableMetadataWriter extends 
HoodieBackedTableMetada
   @Override
   protected void initRegistry() {
     if (metadataWriteConfig.isMetricsOn()) {
-      this.metrics = Option.of(new 
HoodieMetadataMetrics(metadataWriteConfig.getMetricsConfig()));
+      this.metrics = Option.of(new 
HoodieMetadataMetrics(metadataWriteConfig.getMetricsConfig(), storageConf));
     } else {
       this.metrics = Option.empty();
     }
diff --git 
a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java
 
b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java
index 3c049dc9c20..2a5b6e33171 100644
--- 
a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java
+++ 
b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java
@@ -2393,7 +2393,7 @@ public class TestJavaHoodieBackedMetadata extends 
TestHoodieMetadataBase {
       assertNoWriteErrors(writeStatuses);
       validateMetadata(client);
 
-      Metrics metrics = Metrics.getInstance(writeConfig.getMetricsConfig());
+      Metrics metrics = Metrics.getInstance(writeConfig.getMetricsConfig(), 
storageConf);
       
assertTrue(metrics.getRegistry().getGauges().containsKey(HoodieMetadataMetrics.INITIALIZE_STR
 + ".count"));
       
assertTrue(metrics.getRegistry().getGauges().containsKey(HoodieMetadataMetrics.INITIALIZE_STR
 + ".totalDuration"));
       assertTrue((Long) 
metrics.getRegistry().getGauges().get(HoodieMetadataMetrics.INITIALIZE_STR + 
".count").getValue() >= 1L);
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/validator/SparkPreCommitValidator.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/validator/SparkPreCommitValidator.java
index 5288963e33b..25fae3cb6f5 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/validator/SparkPreCommitValidator.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/validator/SparkPreCommitValidator.java
@@ -59,7 +59,7 @@ public abstract class SparkPreCommitValidator<T, I, K, O 
extends HoodieData<Writ
     this.table = table;
     this.engineContext = engineContext;
     this.writeConfig = writeConfig;
-    this.metrics = new HoodieMetrics(writeConfig);
+    this.metrics = new HoodieMetrics(writeConfig, engineContext.getStorageConf());
   }
   
   protected Set<String> getPartitionsModified(HoodieWriteMetadata<O> 
writeResult) {
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
index b6067d30c34..f5e7165a872 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
@@ -119,7 +119,7 @@ public class SparkHoodieBackedTableMetadataWriter extends 
HoodieBackedTableMetad
       } else {
         registry = Registry.getRegistry("HoodieMetadata");
       }
-      this.metrics = Option.of(new HoodieMetadataMetrics(metadataWriteConfig.getMetricsConfig()));
+      this.metrics = Option.of(new HoodieMetadataMetrics(metadataWriteConfig.getMetricsConfig(), storageConf));
     } else {
       this.metrics = Option.empty();
     }
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
index b655cbc2ab5..cfe4fbad4c3 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
@@ -3114,7 +3114,7 @@ public class TestHoodieBackedMetadata extends 
TestHoodieMetadataBase {
       assertNoWriteErrors(writeStatuses);
       validateMetadata(client);
 
-      Metrics metrics = Metrics.getInstance(writeConfig.getMetricsConfig());
+      Metrics metrics = Metrics.getInstance(writeConfig.getMetricsConfig(), 
storageConf);
       
assertTrue(metrics.getRegistry().getGauges().containsKey(HoodieMetadataMetrics.INITIALIZE_STR
 + ".count"));
       
assertTrue(metrics.getRegistry().getGauges().containsKey(HoodieMetadataMetrics.INITIALIZE_STR
 + ".totalDuration"));
       assertTrue((Long) 
metrics.getRegistry().getGauges().get(HoodieMetadataMetrics.INITIALIZE_STR + 
".count").getValue() >= 1L);
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java
index 49351b463c2..4f76764ce98 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java
@@ -1000,7 +1000,7 @@ public class TestHoodieTimelineArchiver extends 
HoodieSparkClientTestHarness {
                 .build())
             .withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA)
             .withParallelism(2, 2).forTable("test-trip-table").build();
-    HoodieMetrics metrics = new HoodieMetrics(cfg);
+    HoodieMetrics metrics = new HoodieMetrics(cfg, storageConf);
     BaseHoodieWriteClient client = getHoodieWriteClient(cfg);
     client.archive();
     
assertTrue(metrics.getMetrics().getRegistry().getNames().contains(metrics.getMetricsName(ARCHIVE_ACTION,
 DURATION_STR)));
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatWriter.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatWriter.java
index db5db422d1d..a3aece81205 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatWriter.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatWriter.java
@@ -147,7 +147,7 @@ public class HoodieLogFormatWriter implements 
HoodieLogFormat.Writer {
       // bytes for header
       byte[] headerBytes = HoodieLogBlock.getLogMetadataBytes(block.getLogBlockHeader());
       // content bytes
-      byte[] content = block.getContentBytes();
+      byte[] content = block.getContentBytes(storage.getConf());
       // bytes for footer
       byte[] footerBytes = HoodieLogBlock.getLogMetadataBytes(block.getLogBlockFooter());
 
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java
index 5ba9e1906b8..8247273aab0 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java
@@ -30,6 +30,7 @@ import 
org.apache.hudi.common.util.collection.CloseableMappingIterator;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.internal.schema.InternalSchema;
 import org.apache.hudi.io.SeekableDataInputStream;
+import org.apache.hudi.storage.StorageConfiguration;
 
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericDatumReader;
@@ -100,7 +101,7 @@ public class HoodieAvroDataBlock extends HoodieDataBlock {
   }
 
   @Override
-  protected byte[] serializeRecords(List<HoodieRecord> records) throws IOException {
+  protected byte[] serializeRecords(List<HoodieRecord> records, StorageConfiguration<?> storageConf) throws IOException {
     Schema schema = new Schema.Parser().parse(super.getLogBlockHeader().get(HeaderMetadataType.SCHEMA));
     GenericDatumWriter<IndexedRecord> writer = new GenericDatumWriter<>(schema);
     ByteArrayOutputStream baos = new ByteArrayOutputStream();
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieCommandBlock.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieCommandBlock.java
index deeb903cd18..a519f80eb40 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieCommandBlock.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieCommandBlock.java
@@ -20,6 +20,7 @@ package org.apache.hudi.common.table.log.block;
 
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.io.SeekableDataInputStream;
+import org.apache.hudi.storage.StorageConfiguration;
 
 import java.util.HashMap;
 import java.util.Map;
@@ -61,7 +62,7 @@ public class HoodieCommandBlock extends HoodieLogBlock {
   }
 
   @Override
-  public byte[] getContentBytes() {
+  public byte[] getContentBytes(StorageConfiguration<?> storageConf) {
     return new byte[0];
   }
 }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieCorruptBlock.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieCorruptBlock.java
index 19d704c2595..74502ee1b8b 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieCorruptBlock.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieCorruptBlock.java
@@ -20,6 +20,7 @@ package org.apache.hudi.common.table.log.block;
 
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.io.SeekableDataInputStream;
+import org.apache.hudi.storage.StorageConfiguration;
 
 import java.io.IOException;
 import java.util.Map;
@@ -38,7 +39,7 @@ public class HoodieCorruptBlock extends HoodieLogBlock {
   }
 
   @Override
-  public byte[] getContentBytes() throws IOException {
+  public byte[] getContentBytes(StorageConfiguration<?> storageConf) throws IOException {
     if (!getContent().isPresent() && readBlockLazily) {
       // read content from disk
       inflate();
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java
index 1b024a3b530..0819a5f8e53 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java
@@ -25,6 +25,7 @@ import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.ClosableIterator;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.io.SeekableDataInputStream;
+import org.apache.hudi.storage.StorageConfiguration;
 
 import org.apache.avro.Schema;
 import org.slf4j.Logger;
@@ -134,7 +135,7 @@ public abstract class HoodieDataBlock extends 
HoodieLogBlock {
   }
 
   @Override
-  public byte[] getContentBytes() throws IOException {
+  public byte[] getContentBytes(StorageConfiguration<?> storageConf) throws IOException {
     // In case this method is called before realizing records from content
     Option<byte[]> content = getContent();
 
@@ -144,7 +145,7 @@ public abstract class HoodieDataBlock extends 
HoodieLogBlock {
       return content.get();
     }
 
-    return serializeRecords(records.get());
+    return serializeRecords(records.get(), storageConf);
   }
 
   public String getKeyFieldName() {
@@ -285,7 +286,7 @@ public abstract class HoodieDataBlock extends 
HoodieLogBlock {
     );
   }
 
-  protected abstract byte[] serializeRecords(List<HoodieRecord> records) throws IOException;
+  protected abstract byte[] serializeRecords(List<HoodieRecord> records, StorageConfiguration<?> storageConf) throws IOException;
 
   protected abstract <T> ClosableIterator<HoodieRecord<T>> 
deserializeRecords(byte[] content, HoodieRecordType type) throws IOException;
 
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDeleteBlock.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDeleteBlock.java
index a55f4f1e623..835fed9d44b 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDeleteBlock.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDeleteBlock.java
@@ -28,6 +28,7 @@ import org.apache.hudi.common.util.SerializationUtils;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.io.SeekableDataInputStream;
+import org.apache.hudi.storage.StorageConfiguration;
 import org.apache.hudi.util.Lazy;
 
 import org.apache.avro.io.BinaryDecoder;
@@ -112,7 +113,7 @@ public class HoodieDeleteBlock extends HoodieLogBlock {
   }
 
   @Override
-  public byte[] getContentBytes() throws IOException {
+  public byte[] getContentBytes(StorageConfiguration<?> storageConf) throws IOException {
     Option<byte[]> content = getContent();
 
     // In case this method is called before realizing keys from content
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java
index 0893637b956..e63a1f9872a 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java
@@ -118,14 +118,14 @@ public class HoodieHFileDataBlock extends HoodieDataBlock 
{
   }
 
   @Override
-  protected byte[] serializeRecords(List<HoodieRecord> records) throws 
IOException {
+  protected byte[] serializeRecords(List<HoodieRecord> records, StorageConfiguration<?> storageConf) throws IOException {
     HFileContext context = new HFileContextBuilder()
         .withBlockSize(DEFAULT_BLOCK_SIZE)
         .withCompression(compressionAlgorithm.get())
         
.withCellComparator(ReflectionUtils.loadClass(KV_COMPARATOR_CLASS_NAME))
         .build();
 
-    Configuration conf = new Configuration();
+    Configuration conf = storageConf.unwrapAs(Configuration.class);
     CacheConfig cacheConfig = new CacheConfig(conf);
     ByteArrayOutputStream baos = new ByteArrayOutputStream();
     FSDataOutputStream ostream = new FSDataOutputStream(baos, null);
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieLogBlock.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieLogBlock.java
index 25e90645699..f04b7735997 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieLogBlock.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieLogBlock.java
@@ -88,7 +88,7 @@ public abstract class HoodieLogBlock {
   }
 
   // Return the bytes representation of the data belonging to a LogBlock
-  public byte[] getContentBytes() throws IOException {
+  public byte[] getContentBytes(StorageConfiguration<?> storageConf) throws IOException {
     throw new HoodieException("No implementation was provided");
   }
 
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieParquetDataBlock.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieParquetDataBlock.java
index 6c2e6802769..2997390dc34 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieParquetDataBlock.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieParquetDataBlock.java
@@ -29,13 +29,11 @@ import org.apache.hudi.io.SeekableDataInputStream;
 import org.apache.hudi.io.storage.HoodieFileReaderFactory;
 import org.apache.hudi.io.storage.HoodieFileWriter;
 import org.apache.hudi.io.storage.HoodieFileWriterFactory;
-import org.apache.hudi.storage.HoodieStorageUtils;
 import org.apache.hudi.storage.StorageConfiguration;
 import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.storage.inline.InLineFSUtils;
 
 import org.apache.avro.Schema;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.parquet.hadoop.ParquetWriter;
 import org.apache.parquet.hadoop.metadata.CompressionCodecName;
 
@@ -100,7 +98,7 @@ public class HoodieParquetDataBlock extends HoodieDataBlock {
   }
 
   @Override
-  protected byte[] serializeRecords(List<HoodieRecord> records) throws 
IOException {
+  protected byte[] serializeRecords(List<HoodieRecord> records, StorageConfiguration<?> storageConf) throws IOException {
     if (records.size() == 0) {
       return new byte[0];
     }
@@ -116,8 +114,7 @@ public class HoodieParquetDataBlock extends HoodieDataBlock 
{
     config.setValue(PARQUET_DICTIONARY_ENABLED, String.valueOf(useDictionaryEncoding.get()));
     HoodieRecordType recordType = records.iterator().next().getRecordType();
     try (HoodieFileWriter parquetWriter = HoodieFileWriterFactory.getFileWriter(
-        HoodieFileFormat.PARQUET, outputStream, HoodieStorageUtils.getStorageConf(new Configuration()),
-        config, writerSchema, recordType)) {
+        HoodieFileFormat.PARQUET, outputStream, storageConf, config, writerSchema, recordType)) {
       for (HoodieRecord<?> record : records) {
         String recordKey = getRecordKey(record).orElse(null);
         parquetWriter.write(recordKey, record, writerSchema);
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java 
b/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java
index 09f06da604b..31c6188184a 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java
@@ -93,7 +93,8 @@ public abstract class BaseTableMetadata extends 
AbstractHoodieTableMetadata {
     this.isMetadataTableInitialized = 
dataMetaClient.getTableConfig().isMetadataTableAvailable();
 
     if (metadataConfig.isMetricsEnabled()) {
-      this.metrics = Option.of(new HoodieMetadataMetrics(HoodieMetricsConfig.newBuilder().fromProperties(metadataConfig.getProps()).build()));
+      this.metrics = Option.of(new HoodieMetadataMetrics(HoodieMetricsConfig.newBuilder()
+          .fromProperties(metadataConfig.getProps()).build(), getStorageConf()));
     } else {
       this.metrics = Option.empty();
     }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataMetrics.java 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataMetrics.java
index 970ad0743f4..fce32753883 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataMetrics.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataMetrics.java
@@ -27,6 +27,7 @@ import org.apache.hudi.config.metrics.HoodieMetricsConfig;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.metrics.HoodieGauge;
 import org.apache.hudi.metrics.Metrics;
+import org.apache.hudi.storage.StorageConfiguration;
 
 import com.codahale.metrics.MetricRegistry;
 import org.slf4j.Logger;
@@ -80,8 +81,8 @@ public class HoodieMetadataMetrics implements Serializable {
   private final transient MetricRegistry metricsRegistry;
   private final transient Metrics metrics;
 
-  public HoodieMetadataMetrics(HoodieMetricsConfig metricsConfig) {
-    this.metrics = Metrics.getInstance(metricsConfig);
+  public HoodieMetadataMetrics(HoodieMetricsConfig metricsConfig, StorageConfiguration<?> storageConf) {
+    this.metrics = Metrics.getInstance(metricsConfig, storageConf);
     this.metricsRegistry = metrics.getRegistry();
   }
 
diff --git a/hudi-common/src/main/java/org/apache/hudi/metrics/Metrics.java 
b/hudi-common/src/main/java/org/apache/hudi/metrics/Metrics.java
index af32248eea1..cc50d3a4147 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metrics/Metrics.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metrics/Metrics.java
@@ -25,10 +25,10 @@ import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.config.metrics.HoodieMetricsConfig;
 import org.apache.hudi.storage.HoodieStorage;
 import org.apache.hudi.storage.HoodieStorageUtils;
+import org.apache.hudi.storage.StorageConfiguration;
 import org.apache.hudi.storage.StoragePath;
 
 import com.codahale.metrics.MetricRegistry;
-import org.apache.hadoop.conf.Configuration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -53,8 +53,10 @@ public class Metrics {
   private final String basePath;
   private boolean initialized = false;
   private transient Thread shutdownThread = null;
+  private final  StorageConfiguration<?> storageConf;
 
-  public Metrics(HoodieMetricsConfig metricConfig) {
+  public Metrics(HoodieMetricsConfig metricConfig, StorageConfiguration<?> storageConf) {
+    this.storageConf = storageConf;
     registry = new MetricRegistry();
     commonMetricPrefix = metricConfig.getMetricReporterMetricsNamePrefix();
     reporters = new ArrayList<>();
@@ -78,13 +80,13 @@ public class Metrics {
     registerGauges(Registry.getAllMetrics(true, true), 
Option.of(commonMetricPrefix));
   }
 
-  public static synchronized Metrics getInstance(HoodieMetricsConfig metricConfig) {
+  public static synchronized Metrics getInstance(HoodieMetricsConfig metricConfig, StorageConfiguration<?> storageConf) {
     String basePath = getBasePath(metricConfig);
     if (METRICS_INSTANCE_PER_BASEPATH.containsKey(basePath)) {
       return METRICS_INSTANCE_PER_BASEPATH.get(basePath);
     }
 
-    Metrics metrics = new Metrics(metricConfig);
+    Metrics metrics = new Metrics(metricConfig, storageConf);
     METRICS_INSTANCE_PER_BASEPATH.put(basePath, metrics);
     return metrics;
   }
@@ -98,7 +100,7 @@ public class Metrics {
   private List<MetricsReporter> addAdditionalMetricsExporters(HoodieMetricsConfig metricConfig) {
     List<MetricsReporter> reporterList = new ArrayList<>();
     List<String> propPathList = StringUtils.split(metricConfig.getMetricReporterFileBasedConfigs(), ",");
-    try (HoodieStorage storage = HoodieStorageUtils.getStorage(propPathList.get(0), new Configuration())) {
+    try (HoodieStorage storage = HoodieStorageUtils.getStorage(propPathList.get(0), storageConf)) {
       for (String propPath : propPathList) {
         HoodieMetricsConfig secondarySourceConfig = HoodieMetricsConfig.newBuilder().fromInputStream(
             storage.open(new StoragePath(propPath))).withPath(metricConfig.getBasePath()).build();
diff --git 
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
 
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
index 367dc506b21..84d329e4b9d 100755
--- 
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
+++ 
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
@@ -455,7 +455,7 @@ public class TestHoodieLogFormat extends 
HoodieCommonTestHarness {
     Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
     header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
     header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, 
getSimpleSchema().toString());
-    byte[] dataBlockContentBytes = getDataBlock(DEFAULT_DATA_BLOCK_TYPE, 
records, header).getContentBytes();
+    byte[] dataBlockContentBytes = getDataBlock(DEFAULT_DATA_BLOCK_TYPE, 
records, header).getContentBytes(storage.getConf());
     HoodieLogBlock.HoodieLogBlockContentLocation logBlockContentLoc = new 
HoodieLogBlock.HoodieLogBlockContentLocation(
         HoodieTestUtils.getDefaultStorageConfWithDefaults(), null, 0, 
dataBlockContentBytes.length, 0);
     HoodieDataBlock reusableDataBlock = new HoodieAvroDataBlock(null, 
Option.ofNullable(dataBlockContentBytes), false,
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/table/log/block/TestHoodieDeleteBlock.java
 
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/log/block/TestHoodieDeleteBlock.java
similarity index 97%
rename from 
hudi-common/src/test/java/org/apache/hudi/common/table/log/block/TestHoodieDeleteBlock.java
rename to 
hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/log/block/TestHoodieDeleteBlock.java
index 7441f8cdd41..a6bc014b3f4 100644
--- 
a/hudi-common/src/test/java/org/apache/hudi/common/table/log/block/TestHoodieDeleteBlock.java
+++ 
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/log/block/TestHoodieDeleteBlock.java
@@ -20,6 +20,7 @@
 package org.apache.hudi.common.table.log.block;
 
 import org.apache.hudi.common.model.DeleteRecord;
+import org.apache.hudi.common.testutils.HoodieTestUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.Pair;
 
@@ -124,7 +125,7 @@ public class TestHoodieDeleteBlock {
       deleteRecordList.add(Pair.of(dr, -1L));
     }
     HoodieDeleteBlock deleteBlock = new HoodieDeleteBlock(deleteRecordList, 
false, new HashMap<>());
-    byte[] contentBytes = deleteBlock.getContentBytes();
+    byte[] contentBytes = 
deleteBlock.getContentBytes(HoodieTestUtils.getDefaultStorageConf());
     HoodieDeleteBlock deserializeDeleteBlock = new HoodieDeleteBlock(
         Option.of(contentBytes), null, true, Option.empty(), new HashMap<>(), 
new HashMap<>());
     DeleteRecord[] deserializedDeleteRecords = 
deserializeDeleteBlock.getRecordsToDelete();
diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RepairOverwriteHoodiePropsProcedure.scala
 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RepairOverwriteHoodiePropsProcedure.scala
index c7e3110b6cd..07b4992dbc8 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RepairOverwriteHoodiePropsProcedure.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RepairOverwriteHoodiePropsProcedure.scala
@@ -51,7 +51,7 @@ class RepairOverwriteHoodiePropsProcedure extends 
BaseProcedure with ProcedureBu
   def outputType: StructType = OUTPUT_TYPE
 
   def loadNewProps(filePath: String, props: Properties):Unit = {
-    val fs = HadoopFSUtils.getFs(filePath, new Configuration())
+    val fs = HadoopFSUtils.getFs(filePath, spark.sessionState.newHadoopConf())
     val fis = fs.open(new Path(filePath))
     props.load(fis)
 
diff --git 
a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/marker/MarkerBasedEarlyConflictDetectionRunnable.java
 
b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/marker/MarkerBasedEarlyConflictDetectionRunnable.java
index 6509e8d7e0c..11213b56e26 100644
--- 
a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/marker/MarkerBasedEarlyConflictDetectionRunnable.java
+++ 
b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/marker/MarkerBasedEarlyConflictDetectionRunnable.java
@@ -25,12 +25,10 @@ import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.util.HoodieTimer;
 import org.apache.hudi.common.util.MarkerUtils;
 import org.apache.hudi.exception.HoodieIOException;
-import org.apache.hudi.hadoop.fs.HadoopFSUtils;
 import org.apache.hudi.storage.HoodieStorage;
 import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.timeline.service.handlers.MarkerHandler;
 
-import org.apache.hadoop.conf.Configuration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -95,7 +93,7 @@ public class MarkerBasedEarlyConflictDetectionRunnable 
implements Runnable {
       List<StoragePath> instants = MarkerUtils.getAllMarkerDir(tempPath, 
storage);
 
       HoodieTableMetaClient metaClient =
-          HoodieTableMetaClient.builder().setConf(HadoopFSUtils.getStorageConf(new Configuration())).setBasePath(basePath)
+          HoodieTableMetaClient.builder().setConf(storage.getConf().newInstance()).setBasePath(basePath)
               .setLoadActiveTimelineOnLoad(true).build();
       HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline();
 
@@ -104,7 +102,7 @@ public class MarkerBasedEarlyConflictDetectionRunnable 
implements Runnable {
           storage, basePath);
       Set<String> tableMarkers = candidate.stream().flatMap(instant -> {
         return MarkerUtils.readTimelineServerBasedMarkersFromFileSystem(instant, storage,
-                new HoodieLocalEngineContext(HadoopFSUtils.getStorageConf(new Configuration())), 100)
+                new HoodieLocalEngineContext(storage.getConf().newInstance()), 100)
             .values().stream().flatMap(Collection::stream);
       }).collect(Collectors.toSet());
 
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerMetrics.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerMetrics.java
index cd7867edf3e..1dd008da237 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerMetrics.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerMetrics.java
@@ -21,6 +21,7 @@ package org.apache.hudi.utilities.deltastreamer;
 
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.config.metrics.HoodieMetricsConfig;
+import org.apache.hudi.storage.StorageConfiguration;
 import org.apache.hudi.utilities.streamer.HoodieStreamerMetrics;
 
 /**
@@ -30,11 +31,11 @@ import 
org.apache.hudi.utilities.streamer.HoodieStreamerMetrics;
 @Deprecated
 public class HoodieDeltaStreamerMetrics extends HoodieStreamerMetrics {
 
-  public HoodieDeltaStreamerMetrics(HoodieWriteConfig writeConfig) {
-    super(writeConfig.getMetricsConfig());
+  public HoodieDeltaStreamerMetrics(HoodieWriteConfig writeConfig, 
StorageConfiguration<?> storageConf) {
+    super(writeConfig.getMetricsConfig(), storageConf);
   }
 
-  public HoodieDeltaStreamerMetrics(HoodieMetricsConfig metricsConfig) {
-    super(metricsConfig);
+  public HoodieDeltaStreamerMetrics(HoodieMetricsConfig metricsConfig, 
StorageConfiguration<?> storageConf) {
+    super(metricsConfig, storageConf);
   }
 }
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/ingestion/HoodieIngestionMetrics.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/ingestion/HoodieIngestionMetrics.java
index 3d07610993d..eb9b51aedb3 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/ingestion/HoodieIngestionMetrics.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/ingestion/HoodieIngestionMetrics.java
@@ -20,6 +20,7 @@ package org.apache.hudi.utilities.ingestion;
 
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.config.metrics.HoodieMetricsConfig;
+import org.apache.hudi.storage.StorageConfiguration;
 
 import com.codahale.metrics.Timer;
 
@@ -30,14 +31,17 @@ import java.io.Serializable;
  */
 public abstract class HoodieIngestionMetrics implements Serializable {
 
+  protected final StorageConfiguration<?> storageConf;
+
   protected final HoodieMetricsConfig writeConfig;
 
-  public HoodieIngestionMetrics(HoodieWriteConfig writeConfig) {
-    this(writeConfig.getMetricsConfig());
+  public HoodieIngestionMetrics(HoodieWriteConfig writeConfig, 
StorageConfiguration<?> storageConf) {
+    this(writeConfig.getMetricsConfig(), storageConf);
   }
 
-  public HoodieIngestionMetrics(HoodieMetricsConfig writeConfig) {
+  public HoodieIngestionMetrics(HoodieMetricsConfig writeConfig, 
StorageConfiguration<?> storageConf) {
     this.writeConfig = writeConfig;
+    this.storageConf = storageConf;
   }
 
   public abstract Timer.Context getOverallTimerContext();
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamerMetrics.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamerMetrics.java
index fcbf431ed6f..ab1f72185a3 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamerMetrics.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamerMetrics.java
@@ -22,6 +22,7 @@ package org.apache.hudi.utilities.streamer;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.config.metrics.HoodieMetricsConfig;
 import org.apache.hudi.metrics.Metrics;
+import org.apache.hudi.storage.StorageConfiguration;
 import org.apache.hudi.utilities.ingestion.HoodieIngestionMetrics;
 
 import com.codahale.metrics.Timer;
@@ -37,14 +38,14 @@ public class HoodieStreamerMetrics extends 
HoodieIngestionMetrics {
   private transient Timer hiveSyncTimer;
   private transient Timer metaSyncTimer;
 
-  public HoodieStreamerMetrics(HoodieWriteConfig writeConfig) {
-    this(writeConfig.getMetricsConfig());
+  public HoodieStreamerMetrics(HoodieWriteConfig writeConfig, 
StorageConfiguration<?> storageConf) {
+    this(writeConfig.getMetricsConfig(), storageConf);
   }
 
-  public HoodieStreamerMetrics(HoodieMetricsConfig writeConfig) {
-    super(writeConfig);
+  public HoodieStreamerMetrics(HoodieMetricsConfig writeConfig, 
StorageConfiguration<?> storageConf) {
+    super(writeConfig, storageConf);
     if (writeConfig.isMetricsOn()) {
-      metrics = Metrics.getInstance(writeConfig);
+      metrics = Metrics.getInstance(writeConfig, storageConf);
       this.overallTimerName = getMetricsName("timer", "deltastreamer");
       this.hiveSyncTimerName = getMetricsName("timer", 
"deltastreamerHiveSync");
       this.metaSyncTimerName = getMetricsName("timer", 
"deltastreamerMetaSync");
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
index 27231caaa5d..74a05242313 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
@@ -63,6 +63,7 @@ import org.apache.hudi.config.HoodieErrorTableConfig;
 import org.apache.hudi.config.HoodieIndexConfig;
 import org.apache.hudi.config.HoodiePayloadConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.config.metrics.HoodieMetricsConfig;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.exception.HoodieMetaSyncException;
@@ -75,6 +76,7 @@ import 
org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory;
 import org.apache.hudi.metrics.HoodieMetrics;
 import org.apache.hudi.storage.HoodieStorage;
 import org.apache.hudi.storage.HoodieStorageUtils;
+import org.apache.hudi.storage.StorageConfiguration;
 import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.sync.common.util.SyncUtilHelpers;
 import org.apache.hudi.table.action.HoodieWriteMetadata;
@@ -310,8 +312,10 @@ public class StreamSync implements Serializable, Closeable 
{
     this.conf = conf;
 
     HoodieWriteConfig hoodieWriteConfig = getHoodieClientConfig();
-    this.metrics = (HoodieIngestionMetrics) ReflectionUtils.loadClass(cfg.ingestionMetricsClass, hoodieWriteConfig.getMetricsConfig());
-    this.hoodieMetrics = new HoodieMetrics(hoodieWriteConfig);
+    this.metrics = (HoodieIngestionMetrics) ReflectionUtils.loadClass(cfg.ingestionMetricsClass,
+        new Class<?>[] { HoodieMetricsConfig.class, StorageConfiguration.class},
+        hoodieWriteConfig.getMetricsConfig(), storage.getConf());
+    this.hoodieMetrics = new HoodieMetrics(hoodieWriteConfig, storage.getConf());
     if (props.getBoolean(ERROR_TABLE_ENABLED.key(), 
ERROR_TABLE_ENABLED.defaultValue())) {
       this.errorTableWriter = ErrorTableUtils.getErrorTableWriter(
           cfg, sparkSession, props, hoodieSparkContext, storage);

Reply via email to