This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 6911abf1755 [MINOR] Streamer test setup performance (#10806)
6911abf1755 is described below

commit 6911abf1755c5752599e30a623cfff56f3e4ffed
Author: Tim Brown <t...@onehouse.ai>
AuthorDate: Fri Apr 26 12:33:43 2024 -0700

    [MINOR] Streamer test setup performance (#10806)
---
 .../apache/hudi/common/testutils/RawTripTestPayload.java  |  7 +++----
 .../org/apache/hudi/common/testutils/SchemaTestUtil.java  |  4 ++--
 .../deltastreamer/HoodieDeltaStreamerTestBase.java        | 10 ++++------
 .../utilities/deltastreamer/TestHoodieDeltaStreamer.java  | 10 ++++++++--
 .../hudi/utilities/testutils/UtilitiesTestBase.java       | 15 ++++++++++++++-
 5 files changed, 31 insertions(+), 15 deletions(-)

diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/RawTripTestPayload.java
 
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/RawTripTestPayload.java
index de262ce0d64..3ec4901823a 100644
--- 
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/RawTripTestPayload.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/RawTripTestPayload.java
@@ -63,6 +63,7 @@ public class RawTripTestPayload implements 
HoodieRecordPayload<RawTripTestPayloa
   public static final String JSON_DATA_SCHEMA_STR = 
"{\"type\":\"record\",\"name\":\"triprec\",\"fields\":[{\"name\":\"number\",\"type\":[\"null\",\"int\"],\"default\":null},"
       + 
"{\"name\":\"_row_key\",\"type\":\"string\"},{\"name\":\"time\",\"type\":\"string\"}]}";
   public static final Schema JSON_DATA_SCHEMA = new 
Schema.Parser().parse(JSON_DATA_SCHEMA_STR);
+  private static final MercifulJsonConverter JSON_CONVERTER = new 
MercifulJsonConverter();
 
   private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
   private String partitionPath;
@@ -206,8 +207,7 @@ public class RawTripTestPayload implements 
HoodieRecordPayload<RawTripTestPayloa
     if (isDeleted) {
       return Option.empty();
     } else {
-      MercifulJsonConverter jsonConverter = new MercifulJsonConverter();
-      return Option.of(jsonConverter.convert(getJsonData(), schema));
+      return Option.of(JSON_CONVERTER.convert(getJsonData(), schema));
     }
   }
 
@@ -217,8 +217,7 @@ public class RawTripTestPayload implements 
HoodieRecordPayload<RawTripTestPayloa
   }
 
   public IndexedRecord getRecordToInsert(Schema schema) throws IOException {
-    MercifulJsonConverter jsonConverter = new MercifulJsonConverter();
-    return jsonConverter.convert(getJsonData(), schema);
+    return JSON_CONVERTER.convert(getJsonData(), schema);
   }
 
   @Override
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/SchemaTestUtil.java
 
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/SchemaTestUtil.java
index 9ee16174973..3e16a18c282 100644
--- 
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/SchemaTestUtil.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/SchemaTestUtil.java
@@ -67,6 +67,7 @@ import static 
org.apache.hudi.common.testutils.HoodieTestDataGenerator.genPseudo
 public final class SchemaTestUtil {
 
   private static final String RESOURCE_SAMPLE_DATA = "/sample.data";
+  private static final MercifulJsonConverter CONVERTER = new 
MercifulJsonConverter();
 
   private final Random random = new Random(0xDEED);
 
@@ -267,8 +268,7 @@ public final class SchemaTestUtil {
   public static GenericRecord generateAvroRecordFromJson(Schema schema, int 
recordNumber, String instantTime,
       String fileId, boolean populateMetaFields) throws IOException {
     SampleTestRecord record = new SampleTestRecord(instantTime, recordNumber, 
fileId, populateMetaFields);
-    MercifulJsonConverter converter = new MercifulJsonConverter();
-    return converter.convert(record.toJsonString(), schema);
+    return CONVERTER.convert(record.toJsonString(), schema);
   }
 
   public static Schema getSchemaFromResource(Class<?> clazz, String name, 
boolean withHoodieMetadata) {
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java
index 05df4017305..433549f56b4 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java
@@ -137,8 +137,6 @@ public class HoodieDeltaStreamerTestBase extends 
UtilitiesTestBase {
     testUtils.setup();
     topicName = "topic" + testNum;
     prepareInitialConfigs(storage, basePath, testUtils.brokerAddress());
-    prepareParquetDFSFiles(PARQUET_NUM_RECORDS, PARQUET_SOURCE_ROOT);
-    prepareORCDFSFiles(ORC_NUM_RECORDS, ORC_SOURCE_ROOT);
   }
 
   @AfterEach
@@ -156,9 +154,9 @@ public class HoodieDeltaStreamerTestBase extends 
UtilitiesTestBase {
   public static void initClass() throws Exception {
     UtilitiesTestBase.initTestServices(false, true, false);
     // basePath is defined in UtilitiesTestBase.initTestServices
-    PARQUET_SOURCE_ROOT = basePath + "/parquetFiles";
-    ORC_SOURCE_ROOT = basePath + "/orcFiles";
-    JSON_KAFKA_SOURCE_ROOT = basePath + "/jsonKafkaFiles";
+    PARQUET_SOURCE_ROOT = basePath + "parquetFiles";
+    ORC_SOURCE_ROOT = basePath + "orcFiles";
+    JSON_KAFKA_SOURCE_ROOT = basePath + "jsonKafkaFiles";
   }
 
   @AfterAll
@@ -677,7 +675,7 @@ public class HoodieDeltaStreamerTestBase extends 
UtilitiesTestBase {
             Thread.sleep(2000);
             ret = condition.apply(true);
           } catch (Throwable error) {
-            LOG.warn("Got error :", error);
+            LOG.debug("Got error waiting for condition", error);
             ret = false;
           }
         }
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
index b0a9f14aa21..ff1e5ae2981 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
@@ -147,6 +147,7 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
@@ -717,7 +718,8 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
   }
 
   static void deltaStreamerTestRunner(HoodieDeltaStreamer ds, 
HoodieDeltaStreamer.Config cfg, Function<Boolean, Boolean> condition, String 
jobId) throws Exception {
-    Future dsFuture = Executors.newSingleThreadExecutor().submit(() -> {
+    ExecutorService executor = Executors.newSingleThreadExecutor();
+    Future dsFuture = executor.submit(() -> {
       try {
         ds.sync();
       } catch (Exception ex) {
@@ -732,6 +734,7 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
       ds.shutdownGracefully();
       dsFuture.get();
     }
+    executor.shutdown();
   }
 
   static void awaitDeltaStreamerShutdown(HoodieDeltaStreamer ds) throws 
InterruptedException {
@@ -1421,7 +1424,8 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
         PARQUET_SOURCE_ROOT, false, "partition_path", testEmptyBatch ? "1" : 
"");
 
     // generate data asynchronously.
-    Future inputGenerationFuture = 
Executors.newSingleThreadExecutor().submit(() -> {
+    ExecutorService executor = Executors.newSingleThreadExecutor();
+    Future inputGenerationFuture = executor.submit(() -> {
       try {
         int counter = 2;
         while (counter < 100) { // lets keep going. if the test times out, we 
will cancel the future within finally. So, safe to generate 100 batches.
@@ -1461,6 +1465,7 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
       ds.shutdownGracefully();
       inputGenerationFuture.cancel(true);
       UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, tableBasePath);
+      executor.shutdown();
     }
     testNum++;
   }
@@ -1797,6 +1802,7 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
 
   private void testORCDFSSource(boolean useSchemaProvider, List<String> 
transformerClassNames) throws Exception {
     // prepare ORCDFSSource
+    prepareORCDFSFiles(ORC_NUM_RECORDS, ORC_SOURCE_ROOT);
     TypedProperties orcProps = new TypedProperties();
 
     // Properties used for testing delta-streamer with orc source
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/UtilitiesTestBase.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/UtilitiesTestBase.java
index 175191436ad..e166e418d43 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/UtilitiesTestBase.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/UtilitiesTestBase.java
@@ -87,7 +87,9 @@ import java.io.OutputStream;
 import java.io.PrintStream;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;
 
 import scala.Tuple2;
@@ -163,7 +165,7 @@ public class UtilitiesTestBase {
       zookeeperTestService.start();
     }
 
-    jsc = UtilHelpers.buildSparkContext(UtilitiesTestBase.class.getName() + 
"-hoodie", "local[8]");
+    jsc = UtilHelpers.buildSparkContext(UtilitiesTestBase.class.getName() + 
"-hoodie", "local[4]", sparkConf());
     context = new HoodieSparkEngineContext(jsc);
     sqlContext = new SQLContext(jsc);
     sparkSession = SparkSession.builder().config(jsc.getConf()).getOrCreate();
@@ -266,6 +268,17 @@ public class UtilitiesTestBase {
     TestDataSource.resetDataGen();
   }
 
+  private static Map<String, String> sparkConf() {
+    Map<String, String> conf = new HashMap<>();
+    conf.put("spark.default.parallelism", "2");
+    conf.put("spark.sql.shuffle.partitions", "2");
+    conf.put("spark.executor.memory", "1G");
+    conf.put("spark.driver.memory", "1G");
+    conf.put("spark.hadoop.mapred.output.compress", "true");
+    conf.put("spark.ui.enabled", "false");
+    return conf;
+  }
+
   /**
    * Helper to get hive sync config.
    * 

Reply via email to