This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 6be7205a1e3 [MINOR] Refactored `@Before*` and `@After*` in `HoodieDeltaStreamerTestBase` (#10912)
6be7205a1e3 is described below

commit 6be7205a1e3beefb7dc1dd5bc151a0edfa66182f
Author: Geser Dugarov <geserduga...@gmail.com>
AuthorDate: Sat Mar 23 07:56:58 2024 +0700

    [MINOR] Refactored `@Before*` and `@After*` in `HoodieDeltaStreamerTestBase` (#10912)
---
 .../deltastreamer/HoodieDeltaStreamerTestBase.java | 110 ++++++++++-----------
 1 file changed, 55 insertions(+), 55 deletions(-)

diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java
index 8f9fc6f1c50..3c74388860e 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java
@@ -19,6 +19,7 @@
 
 package org.apache.hudi.utilities.deltastreamer;
 
+import org.apache.hudi.common.config.HoodieCommonConfig;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieRecord;
@@ -26,11 +27,15 @@ import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
 import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.config.HoodieCleanConfig;
 import org.apache.hudi.config.HoodieClusteringConfig;
+import org.apache.hudi.hive.HiveSyncConfigHolder;
 import org.apache.hudi.hive.MultiPartKeysValueExtractor;
+import org.apache.hudi.hive.testutils.HiveTestService;
+import org.apache.hudi.sync.common.HoodieSyncConfig;
 import org.apache.hudi.utilities.config.HoodieStreamerConfig;
 import org.apache.hudi.utilities.config.KafkaSourceConfig;
 import org.apache.hudi.utilities.config.SourceTestConfig;
@@ -38,6 +43,7 @@ import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
 import org.apache.hudi.utilities.sources.HoodieIncrSource;
 import org.apache.hudi.utilities.sources.TestDataSource;
 import org.apache.hudi.utilities.sources.TestParquetDFSSourceEmptyBatch;
+import org.apache.hudi.utilities.streamer.HoodieStreamer;
 import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
 
 import org.apache.avro.Schema;
@@ -68,17 +74,6 @@ import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
 
-import static org.apache.hudi.common.config.HoodieCommonConfig.SET_NULL_FOR_MISSING_COLUMNS;
-import static org.apache.hudi.common.table.timeline.TimelineMetadataUtils.serializeCommitMetadata;
-import static org.apache.hudi.common.util.StringUtils.nonEmpty;
-import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_URL;
-import static org.apache.hudi.hive.testutils.HiveTestService.HS2_JDBC_URL;
-import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_DATABASE_NAME;
-import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS;
-import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_PARTITION_FIELDS;
-import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_TABLE_NAME;
-import static org.apache.hudi.utilities.config.KafkaSourceConfig.KAFKA_AVRO_VALUE_DESERIALIZER_CLASS;
-import static org.apache.hudi.utilities.streamer.HoodieStreamer.CHECKPOINT_KEY;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
@@ -137,9 +132,7 @@ public class HoodieDeltaStreamerTestBase extends UtilitiesTestBase {
 
   @BeforeEach
   protected void prepareTestSetup() throws IOException {
-    PARQUET_SOURCE_ROOT = basePath + "/parquetFiles";
-    ORC_SOURCE_ROOT = basePath + "/orcFiles";
-    JSON_KAFKA_SOURCE_ROOT = basePath + "/jsonKafkaFiles";
+    setupTest();
     testUtils = new KafkaTestUtils();
     testUtils.setup();
     topicName = "topic" + testNum;
@@ -148,6 +141,36 @@ public class HoodieDeltaStreamerTestBase extends UtilitiesTestBase {
     prepareORCDFSFiles(ORC_NUM_RECORDS, ORC_SOURCE_ROOT);
   }
 
+  @AfterEach
+  public void cleanupKafkaTestUtils() {
+    if (testUtils != null) {
+      testUtils.teardown();
+      testUtils = null;
+    }
+    if (hudiOpts != null) {
+      hudiOpts = null;
+    }
+  }
+
+  @BeforeAll
+  public static void initClass() throws Exception {
+    UtilitiesTestBase.initTestServices(false, true, false);
+    // basePath is defined in UtilitiesTestBase.initTestServices
+    PARQUET_SOURCE_ROOT = basePath + "/parquetFiles";
+    ORC_SOURCE_ROOT = basePath + "/orcFiles";
+    JSON_KAFKA_SOURCE_ROOT = basePath + "/jsonKafkaFiles";
+  }
+
+  @AfterAll
+  public static void tearDown() {
+    UtilitiesTestBase.cleanUpUtilitiesTestServices();
+  }
+
+  public void setupTest() {
+    TestDataSource.returnEmptyBatch = false;
+    hudiOpts = new HashMap<>();
+  }
+
   protected static void prepareInitialConfigs(FileSystem dfs, String dfsBasePath, String brokerAddress) throws IOException {
     // prepare the configs.
     UtilitiesTestBase.Helpers.copyToDFS("streamer-config/base.properties", dfs, dfsBasePath + "/base.properties");
@@ -232,38 +255,15 @@ public class HoodieDeltaStreamerTestBase extends UtilitiesTestBase {
     props.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file", dfsBasePath + "/target.avsc");
 
     // Hive Configs
-    props.setProperty(HIVE_URL.key(), HS2_JDBC_URL);
-    props.setProperty(META_SYNC_DATABASE_NAME.key(), "testdb1");
-    props.setProperty(META_SYNC_TABLE_NAME.key(), "hive_trips");
-    props.setProperty(META_SYNC_PARTITION_FIELDS.key(), "datestr");
-    props.setProperty(META_SYNC_PARTITION_EXTRACTOR_CLASS.key(),
+    props.setProperty(HiveSyncConfigHolder.HIVE_URL.key(), HiveTestService.HS2_JDBC_URL);
+    props.setProperty(HoodieSyncConfig.META_SYNC_DATABASE_NAME.key(), "testdb1");
+    props.setProperty(HoodieSyncConfig.META_SYNC_TABLE_NAME.key(), "hive_trips");
+    props.setProperty(HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key(), "datestr");
+    props.setProperty(HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key(),
         MultiPartKeysValueExtractor.class.getName());
     UtilitiesTestBase.Helpers.savePropsToDFS(props, dfs, dfsBasePath + "/" + PROPS_FILENAME_TEST_SOURCE);
   }
 
-  @BeforeAll
-  public static void initClass() throws Exception {
-    UtilitiesTestBase.initTestServices(false, true, false);
-  }
-
-  @AfterAll
-  public static void tearDown() throws IOException {
-    UtilitiesTestBase.cleanUpUtilitiesTestServices();
-  }
-
-  @AfterEach
-  public void cleanupKafkaTestUtils() {
-    if (testUtils != null) {
-      testUtils.teardown();
-    }
-  }
-
-  @BeforeEach
-  public void setupTest() {
-    TestDataSource.returnEmptyBatch = false;
-    hudiOpts = new HashMap<>();
-  }
-
   protected static void populateInvalidTableConfigFilePathProps(TypedProperties props, String dfsBasePath) {
     props.setProperty("hoodie.datasource.write.keygenerator.class", TestHoodieDeltaStreamer.TestGenerator.class.getName());
     props.setProperty("hoodie.deltastreamer.keygen.timebased.output.dateformat", "yyyyMMdd");
@@ -296,10 +296,10 @@ public class HoodieDeltaStreamerTestBase extends UtilitiesTestBase {
 
   protected static void populateCommonHiveProps(TypedProperties props) {
     // Hive Configs
-    props.setProperty(HIVE_URL.key(), HS2_JDBC_URL);
-    props.setProperty(META_SYNC_DATABASE_NAME.key(), "testdb2");
-    props.setProperty(META_SYNC_PARTITION_FIELDS.key(), "datestr");
-    props.setProperty(META_SYNC_PARTITION_EXTRACTOR_CLASS.key(),
+    props.setProperty(HiveSyncConfigHolder.HIVE_URL.key(), HiveTestService.HS2_JDBC_URL);
+    props.setProperty(HoodieSyncConfig.META_SYNC_DATABASE_NAME.key(), "testdb2");
+    props.setProperty(HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key(), "datestr");
+    props.setProperty(HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key(),
         MultiPartKeysValueExtractor.class.getName());
   }
 
@@ -408,7 +408,7 @@ public class HoodieDeltaStreamerTestBase extends UtilitiesTestBase {
     props.setProperty("hoodie.deltastreamer.source.kafka.topic", topicName);
     props.setProperty("hoodie.deltastreamer.kafka.source.maxEvents", 
String.valueOf(5000));
     props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
-    props.setProperty(KAFKA_AVRO_VALUE_DESERIALIZER_CLASS.key(),  ByteArrayDeserializer.class.getName());
+    props.setProperty(KafkaSourceConfig.KAFKA_AVRO_VALUE_DESERIALIZER_CLASS.key(),  ByteArrayDeserializer.class.getName());
     props.setProperty("hoodie.deltastreamer.kafka.source.maxEvents",
         maxEventsToReadFromKafkaSource != null ? String.valueOf(maxEventsToReadFromKafkaSource) :
             String.valueOf(KafkaSourceConfig.MAX_EVENTS_FROM_KAFKA_SOURCE.defaultValue()));
@@ -442,19 +442,19 @@ public class HoodieDeltaStreamerTestBase extends UtilitiesTestBase {
                                               String inlineClusterMaxCommit, String asyncCluster, String asyncClusterMaxCommit) {
     List<String> configs = new ArrayList<>();
     configs.add(String.format("%s=%d", 
SourceTestConfig.MAX_UNIQUE_RECORDS_PROP.key(), totalRecords));
-    if (nonEmpty(autoClean)) {
+    if (StringUtils.nonEmpty(autoClean)) {
       configs.add(String.format("%s=%s", HoodieCleanConfig.AUTO_CLEAN.key(), 
autoClean));
     }
-    if (nonEmpty(inlineCluster)) {
+    if (StringUtils.nonEmpty(inlineCluster)) {
       configs.add(String.format("%s=%s", 
HoodieClusteringConfig.INLINE_CLUSTERING.key(), inlineCluster));
     }
-    if (nonEmpty(inlineClusterMaxCommit)) {
+    if (StringUtils.nonEmpty(inlineClusterMaxCommit)) {
       configs.add(String.format("%s=%s", 
HoodieClusteringConfig.INLINE_CLUSTERING_MAX_COMMITS.key(), 
inlineClusterMaxCommit));
     }
-    if (nonEmpty(asyncCluster)) {
+    if (StringUtils.nonEmpty(asyncCluster)) {
       configs.add(String.format("%s=%s", 
HoodieClusteringConfig.ASYNC_CLUSTERING_ENABLE.key(), asyncCluster));
     }
-    if (nonEmpty(asyncClusterMaxCommit)) {
+    if (StringUtils.nonEmpty(asyncClusterMaxCommit)) {
       configs.add(String.format("%s=%s", 
HoodieClusteringConfig.ASYNC_CLUSTERING_MAX_COMMITS.key(), 
asyncClusterMaxCommit));
     }
     return configs;
@@ -482,7 +482,7 @@ public class HoodieDeltaStreamerTestBase extends UtilitiesTestBase {
     metaClient.getActiveTimeline().createNewInstant(new HoodieInstant(HoodieInstant.State.INFLIGHT, commitActiontype, commitTime));
     metaClient.getActiveTimeline().saveAsComplete(
         new HoodieInstant(HoodieInstant.State.INFLIGHT, commitActiontype, commitTime),
-        serializeCommitMetadata(commitMetadata));
+        TimelineMetadataUtils.serializeCommitMetadata(commitMetadata));
   }
 
   void assertRecordCount(long expected, String tablePath, SQLContext sqlContext) {
@@ -616,7 +616,7 @@ public class HoodieDeltaStreamerTestBase extends UtilitiesTestBase {
         cfg.schemaProviderClassName = schemaProviderClassName;
       }
       List<String> cfgs = new ArrayList<>();
-      cfgs.add(SET_NULL_FOR_MISSING_COLUMNS.key() + "=true");
+      cfgs.add(HoodieCommonConfig.SET_NULL_FOR_MISSING_COLUMNS.key() + "=true");
       cfgs.add("hoodie.deltastreamer.source.hoodieincr.read_latest_on_missing_ckpt=" + addReadLatestOnMissingCkpt);
       cfgs.add("hoodie.deltastreamer.source.hoodieincr.path=" + srcBasePath);
       // No partition
@@ -665,7 +665,7 @@ public class HoodieDeltaStreamerTestBase extends UtilitiesTestBase {
       HoodieCommitMetadata commitMetadata =
           HoodieCommitMetadata.fromBytes(timeline.getInstantDetails(lastInstant).get(), HoodieCommitMetadata.class);
       assertEquals(totalCommits, timeline.countInstants());
-      assertEquals(expected, commitMetadata.getMetadata(CHECKPOINT_KEY));
+      assertEquals(expected, commitMetadata.getMetadata(HoodieStreamer.CHECKPOINT_KEY));
       return lastInstant.getTimestamp();
     }
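
As background for the refactoring above: JUnit 5 requires `@BeforeAll`/`@AfterAll` methods to be static and runs them once per test class, while `@BeforeEach`/`@AfterEach` run around every individual test, including failing ones. That ordering is what lets the `basePath`-derived source roots move into `initClass()` and lets `cleanupKafkaTestUtils()` reset per-test state. A minimal sketch of the lifecycle, reusing the method names from this patch but with purely illustrative bodies (the `"/tmp/sketch"` value is hypothetical, not from the patch):

import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

class LifecycleOrderSketch {

  static String basePath;          // class-wide state, like basePath in the test base

  @BeforeAll
  static void initClass() {        // 1. runs once, before any test in the class
    basePath = "/tmp/sketch";      // safe place to compute paths derived from basePath
  }

  @BeforeEach
  void prepareTestSetup() {        // 2. runs before every @Test
  }

  @Test
  void someTest() {                // 3. the test itself
  }

  @AfterEach
  void cleanupKafkaTestUtils() {   // 4. runs after every @Test, even on failure
  }

  @AfterAll
  static void tearDown() {         // 5. runs once, after the last test
  }
}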
 
