This is an automated email from the ASF dual-hosted git repository. yihua pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push: new c7c636c2d18 [HUDI-7731] Fix usage of new Configuration() in production code (#11191) c7c636c2d18 is described below commit c7c636c2d18673a41aa0e656b6c7746808d4a001 Author: Jon Vexler <jbvex...@gmail.com> AuthorDate: Fri May 10 20:47:33 2024 -0400 [HUDI-7731] Fix usage of new Configuration() in production code (#11191) Co-authored-by: Jonathan Vexler <=> --- .../main/java/org/apache/hudi/client/BaseHoodieClient.java | 2 +- .../apache/hudi/client/timeline/HoodieTimelineArchiver.java | 2 +- .../apache/hudi/client/transaction/lock/LockManager.java | 2 +- .../client/transaction/lock/metrics/HoodieLockMetrics.java | 5 +++-- .../main/java/org/apache/hudi/metrics/HoodieMetrics.java | 5 +++-- .../table/action/compact/RunCompactionActionExecutor.java | 2 +- .../hudi/table/action/index/RunIndexActionExecutor.java | 2 +- .../org/apache/hudi/metrics/TestHoodieConsoleMetrics.java | 5 ++++- .../org/apache/hudi/metrics/TestHoodieGraphiteMetrics.java | 5 ++++- .../java/org/apache/hudi/metrics/TestHoodieJmxMetrics.java | 5 ++++- .../java/org/apache/hudi/metrics/TestHoodieMetrics.java | 5 ++++- .../hudi/metrics/datadog/TestDatadogMetricsReporter.java | 9 ++++++--- .../test/java/org/apache/hudi/metrics/m3/TestM3Metrics.java | 10 +++++++--- .../hudi/metrics/prometheus/TestPrometheusReporter.java | 7 +++++-- .../hudi/metrics/prometheus/TestPushGateWayReporter.java | 13 ++++++++----- .../hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java | 2 +- .../hudi/metadata/JavaHoodieBackedTableMetadataWriter.java | 2 +- .../apache/hudi/client/TestJavaHoodieBackedMetadata.java | 2 +- .../hudi/client/validator/SparkPreCommitValidator.java | 2 +- .../hudi/metadata/SparkHoodieBackedTableMetadataWriter.java | 2 +- .../hudi/client/functional/TestHoodieBackedMetadata.java | 2 +- .../java/org/apache/hudi/io/TestHoodieTimelineArchiver.java | 2 +- .../apache/hudi/common/table/log/HoodieLogFormatWriter.java | 2 +- .../hudi/common/table/log/block/HoodieAvroDataBlock.java | 3 ++- .../hudi/common/table/log/block/HoodieCommandBlock.java | 3 ++- .../hudi/common/table/log/block/HoodieCorruptBlock.java | 3 ++- .../apache/hudi/common/table/log/block/HoodieDataBlock.java | 7 ++++--- .../hudi/common/table/log/block/HoodieDeleteBlock.java | 3 ++- .../hudi/common/table/log/block/HoodieHFileDataBlock.java | 4 ++-- .../apache/hudi/common/table/log/block/HoodieLogBlock.java | 2 +- .../hudi/common/table/log/block/HoodieParquetDataBlock.java | 7 ++----- .../java/org/apache/hudi/metadata/BaseTableMetadata.java | 3 ++- .../org/apache/hudi/metadata/HoodieMetadataMetrics.java | 5 +++-- .../src/main/java/org/apache/hudi/metrics/Metrics.java | 12 +++++++----- .../apache/hudi/common/functional/TestHoodieLogFormat.java | 2 +- .../hudi/common/table/log/block/TestHoodieDeleteBlock.java | 3 ++- .../procedures/RepairOverwriteHoodiePropsProcedure.scala | 2 +- .../marker/MarkerBasedEarlyConflictDetectionRunnable.java | 6 ++---- .../utilities/deltastreamer/HoodieDeltaStreamerMetrics.java | 9 +++++---- .../hudi/utilities/ingestion/HoodieIngestionMetrics.java | 10 +++++++--- .../hudi/utilities/streamer/HoodieStreamerMetrics.java | 11 ++++++----- .../java/org/apache/hudi/utilities/streamer/StreamSync.java | 8 ++++++-- 42 files changed, 120 insertions(+), 78 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieClient.java index fe964db6862..f982a0e4e22 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieClient.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieClient.java @@ -102,7 +102,7 @@ public abstract class BaseHoodieClient implements Serializable, AutoCloseable { this.heartbeatClient = new HoodieHeartbeatClient(storage, this.basePath, clientConfig.getHoodieClientHeartbeatIntervalInMs(), clientConfig.getHoodieClientHeartbeatTolerableMisses()); - this.metrics = new HoodieMetrics(config); + this.metrics = new HoodieMetrics(config, context.getStorageConf()); this.txnManager = new TransactionManager(config, storage); this.timeGenerator = TimeGenerators.getTimeGenerator( config.getTimeGeneratorConfig(), HadoopFSUtils.getStorageConf(hadoopConf)); diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/HoodieTimelineArchiver.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/HoodieTimelineArchiver.java index 175ac5607f4..f4ab6c76e13 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/HoodieTimelineArchiver.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/HoodieTimelineArchiver.java @@ -86,7 +86,7 @@ public class HoodieTimelineArchiver<T extends HoodieAvroPayload, I, K, O> { Pair<Integer, Integer> minAndMaxInstants = getMinAndMaxInstantsToKeep(table, metaClient); this.minInstantsToKeep = minAndMaxInstants.getLeft(); this.maxInstantsToKeep = minAndMaxInstants.getRight(); - this.metrics = new HoodieMetrics(config); + this.metrics = new HoodieMetrics(config, table.getStorageConf()); } public int archiveIfRequired(HoodieEngineContext context) throws IOException { diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/LockManager.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/LockManager.java index c8a07d09684..f2920a7da50 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/LockManager.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/LockManager.java @@ -69,7 +69,7 @@ public class LockManager implements Serializable, AutoCloseable { Integer.parseInt(HoodieLockConfig.LOCK_ACQUIRE_CLIENT_NUM_RETRIES.defaultValue())); maxWaitTimeInMs = lockConfiguration.getConfig().getLong(LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY, Long.parseLong(HoodieLockConfig.LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS.defaultValue())); - metrics = new HoodieLockMetrics(writeConfig); + metrics = new HoodieLockMetrics(writeConfig, storageConf); lockRetryHelper = new RetryHelper<>(maxWaitTimeInMs, maxRetries, maxWaitTimeInMs, Arrays.asList(HoodieLockException.class, InterruptedException.class), "acquire lock"); } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/metrics/HoodieLockMetrics.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/metrics/HoodieLockMetrics.java index bbf3d6876d8..7a793de5392 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/metrics/HoodieLockMetrics.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/metrics/HoodieLockMetrics.java @@ -26,6 +26,7 @@ import com.codahale.metrics.Timer; import org.apache.hudi.common.util.HoodieTimer; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.metrics.Metrics; +import org.apache.hudi.storage.StorageConfiguration; import java.util.concurrent.TimeUnit; @@ -49,12 +50,12 @@ public class HoodieLockMetrics { private static final Object REGISTRY_LOCK = new Object(); private Metrics metrics; - public HoodieLockMetrics(HoodieWriteConfig writeConfig) { + public HoodieLockMetrics(HoodieWriteConfig writeConfig, StorageConfiguration<?> storageConf) { this.isMetricsEnabled = writeConfig.isLockingMetricsEnabled(); this.writeConfig = writeConfig; if (isMetricsEnabled) { - metrics = Metrics.getInstance(writeConfig.getMetricsConfig()); + metrics = Metrics.getInstance(writeConfig.getMetricsConfig(), storageConf); MetricRegistry registry = metrics.getRegistry(); lockAttempts = registry.counter(getMetricsName(LOCK_ACQUIRE_ATTEMPTS_COUNTER_NAME)); diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/HoodieMetrics.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/HoodieMetrics.java index 9f1ef7a44c5..d8c60d5f660 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/HoodieMetrics.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/HoodieMetrics.java @@ -25,6 +25,7 @@ import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.common.util.VisibleForTesting; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.storage.StorageConfiguration; import com.codahale.metrics.Counter; import com.codahale.metrics.Timer; @@ -106,11 +107,11 @@ public class HoodieMetrics { private Counter compactionRequestedCounter = null; private Counter compactionCompletedCounter = null; - public HoodieMetrics(HoodieWriteConfig config) { + public HoodieMetrics(HoodieWriteConfig config, StorageConfiguration<?> storageConf) { this.config = config; this.tableName = config.getTableName(); if (config.isMetricsOn()) { - metrics = Metrics.getInstance(config.getMetricsConfig()); + metrics = Metrics.getInstance(config.getMetricsConfig(), storageConf); this.rollbackTimerName = getMetricsName(TIMER_ACTION, HoodieTimeline.ROLLBACK_ACTION); this.cleanTimerName = getMetricsName(TIMER_ACTION, HoodieTimeline.CLEAN_ACTION); this.archiveTimerName = getMetricsName(TIMER_ACTION, ARCHIVE_ACTION); diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/RunCompactionActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/RunCompactionActionExecutor.java index 055cdb5910b..55e8ce7d23f 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/RunCompactionActionExecutor.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/RunCompactionActionExecutor.java @@ -73,7 +73,7 @@ public class RunCompactionActionExecutor<T> extends this.operationType = operationType; checkArgument(operationType == WriteOperationType.COMPACT || operationType == WriteOperationType.LOG_COMPACT, "Only COMPACT and LOG_COMPACT is supported"); - metrics = new HoodieMetrics(config); + metrics = new HoodieMetrics(config, context.getStorageConf()); } @Override diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/RunIndexActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/RunIndexActionExecutor.java index aed06d95867..ab2096b2ede 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/RunIndexActionExecutor.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/RunIndexActionExecutor.java @@ -101,7 +101,7 @@ public class RunIndexActionExecutor<T, I, K, O> extends BaseActionExecutor<T, I, super(context, config, table, instantTime); this.txnManager = new TransactionManager(config, table.getMetaClient().getStorage()); if (config.getMetadataConfig().isMetricsEnabled()) { - this.metrics = Option.of(new HoodieMetadataMetrics(config.getMetricsConfig())); + this.metrics = Option.of(new HoodieMetadataMetrics(config.getMetricsConfig(), context.getStorageConf())); } else { this.metrics = Option.empty(); } diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/TestHoodieConsoleMetrics.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/TestHoodieConsoleMetrics.java index 43748e96833..4e938ef1cef 100644 --- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/TestHoodieConsoleMetrics.java +++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/TestHoodieConsoleMetrics.java @@ -18,8 +18,10 @@ package org.apache.hudi.metrics; +import org.apache.hudi.common.testutils.HoodieTestUtils; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.config.metrics.HoodieMetricsConfig; +import org.apache.hudi.storage.StorageConfiguration; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; @@ -40,6 +42,7 @@ public class TestHoodieConsoleMetrics { HoodieWriteConfig writeConfig; @Mock HoodieMetricsConfig metricsConfig; + StorageConfiguration storageConf = HoodieTestUtils.getDefaultStorageConf(); HoodieMetrics hoodieMetrics; Metrics metrics; @@ -49,7 +52,7 @@ public class TestHoodieConsoleMetrics { when(writeConfig.isMetricsOn()).thenReturn(true); when(metricsConfig.getMetricsReporterType()).thenReturn(MetricsReporterType.CONSOLE); when(metricsConfig.getBasePath()).thenReturn("s3://test" + UUID.randomUUID()); - hoodieMetrics = new HoodieMetrics(writeConfig); + hoodieMetrics = new HoodieMetrics(writeConfig, storageConf); metrics = hoodieMetrics.getMetrics(); } diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/TestHoodieGraphiteMetrics.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/TestHoodieGraphiteMetrics.java index 63a6704b02f..cf488405660 100644 --- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/TestHoodieGraphiteMetrics.java +++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/TestHoodieGraphiteMetrics.java @@ -18,9 +18,11 @@ package org.apache.hudi.metrics; +import org.apache.hudi.common.testutils.HoodieTestUtils; import org.apache.hudi.common.testutils.NetworkTestUtils; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.config.metrics.HoodieMetricsConfig; +import org.apache.hudi.storage.StorageConfiguration; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Test; @@ -43,6 +45,7 @@ public class TestHoodieGraphiteMetrics { HoodieWriteConfig writeConfig; @Mock HoodieMetricsConfig metricsConfig; + StorageConfiguration storageConf = HoodieTestUtils.getDefaultStorageConf(); HoodieMetrics hoodieMetrics; Metrics metrics; @@ -60,7 +63,7 @@ public class TestHoodieGraphiteMetrics { when(metricsConfig.getGraphiteServerPort()).thenReturn(NetworkTestUtils.nextFreePort()); when(metricsConfig.getGraphiteReportPeriodSeconds()).thenReturn(30); when(metricsConfig.getBasePath()).thenReturn("s3://test" + UUID.randomUUID()); - hoodieMetrics = new HoodieMetrics(writeConfig); + hoodieMetrics = new HoodieMetrics(writeConfig, storageConf); metrics = hoodieMetrics.getMetrics(); metrics.registerGauge("graphite_metric", 123L); assertEquals("123", metrics.getRegistry().getGauges() diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/TestHoodieJmxMetrics.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/TestHoodieJmxMetrics.java index 3b776c104cd..9daebd08661 100644 --- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/TestHoodieJmxMetrics.java +++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/TestHoodieJmxMetrics.java @@ -18,9 +18,11 @@ package org.apache.hudi.metrics; +import org.apache.hudi.common.testutils.HoodieTestUtils; import org.apache.hudi.common.testutils.NetworkTestUtils; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.config.metrics.HoodieMetricsConfig; +import org.apache.hudi.storage.StorageConfiguration; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; @@ -44,6 +46,7 @@ public class TestHoodieJmxMetrics { HoodieWriteConfig writeConfig; @Mock HoodieMetricsConfig metricsConfig; + StorageConfiguration storageConf = HoodieTestUtils.getDefaultStorageConf(); HoodieMetrics hoodieMetrics; Metrics metrics; @@ -55,7 +58,7 @@ public class TestHoodieJmxMetrics { when(metricsConfig.getJmxHost()).thenReturn("localhost"); when(metricsConfig.getJmxPort()).thenReturn(String.valueOf(NetworkTestUtils.nextFreePort())); when(metricsConfig.getBasePath()).thenReturn("s3://test" + UUID.randomUUID()); - hoodieMetrics = new HoodieMetrics(writeConfig); + hoodieMetrics = new HoodieMetrics(writeConfig, storageConf); metrics = hoodieMetrics.getMetrics(); } diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/TestHoodieMetrics.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/TestHoodieMetrics.java index 7b1b918535b..73b9646d577 100755 --- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/TestHoodieMetrics.java +++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/TestHoodieMetrics.java @@ -19,11 +19,13 @@ package org.apache.hudi.metrics; import org.apache.hudi.common.model.HoodieCommitMetadata; +import org.apache.hudi.common.testutils.HoodieTestUtils; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.config.metrics.HoodieMetricsConfig; import org.apache.hudi.index.HoodieIndex; +import org.apache.hudi.storage.StorageConfiguration; import com.codahale.metrics.Timer; import org.junit.jupiter.api.AfterEach; @@ -49,6 +51,7 @@ public class TestHoodieMetrics { HoodieWriteConfig writeConfig; @Mock HoodieMetricsConfig metricsConfig; + StorageConfiguration storageConf = HoodieTestUtils.getDefaultStorageConf(); HoodieMetrics hoodieMetrics; Metrics metrics; @@ -58,7 +61,7 @@ public class TestHoodieMetrics { when(writeConfig.isMetricsOn()).thenReturn(true); when(metricsConfig.getMetricsReporterType()).thenReturn(MetricsReporterType.INMEMORY); when(metricsConfig.getBasePath()).thenReturn("s3://test" + UUID.randomUUID()); - hoodieMetrics = new HoodieMetrics(writeConfig); + hoodieMetrics = new HoodieMetrics(writeConfig, storageConf); metrics = hoodieMetrics.getMetrics(); } diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/datadog/TestDatadogMetricsReporter.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/datadog/TestDatadogMetricsReporter.java index 55637a241e2..9a7b82b4485 100644 --- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/datadog/TestDatadogMetricsReporter.java +++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/datadog/TestDatadogMetricsReporter.java @@ -24,6 +24,7 @@ import org.apache.hudi.metrics.HoodieMetrics; import org.apache.hudi.metrics.Metrics; import org.apache.hudi.metrics.MetricsReporterType; import org.apache.hudi.metrics.datadog.DatadogHttpClient.ApiSite; +import org.apache.hudi.storage.StorageConfiguration; import com.codahale.metrics.MetricRegistry; import org.junit.jupiter.api.AfterEach; @@ -47,6 +48,8 @@ public class TestDatadogMetricsReporter { HoodieWriteConfig writeConfig; @Mock HoodieMetricsConfig metricsConfig; + @Mock + StorageConfiguration storageConf; HoodieMetrics hoodieMetrics; Metrics metrics; @@ -70,7 +73,7 @@ public class TestDatadogMetricsReporter { when(metricsConfig.getBasePath()).thenReturn("s3://test" + UUID.randomUUID()); Throwable t = assertThrows(IllegalStateException.class, () -> { - hoodieMetrics = new HoodieMetrics(writeConfig); + hoodieMetrics = new HoodieMetrics(writeConfig, storageConf); metrics = hoodieMetrics.getMetrics(); }); assertEquals("Datadog cannot be initialized: API key is null or empty.", t.getMessage()); @@ -86,7 +89,7 @@ public class TestDatadogMetricsReporter { when(metricsConfig.getDatadogMetricPrefix()).thenReturn(""); when(metricsConfig.getBasePath()).thenReturn("s3://test" + UUID.randomUUID()); Throwable t = assertThrows(IllegalStateException.class, () -> { - hoodieMetrics = new HoodieMetrics(writeConfig); + hoodieMetrics = new HoodieMetrics(writeConfig, storageConf); metrics = hoodieMetrics.getMetrics(); }); assertEquals("Datadog cannot be initialized: Metric prefix is null or empty.", t.getMessage()); @@ -108,7 +111,7 @@ public class TestDatadogMetricsReporter { when(metricsConfig.getMetricReporterMetricsNamePrefix()).thenReturn(""); when(metricsConfig.getBasePath()).thenReturn("s3://test" + UUID.randomUUID()); assertDoesNotThrow(() -> { - hoodieMetrics = new HoodieMetrics(writeConfig); + hoodieMetrics = new HoodieMetrics(writeConfig, storageConf); metrics = hoodieMetrics.getMetrics(); }); } diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/m3/TestM3Metrics.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/m3/TestM3Metrics.java index 65c4b1d4aba..954619f6174 100644 --- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/m3/TestM3Metrics.java +++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/m3/TestM3Metrics.java @@ -29,6 +29,8 @@ import org.apache.hudi.config.metrics.HoodieMetricsConfig; import org.apache.hudi.metrics.HoodieMetrics; import org.apache.hudi.metrics.Metrics; import org.apache.hudi.metrics.MetricsReporterType; +import org.apache.hudi.storage.StorageConfiguration; + import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -42,6 +44,8 @@ public class TestM3Metrics { HoodieWriteConfig writeConfig; @Mock HoodieMetricsConfig metricsConfig; + @Mock + StorageConfiguration storageConf; HoodieMetrics hoodieMetrics; Metrics metrics; @@ -62,7 +66,7 @@ public class TestM3Metrics { when(metricsConfig.getM3Service()).thenReturn("hoodie"); when(metricsConfig.getM3Tags()).thenReturn("tag1=value1,tag2=value2"); when(metricsConfig.getMetricReporterMetricsNamePrefix()).thenReturn(""); - hoodieMetrics = new HoodieMetrics(writeConfig); + hoodieMetrics = new HoodieMetrics(writeConfig, storageConf); metrics = hoodieMetrics.getMetrics(); metrics.registerGauge("metric1", 123L); assertEquals("123", metrics.getRegistry().getGauges().get("metric1").getValue().toString()); @@ -80,7 +84,7 @@ public class TestM3Metrics { when(metricsConfig.getM3Service()).thenReturn("hoodie"); when(metricsConfig.getM3Tags()).thenReturn(""); when(metricsConfig.getMetricReporterMetricsNamePrefix()).thenReturn(""); - hoodieMetrics = new HoodieMetrics(writeConfig); + hoodieMetrics = new HoodieMetrics(writeConfig, storageConf); metrics = hoodieMetrics.getMetrics(); metrics.registerGauge("metric1", 123L); assertEquals("123", metrics.getRegistry().getGauges().get("metric1").getValue().toString()); @@ -94,7 +98,7 @@ public class TestM3Metrics { when(writeConfig.isMetricsOn()).thenReturn(true); when(metricsConfig.getMetricReporterMetricsNamePrefix()).thenReturn(""); assertThrows(RuntimeException.class, () -> { - hoodieMetrics = new HoodieMetrics(writeConfig); + hoodieMetrics = new HoodieMetrics(writeConfig, storageConf); }); } } diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/prometheus/TestPrometheusReporter.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/prometheus/TestPrometheusReporter.java index 9ad2b8388a2..d95614a577a 100644 --- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/prometheus/TestPrometheusReporter.java +++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/prometheus/TestPrometheusReporter.java @@ -18,11 +18,13 @@ package org.apache.hudi.metrics.prometheus; +import org.apache.hudi.common.testutils.HoodieTestUtils; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.config.metrics.HoodieMetricsConfig; import org.apache.hudi.metrics.HoodieMetrics; import org.apache.hudi.metrics.Metrics; import org.apache.hudi.metrics.MetricsReporterType; +import org.apache.hudi.storage.StorageConfiguration; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Test; @@ -42,6 +44,7 @@ public class TestPrometheusReporter { HoodieWriteConfig writeConfig; @Mock HoodieMetricsConfig metricsConfig; + StorageConfiguration storageConf = HoodieTestUtils.getDefaultStorageConf(); HoodieMetrics hoodieMetrics; Metrics metrics; @@ -60,8 +63,8 @@ public class TestPrometheusReporter { when(metricsConfig.getPrometheusPort()).thenReturn(9090); when(metricsConfig.getBasePath()).thenReturn("s3://test" + UUID.randomUUID()); assertDoesNotThrow(() -> { - new HoodieMetrics(writeConfig); - hoodieMetrics = new HoodieMetrics(writeConfig); + new HoodieMetrics(writeConfig, storageConf); + hoodieMetrics = new HoodieMetrics(writeConfig, storageConf); metrics = hoodieMetrics.getMetrics(); }); } diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/prometheus/TestPushGateWayReporter.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/prometheus/TestPushGateWayReporter.java index aa1c3f06b6f..c2c7695932d 100644 --- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/prometheus/TestPushGateWayReporter.java +++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/prometheus/TestPushGateWayReporter.java @@ -18,6 +18,7 @@ package org.apache.hudi.metrics.prometheus; +import org.apache.hudi.common.testutils.HoodieTestUtils; import org.apache.hudi.common.util.Option; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.config.metrics.HoodieMetricsConfig; @@ -25,6 +26,7 @@ import org.apache.hudi.metrics.HoodieMetrics; import org.apache.hudi.metrics.MetricUtils; import org.apache.hudi.metrics.Metrics; import org.apache.hudi.metrics.MetricsReporterType; +import org.apache.hudi.storage.StorageConfiguration; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Test; @@ -34,15 +36,15 @@ import org.junit.jupiter.params.provider.ValueSource; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; -import java.util.ArrayList; -import java.util.Map; -import java.util.UUID; import java.io.IOException; import java.net.URISyntaxException; import java.net.URL; +import java.util.ArrayList; import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Objects; +import java.util.UUID; import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -60,6 +62,7 @@ public class TestPushGateWayReporter { HoodieWriteConfig writeConfig; @Mock HoodieMetricsConfig metricsConfig; + StorageConfiguration storageConf = HoodieTestUtils.getDefaultStorageConf(); HoodieMetrics hoodieMetrics; Metrics metrics; @@ -78,7 +81,7 @@ public class TestPushGateWayReporter { configureDefaultReporter(); assertDoesNotThrow(() -> { - hoodieMetrics = new HoodieMetrics(writeConfig); + hoodieMetrics = new HoodieMetrics(writeConfig, storageConf); metrics = hoodieMetrics.getMetrics(); }); @@ -103,7 +106,7 @@ public class TestPushGateWayReporter { when(metricsConfig.getMetricReporterFileBasedConfigs()).thenReturn(propPrometheusPath + "," + propDatadogPath); } - hoodieMetrics = new HoodieMetrics(writeConfig); + hoodieMetrics = new HoodieMetrics(writeConfig, storageConf); metrics = hoodieMetrics.getMetrics(); Map<String, Long> metricsMap = new HashMap<>(); diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java index 8c994a463d1..5bf73306efb 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java @@ -89,7 +89,7 @@ public class FlinkHoodieBackedTableMetadataWriter extends HoodieBackedTableMetad protected void initRegistry() { if (metadataWriteConfig.isMetricsOn()) { // should support executor metrics - this.metrics = Option.of(new HoodieMetadataMetrics(metadataWriteConfig.getMetricsConfig())); + this.metrics = Option.of(new HoodieMetadataMetrics(metadataWriteConfig.getMetricsConfig(), storageConf)); } else { this.metrics = Option.empty(); } diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/metadata/JavaHoodieBackedTableMetadataWriter.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/metadata/JavaHoodieBackedTableMetadataWriter.java index 52ccf509b7a..2f6e859e72c 100644 --- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/metadata/JavaHoodieBackedTableMetadataWriter.java +++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/metadata/JavaHoodieBackedTableMetadataWriter.java @@ -79,7 +79,7 @@ public class JavaHoodieBackedTableMetadataWriter extends HoodieBackedTableMetada @Override protected void initRegistry() { if (metadataWriteConfig.isMetricsOn()) { - this.metrics = Option.of(new HoodieMetadataMetrics(metadataWriteConfig.getMetricsConfig())); + this.metrics = Option.of(new HoodieMetadataMetrics(metadataWriteConfig.getMetricsConfig(), storageConf)); } else { this.metrics = Option.empty(); } diff --git a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java index 3c049dc9c20..2a5b6e33171 100644 --- a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java +++ b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java @@ -2393,7 +2393,7 @@ public class TestJavaHoodieBackedMetadata extends TestHoodieMetadataBase { assertNoWriteErrors(writeStatuses); validateMetadata(client); - Metrics metrics = Metrics.getInstance(writeConfig.getMetricsConfig()); + Metrics metrics = Metrics.getInstance(writeConfig.getMetricsConfig(), storageConf); assertTrue(metrics.getRegistry().getGauges().containsKey(HoodieMetadataMetrics.INITIALIZE_STR + ".count")); assertTrue(metrics.getRegistry().getGauges().containsKey(HoodieMetadataMetrics.INITIALIZE_STR + ".totalDuration")); assertTrue((Long) metrics.getRegistry().getGauges().get(HoodieMetadataMetrics.INITIALIZE_STR + ".count").getValue() >= 1L); diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/validator/SparkPreCommitValidator.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/validator/SparkPreCommitValidator.java index 5288963e33b..25fae3cb6f5 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/validator/SparkPreCommitValidator.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/validator/SparkPreCommitValidator.java @@ -59,7 +59,7 @@ public abstract class SparkPreCommitValidator<T, I, K, O extends HoodieData<Writ this.table = table; this.engineContext = engineContext; this.writeConfig = writeConfig; - this.metrics = new HoodieMetrics(writeConfig); + this.metrics = new HoodieMetrics(writeConfig, engineContext.getStorageConf()); } protected Set<String> getPartitionsModified(HoodieWriteMetadata<O> writeResult) { diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java index b6067d30c34..f5e7165a872 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java @@ -119,7 +119,7 @@ public class SparkHoodieBackedTableMetadataWriter extends HoodieBackedTableMetad } else { registry = Registry.getRegistry("HoodieMetadata"); } - this.metrics = Option.of(new HoodieMetadataMetrics(metadataWriteConfig.getMetricsConfig())); + this.metrics = Option.of(new HoodieMetadataMetrics(metadataWriteConfig.getMetricsConfig(), storageConf)); } else { this.metrics = Option.empty(); } diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java index b655cbc2ab5..cfe4fbad4c3 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java @@ -3114,7 +3114,7 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase { assertNoWriteErrors(writeStatuses); validateMetadata(client); - Metrics metrics = Metrics.getInstance(writeConfig.getMetricsConfig()); + Metrics metrics = Metrics.getInstance(writeConfig.getMetricsConfig(), storageConf); assertTrue(metrics.getRegistry().getGauges().containsKey(HoodieMetadataMetrics.INITIALIZE_STR + ".count")); assertTrue(metrics.getRegistry().getGauges().containsKey(HoodieMetadataMetrics.INITIALIZE_STR + ".totalDuration")); assertTrue((Long) metrics.getRegistry().getGauges().get(HoodieMetadataMetrics.INITIALIZE_STR + ".count").getValue() >= 1L); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java index 49351b463c2..4f76764ce98 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java @@ -1000,7 +1000,7 @@ public class TestHoodieTimelineArchiver extends HoodieSparkClientTestHarness { .build()) .withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA) .withParallelism(2, 2).forTable("test-trip-table").build(); - HoodieMetrics metrics = new HoodieMetrics(cfg); + HoodieMetrics metrics = new HoodieMetrics(cfg, storageConf); BaseHoodieWriteClient client = getHoodieWriteClient(cfg); client.archive(); assertTrue(metrics.getMetrics().getRegistry().getNames().contains(metrics.getMetricsName(ARCHIVE_ACTION, DURATION_STR))); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatWriter.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatWriter.java index db5db422d1d..a3aece81205 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatWriter.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatWriter.java @@ -147,7 +147,7 @@ public class HoodieLogFormatWriter implements HoodieLogFormat.Writer { // bytes for header byte[] headerBytes = HoodieLogBlock.getLogMetadataBytes(block.getLogBlockHeader()); // content bytes - byte[] content = block.getContentBytes(); + byte[] content = block.getContentBytes(storage.getConf()); // bytes for footer byte[] footerBytes = HoodieLogBlock.getLogMetadataBytes(block.getLogBlockFooter()); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java index 5ba9e1906b8..8247273aab0 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java @@ -30,6 +30,7 @@ import org.apache.hudi.common.util.collection.CloseableMappingIterator; import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.internal.schema.InternalSchema; import org.apache.hudi.io.SeekableDataInputStream; +import org.apache.hudi.storage.StorageConfiguration; import org.apache.avro.Schema; import org.apache.avro.generic.GenericDatumReader; @@ -100,7 +101,7 @@ public class HoodieAvroDataBlock extends HoodieDataBlock { } @Override - protected byte[] serializeRecords(List<HoodieRecord> records) throws IOException { + protected byte[] serializeRecords(List<HoodieRecord> records, StorageConfiguration<?> storageConf) throws IOException { Schema schema = new Schema.Parser().parse(super.getLogBlockHeader().get(HeaderMetadataType.SCHEMA)); GenericDatumWriter<IndexedRecord> writer = new GenericDatumWriter<>(schema); ByteArrayOutputStream baos = new ByteArrayOutputStream(); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieCommandBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieCommandBlock.java index deeb903cd18..a519f80eb40 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieCommandBlock.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieCommandBlock.java @@ -20,6 +20,7 @@ package org.apache.hudi.common.table.log.block; import org.apache.hudi.common.util.Option; import org.apache.hudi.io.SeekableDataInputStream; +import org.apache.hudi.storage.StorageConfiguration; import java.util.HashMap; import java.util.Map; @@ -61,7 +62,7 @@ public class HoodieCommandBlock extends HoodieLogBlock { } @Override - public byte[] getContentBytes() { + public byte[] getContentBytes(StorageConfiguration<?> storageConf) { return new byte[0]; } } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieCorruptBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieCorruptBlock.java index 19d704c2595..74502ee1b8b 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieCorruptBlock.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieCorruptBlock.java @@ -20,6 +20,7 @@ package org.apache.hudi.common.table.log.block; import org.apache.hudi.common.util.Option; import org.apache.hudi.io.SeekableDataInputStream; +import org.apache.hudi.storage.StorageConfiguration; import java.io.IOException; import java.util.Map; @@ -38,7 +39,7 @@ public class HoodieCorruptBlock extends HoodieLogBlock { } @Override - public byte[] getContentBytes() throws IOException { + public byte[] getContentBytes(StorageConfiguration<?> storageConf) throws IOException { if (!getContent().isPresent() && readBlockLazily) { // read content from disk inflate(); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java index 1b024a3b530..0819a5f8e53 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java @@ -25,6 +25,7 @@ import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.collection.ClosableIterator; import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.io.SeekableDataInputStream; +import org.apache.hudi.storage.StorageConfiguration; import org.apache.avro.Schema; import org.slf4j.Logger; @@ -134,7 +135,7 @@ public abstract class HoodieDataBlock extends HoodieLogBlock { } @Override - public byte[] getContentBytes() throws IOException { + public byte[] getContentBytes(StorageConfiguration<?> storageConf) throws IOException { // In case this method is called before realizing records from content Option<byte[]> content = getContent(); @@ -144,7 +145,7 @@ public abstract class HoodieDataBlock extends HoodieLogBlock { return content.get(); } - return serializeRecords(records.get()); + return serializeRecords(records.get(), storageConf); } public String getKeyFieldName() { @@ -285,7 +286,7 @@ public abstract class HoodieDataBlock extends HoodieLogBlock { ); } - protected abstract byte[] serializeRecords(List<HoodieRecord> records) throws IOException; + protected abstract byte[] serializeRecords(List<HoodieRecord> records, StorageConfiguration<?> storageConf) throws IOException; protected abstract <T> ClosableIterator<HoodieRecord<T>> deserializeRecords(byte[] content, HoodieRecordType type) throws IOException; diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDeleteBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDeleteBlock.java index a55f4f1e623..835fed9d44b 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDeleteBlock.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDeleteBlock.java @@ -28,6 +28,7 @@ import org.apache.hudi.common.util.SerializationUtils; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.io.SeekableDataInputStream; +import org.apache.hudi.storage.StorageConfiguration; import org.apache.hudi.util.Lazy; import org.apache.avro.io.BinaryDecoder; @@ -112,7 +113,7 @@ public class HoodieDeleteBlock extends HoodieLogBlock { } @Override - public byte[] getContentBytes() throws IOException { + public byte[] getContentBytes(StorageConfiguration<?> storageConf) throws IOException { Option<byte[]> content = getContent(); // In case this method is called before realizing keys from content diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java index 0893637b956..e63a1f9872a 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java @@ -118,14 +118,14 @@ public class HoodieHFileDataBlock extends HoodieDataBlock { } @Override - protected byte[] serializeRecords(List<HoodieRecord> records) throws IOException { + protected byte[] serializeRecords(List<HoodieRecord> records, StorageConfiguration<?> storageConf) throws IOException { HFileContext context = new HFileContextBuilder() .withBlockSize(DEFAULT_BLOCK_SIZE) .withCompression(compressionAlgorithm.get()) .withCellComparator(ReflectionUtils.loadClass(KV_COMPARATOR_CLASS_NAME)) .build(); - Configuration conf = new Configuration(); + Configuration conf = storageConf.unwrapAs(Configuration.class); CacheConfig cacheConfig = new CacheConfig(conf); ByteArrayOutputStream baos = new ByteArrayOutputStream(); FSDataOutputStream ostream = new FSDataOutputStream(baos, null); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieLogBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieLogBlock.java index 25e90645699..f04b7735997 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieLogBlock.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieLogBlock.java @@ -88,7 +88,7 @@ public abstract class HoodieLogBlock { } // Return the bytes representation of the data belonging to a LogBlock - public byte[] getContentBytes() throws IOException { + public byte[] getContentBytes(StorageConfiguration<?> storageConf) throws IOException { throw new HoodieException("No implementation was provided"); } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieParquetDataBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieParquetDataBlock.java index 6c2e6802769..2997390dc34 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieParquetDataBlock.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieParquetDataBlock.java @@ -29,13 +29,11 @@ import org.apache.hudi.io.SeekableDataInputStream; import org.apache.hudi.io.storage.HoodieFileReaderFactory; import org.apache.hudi.io.storage.HoodieFileWriter; import org.apache.hudi.io.storage.HoodieFileWriterFactory; -import org.apache.hudi.storage.HoodieStorageUtils; import org.apache.hudi.storage.StorageConfiguration; import org.apache.hudi.storage.StoragePath; import org.apache.hudi.storage.inline.InLineFSUtils; import org.apache.avro.Schema; -import org.apache.hadoop.conf.Configuration; import org.apache.parquet.hadoop.ParquetWriter; import org.apache.parquet.hadoop.metadata.CompressionCodecName; @@ -100,7 +98,7 @@ public class HoodieParquetDataBlock extends HoodieDataBlock { } @Override - protected byte[] serializeRecords(List<HoodieRecord> records) throws IOException { + protected byte[] serializeRecords(List<HoodieRecord> records, StorageConfiguration<?> storageConf) throws IOException { if (records.size() == 0) { return new byte[0]; } @@ -116,8 +114,7 @@ public class HoodieParquetDataBlock extends HoodieDataBlock { config.setValue(PARQUET_DICTIONARY_ENABLED, String.valueOf(useDictionaryEncoding.get())); HoodieRecordType recordType = records.iterator().next().getRecordType(); try (HoodieFileWriter parquetWriter = HoodieFileWriterFactory.getFileWriter( - HoodieFileFormat.PARQUET, outputStream, HoodieStorageUtils.getStorageConf(new Configuration()), - config, writerSchema, recordType)) { + HoodieFileFormat.PARQUET, outputStream, storageConf, config, writerSchema, recordType)) { for (HoodieRecord<?> record : records) { String recordKey = getRecordKey(record).orElse(null); parquetWriter.write(recordKey, record, writerSchema); diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java index 09f06da604b..31c6188184a 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java @@ -93,7 +93,8 @@ public abstract class BaseTableMetadata extends AbstractHoodieTableMetadata { this.isMetadataTableInitialized = dataMetaClient.getTableConfig().isMetadataTableAvailable(); if (metadataConfig.isMetricsEnabled()) { - this.metrics = Option.of(new HoodieMetadataMetrics(HoodieMetricsConfig.newBuilder().fromProperties(metadataConfig.getProps()).build())); + this.metrics = Option.of(new HoodieMetadataMetrics(HoodieMetricsConfig.newBuilder() + .fromProperties(metadataConfig.getProps()).build(), getStorageConf())); } else { this.metrics = Option.empty(); } diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataMetrics.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataMetrics.java index 970ad0743f4..fce32753883 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataMetrics.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataMetrics.java @@ -27,6 +27,7 @@ import org.apache.hudi.config.metrics.HoodieMetricsConfig; import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.metrics.HoodieGauge; import org.apache.hudi.metrics.Metrics; +import org.apache.hudi.storage.StorageConfiguration; import com.codahale.metrics.MetricRegistry; import org.slf4j.Logger; @@ -80,8 +81,8 @@ public class HoodieMetadataMetrics implements Serializable { private final transient MetricRegistry metricsRegistry; private final transient Metrics metrics; - public HoodieMetadataMetrics(HoodieMetricsConfig metricsConfig) { - this.metrics = Metrics.getInstance(metricsConfig); + public HoodieMetadataMetrics(HoodieMetricsConfig metricsConfig, StorageConfiguration<?> storageConf) { + this.metrics = Metrics.getInstance(metricsConfig, storageConf); this.metricsRegistry = metrics.getRegistry(); } diff --git a/hudi-common/src/main/java/org/apache/hudi/metrics/Metrics.java b/hudi-common/src/main/java/org/apache/hudi/metrics/Metrics.java index af32248eea1..cc50d3a4147 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metrics/Metrics.java +++ b/hudi-common/src/main/java/org/apache/hudi/metrics/Metrics.java @@ -25,10 +25,10 @@ import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.config.metrics.HoodieMetricsConfig; import org.apache.hudi.storage.HoodieStorage; import org.apache.hudi.storage.HoodieStorageUtils; +import org.apache.hudi.storage.StorageConfiguration; import org.apache.hudi.storage.StoragePath; import com.codahale.metrics.MetricRegistry; -import org.apache.hadoop.conf.Configuration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -53,8 +53,10 @@ public class Metrics { private final String basePath; private boolean initialized = false; private transient Thread shutdownThread = null; + private final StorageConfiguration<?> storageConf; - public Metrics(HoodieMetricsConfig metricConfig) { + public Metrics(HoodieMetricsConfig metricConfig, StorageConfiguration<?> storageConf) { + this.storageConf = storageConf; registry = new MetricRegistry(); commonMetricPrefix = metricConfig.getMetricReporterMetricsNamePrefix(); reporters = new ArrayList<>(); @@ -78,13 +80,13 @@ public class Metrics { registerGauges(Registry.getAllMetrics(true, true), Option.of(commonMetricPrefix)); } - public static synchronized Metrics getInstance(HoodieMetricsConfig metricConfig) { + public static synchronized Metrics getInstance(HoodieMetricsConfig metricConfig, StorageConfiguration<?> storageConf) { String basePath = getBasePath(metricConfig); if (METRICS_INSTANCE_PER_BASEPATH.containsKey(basePath)) { return METRICS_INSTANCE_PER_BASEPATH.get(basePath); } - Metrics metrics = new Metrics(metricConfig); + Metrics metrics = new Metrics(metricConfig, storageConf); METRICS_INSTANCE_PER_BASEPATH.put(basePath, metrics); return metrics; } @@ -98,7 +100,7 @@ public class Metrics { private List<MetricsReporter> addAdditionalMetricsExporters(HoodieMetricsConfig metricConfig) { List<MetricsReporter> reporterList = new ArrayList<>(); List<String> propPathList = StringUtils.split(metricConfig.getMetricReporterFileBasedConfigs(), ","); - try (HoodieStorage storage = HoodieStorageUtils.getStorage(propPathList.get(0), new Configuration())) { + try (HoodieStorage storage = HoodieStorageUtils.getStorage(propPathList.get(0), storageConf)) { for (String propPath : propPathList) { HoodieMetricsConfig secondarySourceConfig = HoodieMetricsConfig.newBuilder().fromInputStream( storage.open(new StoragePath(propPath))).withPath(metricConfig.getBasePath()).build(); diff --git a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java index 367dc506b21..84d329e4b9d 100755 --- a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java +++ b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java @@ -455,7 +455,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness { Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>(); header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100"); header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, getSimpleSchema().toString()); - byte[] dataBlockContentBytes = getDataBlock(DEFAULT_DATA_BLOCK_TYPE, records, header).getContentBytes(); + byte[] dataBlockContentBytes = getDataBlock(DEFAULT_DATA_BLOCK_TYPE, records, header).getContentBytes(storage.getConf()); HoodieLogBlock.HoodieLogBlockContentLocation logBlockContentLoc = new HoodieLogBlock.HoodieLogBlockContentLocation( HoodieTestUtils.getDefaultStorageConfWithDefaults(), null, 0, dataBlockContentBytes.length, 0); HoodieDataBlock reusableDataBlock = new HoodieAvroDataBlock(null, Option.ofNullable(dataBlockContentBytes), false, diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/log/block/TestHoodieDeleteBlock.java b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/log/block/TestHoodieDeleteBlock.java similarity index 97% rename from hudi-common/src/test/java/org/apache/hudi/common/table/log/block/TestHoodieDeleteBlock.java rename to hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/log/block/TestHoodieDeleteBlock.java index 7441f8cdd41..a6bc014b3f4 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/table/log/block/TestHoodieDeleteBlock.java +++ b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/log/block/TestHoodieDeleteBlock.java @@ -20,6 +20,7 @@ package org.apache.hudi.common.table.log.block; import org.apache.hudi.common.model.DeleteRecord; +import org.apache.hudi.common.testutils.HoodieTestUtils; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.collection.Pair; @@ -124,7 +125,7 @@ public class TestHoodieDeleteBlock { deleteRecordList.add(Pair.of(dr, -1L)); } HoodieDeleteBlock deleteBlock = new HoodieDeleteBlock(deleteRecordList, false, new HashMap<>()); - byte[] contentBytes = deleteBlock.getContentBytes(); + byte[] contentBytes = deleteBlock.getContentBytes(HoodieTestUtils.getDefaultStorageConf()); HoodieDeleteBlock deserializeDeleteBlock = new HoodieDeleteBlock( Option.of(contentBytes), null, true, Option.empty(), new HashMap<>(), new HashMap<>()); DeleteRecord[] deserializedDeleteRecords = deserializeDeleteBlock.getRecordsToDelete(); diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RepairOverwriteHoodiePropsProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RepairOverwriteHoodiePropsProcedure.scala index c7e3110b6cd..07b4992dbc8 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RepairOverwriteHoodiePropsProcedure.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RepairOverwriteHoodiePropsProcedure.scala @@ -51,7 +51,7 @@ class RepairOverwriteHoodiePropsProcedure extends BaseProcedure with ProcedureBu def outputType: StructType = OUTPUT_TYPE def loadNewProps(filePath: String, props: Properties):Unit = { - val fs = HadoopFSUtils.getFs(filePath, new Configuration()) + val fs = HadoopFSUtils.getFs(filePath, spark.sessionState.newHadoopConf()) val fis = fs.open(new Path(filePath)) props.load(fis) diff --git a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/marker/MarkerBasedEarlyConflictDetectionRunnable.java b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/marker/MarkerBasedEarlyConflictDetectionRunnable.java index 6509e8d7e0c..11213b56e26 100644 --- a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/marker/MarkerBasedEarlyConflictDetectionRunnable.java +++ b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/marker/MarkerBasedEarlyConflictDetectionRunnable.java @@ -25,12 +25,10 @@ import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.util.HoodieTimer; import org.apache.hudi.common.util.MarkerUtils; import org.apache.hudi.exception.HoodieIOException; -import org.apache.hudi.hadoop.fs.HadoopFSUtils; import org.apache.hudi.storage.HoodieStorage; import org.apache.hudi.storage.StoragePath; import org.apache.hudi.timeline.service.handlers.MarkerHandler; -import org.apache.hadoop.conf.Configuration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -95,7 +93,7 @@ public class MarkerBasedEarlyConflictDetectionRunnable implements Runnable { List<StoragePath> instants = MarkerUtils.getAllMarkerDir(tempPath, storage); HoodieTableMetaClient metaClient = - HoodieTableMetaClient.builder().setConf(HadoopFSUtils.getStorageConf(new Configuration())).setBasePath(basePath) + HoodieTableMetaClient.builder().setConf(storage.getConf().newInstance()).setBasePath(basePath) .setLoadActiveTimelineOnLoad(true).build(); HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline(); @@ -104,7 +102,7 @@ public class MarkerBasedEarlyConflictDetectionRunnable implements Runnable { storage, basePath); Set<String> tableMarkers = candidate.stream().flatMap(instant -> { return MarkerUtils.readTimelineServerBasedMarkersFromFileSystem(instant, storage, - new HoodieLocalEngineContext(HadoopFSUtils.getStorageConf(new Configuration())), 100) + new HoodieLocalEngineContext(storage.getConf().newInstance()), 100) .values().stream().flatMap(Collection::stream); }).collect(Collectors.toSet()); diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerMetrics.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerMetrics.java index cd7867edf3e..1dd008da237 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerMetrics.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerMetrics.java @@ -21,6 +21,7 @@ package org.apache.hudi.utilities.deltastreamer; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.config.metrics.HoodieMetricsConfig; +import org.apache.hudi.storage.StorageConfiguration; import org.apache.hudi.utilities.streamer.HoodieStreamerMetrics; /** @@ -30,11 +31,11 @@ import org.apache.hudi.utilities.streamer.HoodieStreamerMetrics; @Deprecated public class HoodieDeltaStreamerMetrics extends HoodieStreamerMetrics { - public HoodieDeltaStreamerMetrics(HoodieWriteConfig writeConfig) { - super(writeConfig.getMetricsConfig()); + public HoodieDeltaStreamerMetrics(HoodieWriteConfig writeConfig, StorageConfiguration<?> storageConf) { + super(writeConfig.getMetricsConfig(), storageConf); } - public HoodieDeltaStreamerMetrics(HoodieMetricsConfig metricsConfig) { - super(metricsConfig); + public HoodieDeltaStreamerMetrics(HoodieMetricsConfig metricsConfig, StorageConfiguration<?> storageConf) { + super(metricsConfig, storageConf); } } diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/ingestion/HoodieIngestionMetrics.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/ingestion/HoodieIngestionMetrics.java index 3d07610993d..eb9b51aedb3 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/ingestion/HoodieIngestionMetrics.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/ingestion/HoodieIngestionMetrics.java @@ -20,6 +20,7 @@ package org.apache.hudi.utilities.ingestion; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.config.metrics.HoodieMetricsConfig; +import org.apache.hudi.storage.StorageConfiguration; import com.codahale.metrics.Timer; @@ -30,14 +31,17 @@ import java.io.Serializable; */ public abstract class HoodieIngestionMetrics implements Serializable { + protected final StorageConfiguration<?> storageConf; + protected final HoodieMetricsConfig writeConfig; - public HoodieIngestionMetrics(HoodieWriteConfig writeConfig) { - this(writeConfig.getMetricsConfig()); + public HoodieIngestionMetrics(HoodieWriteConfig writeConfig, StorageConfiguration<?> storageConf) { + this(writeConfig.getMetricsConfig(), storageConf); } - public HoodieIngestionMetrics(HoodieMetricsConfig writeConfig) { + public HoodieIngestionMetrics(HoodieMetricsConfig writeConfig, StorageConfiguration<?> storageConf) { this.writeConfig = writeConfig; + this.storageConf = storageConf; } public abstract Timer.Context getOverallTimerContext(); diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamerMetrics.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamerMetrics.java index fcbf431ed6f..ab1f72185a3 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamerMetrics.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamerMetrics.java @@ -22,6 +22,7 @@ package org.apache.hudi.utilities.streamer; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.config.metrics.HoodieMetricsConfig; import org.apache.hudi.metrics.Metrics; +import org.apache.hudi.storage.StorageConfiguration; import org.apache.hudi.utilities.ingestion.HoodieIngestionMetrics; import com.codahale.metrics.Timer; @@ -37,14 +38,14 @@ public class HoodieStreamerMetrics extends HoodieIngestionMetrics { private transient Timer hiveSyncTimer; private transient Timer metaSyncTimer; - public HoodieStreamerMetrics(HoodieWriteConfig writeConfig) { - this(writeConfig.getMetricsConfig()); + public HoodieStreamerMetrics(HoodieWriteConfig writeConfig, StorageConfiguration<?> storageConf) { + this(writeConfig.getMetricsConfig(), storageConf); } - public HoodieStreamerMetrics(HoodieMetricsConfig writeConfig) { - super(writeConfig); + public HoodieStreamerMetrics(HoodieMetricsConfig writeConfig, StorageConfiguration<?> storageConf) { + super(writeConfig, storageConf); if (writeConfig.isMetricsOn()) { - metrics = Metrics.getInstance(writeConfig); + metrics = Metrics.getInstance(writeConfig, storageConf); this.overallTimerName = getMetricsName("timer", "deltastreamer"); this.hiveSyncTimerName = getMetricsName("timer", "deltastreamerHiveSync"); this.metaSyncTimerName = getMetricsName("timer", "deltastreamerMetaSync"); diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java index 27231caaa5d..74a05242313 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java @@ -63,6 +63,7 @@ import org.apache.hudi.config.HoodieErrorTableConfig; import org.apache.hudi.config.HoodieIndexConfig; import org.apache.hudi.config.HoodiePayloadConfig; import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.config.metrics.HoodieMetricsConfig; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.exception.HoodieMetaSyncException; @@ -75,6 +76,7 @@ import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory; import org.apache.hudi.metrics.HoodieMetrics; import org.apache.hudi.storage.HoodieStorage; import org.apache.hudi.storage.HoodieStorageUtils; +import org.apache.hudi.storage.StorageConfiguration; import org.apache.hudi.storage.StoragePath; import org.apache.hudi.sync.common.util.SyncUtilHelpers; import org.apache.hudi.table.action.HoodieWriteMetadata; @@ -310,8 +312,10 @@ public class StreamSync implements Serializable, Closeable { this.conf = conf; HoodieWriteConfig hoodieWriteConfig = getHoodieClientConfig(); - this.metrics = (HoodieIngestionMetrics) ReflectionUtils.loadClass(cfg.ingestionMetricsClass, hoodieWriteConfig.getMetricsConfig()); - this.hoodieMetrics = new HoodieMetrics(hoodieWriteConfig); + this.metrics = (HoodieIngestionMetrics) ReflectionUtils.loadClass(cfg.ingestionMetricsClass, + new Class<?>[] { HoodieMetricsConfig.class, StorageConfiguration.class}, + hoodieWriteConfig.getMetricsConfig(), storage.getConf()); + this.hoodieMetrics = new HoodieMetrics(hoodieWriteConfig, storage.getConf()); if (props.getBoolean(ERROR_TABLE_ENABLED.key(), ERROR_TABLE_ENABLED.defaultValue())) { this.errorTableWriter = ErrorTableUtils.getErrorTableWriter( cfg, sparkSession, props, hoodieSparkContext, storage);