This is an automated email from the ASF dual-hosted git repository. yihua pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push: new 6911abf1755 [MINOR] Streamer test setup performance (#10806) 6911abf1755 is described below commit 6911abf1755c5752599e30a623cfff56f3e4ffed Author: Tim Brown <t...@onehouse.ai> AuthorDate: Fri Apr 26 12:33:43 2024 -0700 [MINOR] Streamer test setup performance (#10806) --- .../apache/hudi/common/testutils/RawTripTestPayload.java | 7 +++---- .../org/apache/hudi/common/testutils/SchemaTestUtil.java | 4 ++-- .../deltastreamer/HoodieDeltaStreamerTestBase.java | 10 ++++------ .../utilities/deltastreamer/TestHoodieDeltaStreamer.java | 10 ++++++++-- .../hudi/utilities/testutils/UtilitiesTestBase.java | 15 ++++++++++++++- 5 files changed, 31 insertions(+), 15 deletions(-) diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/RawTripTestPayload.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/RawTripTestPayload.java index de262ce0d64..3ec4901823a 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/RawTripTestPayload.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/RawTripTestPayload.java @@ -63,6 +63,7 @@ public class RawTripTestPayload implements HoodieRecordPayload<RawTripTestPayloa public static final String JSON_DATA_SCHEMA_STR = "{\"type\":\"record\",\"name\":\"triprec\",\"fields\":[{\"name\":\"number\",\"type\":[\"null\",\"int\"],\"default\":null}," + "{\"name\":\"_row_key\",\"type\":\"string\"},{\"name\":\"time\",\"type\":\"string\"}]}"; public static final Schema JSON_DATA_SCHEMA = new Schema.Parser().parse(JSON_DATA_SCHEMA_STR); + private static final MercifulJsonConverter JSON_CONVERTER = new MercifulJsonConverter(); private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); private String partitionPath; @@ -206,8 +207,7 @@ public class RawTripTestPayload implements HoodieRecordPayload<RawTripTestPayloa if (isDeleted) { return Option.empty(); } else { - MercifulJsonConverter jsonConverter = new MercifulJsonConverter(); - return Option.of(jsonConverter.convert(getJsonData(), schema)); + return Option.of(JSON_CONVERTER.convert(getJsonData(), schema)); } } @@ -217,8 +217,7 @@ public class RawTripTestPayload implements HoodieRecordPayload<RawTripTestPayloa } public IndexedRecord getRecordToInsert(Schema schema) throws IOException { - MercifulJsonConverter jsonConverter = new MercifulJsonConverter(); - return jsonConverter.convert(getJsonData(), schema); + return JSON_CONVERTER.convert(getJsonData(), schema); } @Override diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/SchemaTestUtil.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/SchemaTestUtil.java index 9ee16174973..3e16a18c282 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/SchemaTestUtil.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/SchemaTestUtil.java @@ -67,6 +67,7 @@ import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.genPseudo public final class SchemaTestUtil { private static final String RESOURCE_SAMPLE_DATA = "/sample.data"; + private static final MercifulJsonConverter CONVERTER = new MercifulJsonConverter(); private final Random random = new Random(0xDEED); @@ -267,8 +268,7 @@ public final class SchemaTestUtil { public static GenericRecord generateAvroRecordFromJson(Schema schema, int recordNumber, String instantTime, String fileId, boolean populateMetaFields) throws IOException { SampleTestRecord record = new SampleTestRecord(instantTime, recordNumber, fileId, populateMetaFields); - MercifulJsonConverter converter = new MercifulJsonConverter(); - return converter.convert(record.toJsonString(), schema); + return CONVERTER.convert(record.toJsonString(), schema); } public static Schema getSchemaFromResource(Class<?> clazz, String name, boolean withHoodieMetadata) { diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java index 05df4017305..433549f56b4 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java @@ -137,8 +137,6 @@ public class HoodieDeltaStreamerTestBase extends UtilitiesTestBase { testUtils.setup(); topicName = "topic" + testNum; prepareInitialConfigs(storage, basePath, testUtils.brokerAddress()); - prepareParquetDFSFiles(PARQUET_NUM_RECORDS, PARQUET_SOURCE_ROOT); - prepareORCDFSFiles(ORC_NUM_RECORDS, ORC_SOURCE_ROOT); } @AfterEach @@ -156,9 +154,9 @@ public class HoodieDeltaStreamerTestBase extends UtilitiesTestBase { public static void initClass() throws Exception { UtilitiesTestBase.initTestServices(false, true, false); // basePath is defined in UtilitiesTestBase.initTestServices - PARQUET_SOURCE_ROOT = basePath + "/parquetFiles"; - ORC_SOURCE_ROOT = basePath + "/orcFiles"; - JSON_KAFKA_SOURCE_ROOT = basePath + "/jsonKafkaFiles"; + PARQUET_SOURCE_ROOT = basePath + "parquetFiles"; + ORC_SOURCE_ROOT = basePath + "orcFiles"; + JSON_KAFKA_SOURCE_ROOT = basePath + "jsonKafkaFiles"; } @AfterAll @@ -677,7 +675,7 @@ public class HoodieDeltaStreamerTestBase extends UtilitiesTestBase { Thread.sleep(2000); ret = condition.apply(true); } catch (Throwable error) { - LOG.warn("Got error :", error); + LOG.debug("Got error waiting for condition", error); ret = false; } } diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java index b0a9f14aa21..ff1e5ae2981 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java @@ -147,6 +147,7 @@ import java.util.Map; import java.util.Properties; import java.util.Set; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; @@ -717,7 +718,8 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase { } static void deltaStreamerTestRunner(HoodieDeltaStreamer ds, HoodieDeltaStreamer.Config cfg, Function<Boolean, Boolean> condition, String jobId) throws Exception { - Future dsFuture = Executors.newSingleThreadExecutor().submit(() -> { + ExecutorService executor = Executors.newSingleThreadExecutor(); + Future dsFuture = executor.submit(() -> { try { ds.sync(); } catch (Exception ex) { @@ -732,6 +734,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase { ds.shutdownGracefully(); dsFuture.get(); } + executor.shutdown(); } static void awaitDeltaStreamerShutdown(HoodieDeltaStreamer ds) throws InterruptedException { @@ -1421,7 +1424,8 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase { PARQUET_SOURCE_ROOT, false, "partition_path", testEmptyBatch ? "1" : ""); // generate data asynchronously. - Future inputGenerationFuture = Executors.newSingleThreadExecutor().submit(() -> { + ExecutorService executor = Executors.newSingleThreadExecutor(); + Future inputGenerationFuture = executor.submit(() -> { try { int counter = 2; while (counter < 100) { // lets keep going. if the test times out, we will cancel the future within finally. So, safe to generate 100 batches. @@ -1461,6 +1465,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase { ds.shutdownGracefully(); inputGenerationFuture.cancel(true); UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, tableBasePath); + executor.shutdown(); } testNum++; } @@ -1797,6 +1802,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase { private void testORCDFSSource(boolean useSchemaProvider, List<String> transformerClassNames) throws Exception { // prepare ORCDFSSource + prepareORCDFSFiles(ORC_NUM_RECORDS, ORC_SOURCE_ROOT); TypedProperties orcProps = new TypedProperties(); // Properties used for testing delta-streamer with orc source diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/UtilitiesTestBase.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/UtilitiesTestBase.java index 175191436ad..e166e418d43 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/UtilitiesTestBase.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/UtilitiesTestBase.java @@ -87,7 +87,9 @@ import java.io.OutputStream; import java.io.PrintStream; import java.util.ArrayList; import java.util.Arrays; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Properties; import scala.Tuple2; @@ -163,7 +165,7 @@ public class UtilitiesTestBase { zookeeperTestService.start(); } - jsc = UtilHelpers.buildSparkContext(UtilitiesTestBase.class.getName() + "-hoodie", "local[8]"); + jsc = UtilHelpers.buildSparkContext(UtilitiesTestBase.class.getName() + "-hoodie", "local[4]", sparkConf()); context = new HoodieSparkEngineContext(jsc); sqlContext = new SQLContext(jsc); sparkSession = SparkSession.builder().config(jsc.getConf()).getOrCreate(); @@ -266,6 +268,17 @@ public class UtilitiesTestBase { TestDataSource.resetDataGen(); } + private static Map<String, String> sparkConf() { + Map<String, String> conf = new HashMap<>(); + conf.put("spark.default.parallelism", "2"); + conf.put("spark.sql.shuffle.partitions", "2"); + conf.put("spark.executor.memory", "1G"); + conf.put("spark.driver.memory", "1G"); + conf.put("spark.hadoop.mapred.output.compress", "true"); + conf.put("spark.ui.enable", "false"); + return conf; + } + /** * Helper to get hive sync config. *