This is an automated email from the ASF dual-hosted git repository. danny0405 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push: new 6be7205a1e3 [MINOR] Refactored `@Before*` and `@After*` in `HoodieDeltaStreamerTestBase` (#10912) 6be7205a1e3 is described below commit 6be7205a1e3beefb7dc1dd5bc151a0edfa66182f Author: Geser Dugarov <geserduga...@gmail.com> AuthorDate: Sat Mar 23 07:56:58 2024 +0700 [MINOR] Refactored `@Before*` and `@After*` in `HoodieDeltaStreamerTestBase` (#10912) --- .../deltastreamer/HoodieDeltaStreamerTestBase.java | 110 ++++++++++----------- 1 file changed, 55 insertions(+), 55 deletions(-) diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java index 8f9fc6f1c50..3c74388860e 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java @@ -19,6 +19,7 @@ package org.apache.hudi.utilities.deltastreamer; +import org.apache.hudi.common.config.HoodieCommonConfig; import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.model.HoodieRecord; @@ -26,11 +27,15 @@ import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.table.timeline.TimelineMetadataUtils; import org.apache.hudi.common.testutils.HoodieTestDataGenerator; import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.config.HoodieCleanConfig; import org.apache.hudi.config.HoodieClusteringConfig; +import org.apache.hudi.hive.HiveSyncConfigHolder; import org.apache.hudi.hive.MultiPartKeysValueExtractor; +import org.apache.hudi.hive.testutils.HiveTestService; +import org.apache.hudi.sync.common.HoodieSyncConfig; import org.apache.hudi.utilities.config.HoodieStreamerConfig; import org.apache.hudi.utilities.config.KafkaSourceConfig; import org.apache.hudi.utilities.config.SourceTestConfig; @@ -38,6 +43,7 @@ import org.apache.hudi.utilities.schema.FilebasedSchemaProvider; import org.apache.hudi.utilities.sources.HoodieIncrSource; import org.apache.hudi.utilities.sources.TestDataSource; import org.apache.hudi.utilities.sources.TestParquetDFSSourceEmptyBatch; +import org.apache.hudi.utilities.streamer.HoodieStreamer; import org.apache.hudi.utilities.testutils.UtilitiesTestBase; import org.apache.avro.Schema; @@ -68,17 +74,6 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.function.Function; -import static org.apache.hudi.common.config.HoodieCommonConfig.SET_NULL_FOR_MISSING_COLUMNS; -import static org.apache.hudi.common.table.timeline.TimelineMetadataUtils.serializeCommitMetadata; -import static org.apache.hudi.common.util.StringUtils.nonEmpty; -import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_URL; -import static org.apache.hudi.hive.testutils.HiveTestService.HS2_JDBC_URL; -import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_DATABASE_NAME; -import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS; -import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_PARTITION_FIELDS; -import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_TABLE_NAME; -import static org.apache.hudi.utilities.config.KafkaSourceConfig.KAFKA_AVRO_VALUE_DESERIALIZER_CLASS; -import static org.apache.hudi.utilities.streamer.HoodieStreamer.CHECKPOINT_KEY; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -137,9 +132,7 @@ public class HoodieDeltaStreamerTestBase extends UtilitiesTestBase { @BeforeEach protected void prepareTestSetup() throws IOException { - PARQUET_SOURCE_ROOT = basePath + "/parquetFiles"; - ORC_SOURCE_ROOT = basePath + "/orcFiles"; - JSON_KAFKA_SOURCE_ROOT = basePath + "/jsonKafkaFiles"; + setupTest(); testUtils = new KafkaTestUtils(); testUtils.setup(); topicName = "topic" + testNum; @@ -148,6 +141,36 @@ public class HoodieDeltaStreamerTestBase extends UtilitiesTestBase { prepareORCDFSFiles(ORC_NUM_RECORDS, ORC_SOURCE_ROOT); } + @AfterEach + public void cleanupKafkaTestUtils() { + if (testUtils != null) { + testUtils.teardown(); + testUtils = null; + } + if (hudiOpts != null) { + hudiOpts = null; + } + } + + @BeforeAll + public static void initClass() throws Exception { + UtilitiesTestBase.initTestServices(false, true, false); + // basePath is defined in UtilitiesTestBase.initTestServices + PARQUET_SOURCE_ROOT = basePath + "/parquetFiles"; + ORC_SOURCE_ROOT = basePath + "/orcFiles"; + JSON_KAFKA_SOURCE_ROOT = basePath + "/jsonKafkaFiles"; + } + + @AfterAll + public static void tearDown() { + UtilitiesTestBase.cleanUpUtilitiesTestServices(); + } + + public void setupTest() { + TestDataSource.returnEmptyBatch = false; + hudiOpts = new HashMap<>(); + } + protected static void prepareInitialConfigs(FileSystem dfs, String dfsBasePath, String brokerAddress) throws IOException { // prepare the configs. UtilitiesTestBase.Helpers.copyToDFS("streamer-config/base.properties", dfs, dfsBasePath + "/base.properties"); @@ -232,38 +255,15 @@ public class HoodieDeltaStreamerTestBase extends UtilitiesTestBase { props.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file", dfsBasePath + "/target.avsc"); // Hive Configs - props.setProperty(HIVE_URL.key(), HS2_JDBC_URL); - props.setProperty(META_SYNC_DATABASE_NAME.key(), "testdb1"); - props.setProperty(META_SYNC_TABLE_NAME.key(), "hive_trips"); - props.setProperty(META_SYNC_PARTITION_FIELDS.key(), "datestr"); - props.setProperty(META_SYNC_PARTITION_EXTRACTOR_CLASS.key(), + props.setProperty(HiveSyncConfigHolder.HIVE_URL.key(), HiveTestService.HS2_JDBC_URL); + props.setProperty(HoodieSyncConfig.META_SYNC_DATABASE_NAME.key(), "testdb1"); + props.setProperty(HoodieSyncConfig.META_SYNC_TABLE_NAME.key(), "hive_trips"); + props.setProperty(HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key(), "datestr"); + props.setProperty(HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key(), MultiPartKeysValueExtractor.class.getName()); UtilitiesTestBase.Helpers.savePropsToDFS(props, dfs, dfsBasePath + "/" + PROPS_FILENAME_TEST_SOURCE); } - @BeforeAll - public static void initClass() throws Exception { - UtilitiesTestBase.initTestServices(false, true, false); - } - - @AfterAll - public static void tearDown() throws IOException { - UtilitiesTestBase.cleanUpUtilitiesTestServices(); - } - - @AfterEach - public void cleanupKafkaTestUtils() { - if (testUtils != null) { - testUtils.teardown(); - } - } - - @BeforeEach - public void setupTest() { - TestDataSource.returnEmptyBatch = false; - hudiOpts = new HashMap<>(); - } - protected static void populateInvalidTableConfigFilePathProps(TypedProperties props, String dfsBasePath) { props.setProperty("hoodie.datasource.write.keygenerator.class", TestHoodieDeltaStreamer.TestGenerator.class.getName()); props.setProperty("hoodie.deltastreamer.keygen.timebased.output.dateformat", "yyyyMMdd"); @@ -296,10 +296,10 @@ public class HoodieDeltaStreamerTestBase extends UtilitiesTestBase { protected static void populateCommonHiveProps(TypedProperties props) { // Hive Configs - props.setProperty(HIVE_URL.key(), HS2_JDBC_URL); - props.setProperty(META_SYNC_DATABASE_NAME.key(), "testdb2"); - props.setProperty(META_SYNC_PARTITION_FIELDS.key(), "datestr"); - props.setProperty(META_SYNC_PARTITION_EXTRACTOR_CLASS.key(), + props.setProperty(HiveSyncConfigHolder.HIVE_URL.key(), HiveTestService.HS2_JDBC_URL); + props.setProperty(HoodieSyncConfig.META_SYNC_DATABASE_NAME.key(), "testdb2"); + props.setProperty(HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key(), "datestr"); + props.setProperty(HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key(), MultiPartKeysValueExtractor.class.getName()); } @@ -408,7 +408,7 @@ public class HoodieDeltaStreamerTestBase extends UtilitiesTestBase { props.setProperty("hoodie.deltastreamer.source.kafka.topic", topicName); props.setProperty("hoodie.deltastreamer.kafka.source.maxEvents", String.valueOf(5000)); props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); - props.setProperty(KAFKA_AVRO_VALUE_DESERIALIZER_CLASS.key(), ByteArrayDeserializer.class.getName()); + props.setProperty(KafkaSourceConfig.KAFKA_AVRO_VALUE_DESERIALIZER_CLASS.key(), ByteArrayDeserializer.class.getName()); props.setProperty("hoodie.deltastreamer.kafka.source.maxEvents", maxEventsToReadFromKafkaSource != null ? String.valueOf(maxEventsToReadFromKafkaSource) : String.valueOf(KafkaSourceConfig.MAX_EVENTS_FROM_KAFKA_SOURCE.defaultValue())); @@ -442,19 +442,19 @@ public class HoodieDeltaStreamerTestBase extends UtilitiesTestBase { String inlineClusterMaxCommit, String asyncCluster, String asyncClusterMaxCommit) { List<String> configs = new ArrayList<>(); configs.add(String.format("%s=%d", SourceTestConfig.MAX_UNIQUE_RECORDS_PROP.key(), totalRecords)); - if (nonEmpty(autoClean)) { + if (StringUtils.nonEmpty(autoClean)) { configs.add(String.format("%s=%s", HoodieCleanConfig.AUTO_CLEAN.key(), autoClean)); } - if (nonEmpty(inlineCluster)) { + if (StringUtils.nonEmpty(inlineCluster)) { configs.add(String.format("%s=%s", HoodieClusteringConfig.INLINE_CLUSTERING.key(), inlineCluster)); } - if (nonEmpty(inlineClusterMaxCommit)) { + if (StringUtils.nonEmpty(inlineClusterMaxCommit)) { configs.add(String.format("%s=%s", HoodieClusteringConfig.INLINE_CLUSTERING_MAX_COMMITS.key(), inlineClusterMaxCommit)); } - if (nonEmpty(asyncCluster)) { + if (StringUtils.nonEmpty(asyncCluster)) { configs.add(String.format("%s=%s", HoodieClusteringConfig.ASYNC_CLUSTERING_ENABLE.key(), asyncCluster)); } - if (nonEmpty(asyncClusterMaxCommit)) { + if (StringUtils.nonEmpty(asyncClusterMaxCommit)) { configs.add(String.format("%s=%s", HoodieClusteringConfig.ASYNC_CLUSTERING_MAX_COMMITS.key(), asyncClusterMaxCommit)); } return configs; @@ -482,7 +482,7 @@ public class HoodieDeltaStreamerTestBase extends UtilitiesTestBase { metaClient.getActiveTimeline().createNewInstant(new HoodieInstant(HoodieInstant.State.INFLIGHT, commitActiontype, commitTime)); metaClient.getActiveTimeline().saveAsComplete( new HoodieInstant(HoodieInstant.State.INFLIGHT, commitActiontype, commitTime), - serializeCommitMetadata(commitMetadata)); + TimelineMetadataUtils.serializeCommitMetadata(commitMetadata)); } void assertRecordCount(long expected, String tablePath, SQLContext sqlContext) { @@ -616,7 +616,7 @@ public class HoodieDeltaStreamerTestBase extends UtilitiesTestBase { cfg.schemaProviderClassName = schemaProviderClassName; } List<String> cfgs = new ArrayList<>(); - cfgs.add(SET_NULL_FOR_MISSING_COLUMNS.key() + "=true"); + cfgs.add(HoodieCommonConfig.SET_NULL_FOR_MISSING_COLUMNS.key() + "=true"); cfgs.add("hoodie.deltastreamer.source.hoodieincr.read_latest_on_missing_ckpt=" + addReadLatestOnMissingCkpt); cfgs.add("hoodie.deltastreamer.source.hoodieincr.path=" + srcBasePath); // No partition @@ -665,7 +665,7 @@ public class HoodieDeltaStreamerTestBase extends UtilitiesTestBase { HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes(timeline.getInstantDetails(lastInstant).get(), HoodieCommitMetadata.class); assertEquals(totalCommits, timeline.countInstants()); - assertEquals(expected, commitMetadata.getMetadata(CHECKPOINT_KEY)); + assertEquals(expected, commitMetadata.getMetadata(HoodieStreamer.CHECKPOINT_KEY)); return lastInstant.getTimestamp(); }