This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
     new 617cc24  [HUDI-1367] Make deltaStreamer transition from dfsSouce to kafkasouce (#2227)
617cc24 is described below

commit 617cc24ad1a28196b872df5663e9e0f48cd7f0fa
Author: liujinhui <965147...@qq.com>
AuthorDate: Thu Feb 25 20:08:13 2021 +0800

    [HUDI-1367] Make deltaStreamer transition from dfsSouce to kafkasouce (#2227)

    Co-authored-by: Sivabalan Narayanan <sivab...@uber.com>
---
 .../utilities/sources/helpers/KafkaOffsetGen.java  |  52 ++++++-
 .../functional/TestHoodieDeltaStreamer.java        | 151 +++++++++++++++++++--
 .../TestHoodieMultiTableDeltaStreamer.java         |  21 +--
 .../hudi/utilities/sources/TestKafkaSource.java    |   2 +-
 .../utilities/testutils/UtilitiesTestBase.java     |  17 ++-
 5 files changed, 213 insertions(+), 30 deletions(-)

diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java
index fc7ba79..e37ec0a 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java
@@ -21,6 +21,7 @@ package org.apache.hudi.utilities.sources.helpers;
 import org.apache.hudi.DataSourceUtils;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieDeltaStreamerException;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieNotSupportedException;
 
@@ -40,6 +41,8 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 
 /**
@@ -49,6 +52,12 @@ public class KafkaOffsetGen {
 
   private static final Logger LOG = LogManager.getLogger(KafkaOffsetGen.class);
 
+  /**
+   * kafka checkpoint Pattern.
+   * Format: topic_name,partition_num:offset,partition_num:offset,....
+   */
+  private final Pattern pattern = Pattern.compile(".*,.*:.*");
+
   public static class CheckpointUtils {
 
     /**
@@ -148,7 +157,8 @@ public class KafkaOffsetGen {
 
     private static final String KAFKA_TOPIC_NAME = "hoodie.deltastreamer.source.kafka.topic";
     private static final String MAX_EVENTS_FROM_KAFKA_SOURCE_PROP = "hoodie.deltastreamer.kafka.source.maxEvents";
-    private static final KafkaResetOffsetStrategies DEFAULT_AUTO_RESET_OFFSET = KafkaResetOffsetStrategies.LATEST;
+    private static final String KAFKA_AUTO_RESET_OFFSETS = "hoodie.deltastreamer.source.kafka.auto.reset.offsets";
+    private static final KafkaResetOffsetStrategies DEFAULT_KAFKA_AUTO_RESET_OFFSETS = KafkaResetOffsetStrategies.LATEST;
     public static final long DEFAULT_MAX_EVENTS_FROM_KAFKA_SOURCE = 5000000;
     public static long maxEventsFromKafkaSource = DEFAULT_MAX_EVENTS_FROM_KAFKA_SOURCE;
   }
@@ -156,15 +166,29 @@ public class KafkaOffsetGen {
   private final HashMap<String, Object> kafkaParams;
   private final TypedProperties props;
   protected final String topicName;
+  private KafkaResetOffsetStrategies autoResetValue;
 
   public KafkaOffsetGen(TypedProperties props) {
     this.props = props;
+
     kafkaParams = new HashMap<>();
     for (Object prop : props.keySet()) {
       kafkaParams.put(prop.toString(), props.get(prop.toString()));
     }
     DataSourceUtils.checkRequiredProperties(props, Collections.singletonList(Config.KAFKA_TOPIC_NAME));
     topicName = props.getString(Config.KAFKA_TOPIC_NAME);
+    String kafkaAutoResetOffsetsStr = props.getString(Config.KAFKA_AUTO_RESET_OFFSETS, Config.DEFAULT_KAFKA_AUTO_RESET_OFFSETS.name());
+    boolean found = false;
+    for (KafkaResetOffsetStrategies entry : KafkaResetOffsetStrategies.values()) {
+      if (entry.name().toLowerCase().equals(kafkaAutoResetOffsetsStr)) {
+        found = true;
+        autoResetValue = entry;
+        break;
+      }
+    }
+    if (!found) {
+      throw new HoodieDeltaStreamerException(Config.KAFKA_AUTO_RESET_OFFSETS + " config set to unknown value " + kafkaAutoResetOffsetsStr);
+    }
   }
 
   public OffsetRange[] getNextOffsetRanges(Option<String> lastCheckpointStr, long sourceLimit, HoodieDeltaStreamerMetrics metrics) {
@@ -186,8 +210,6 @@ public class KafkaOffsetGen {
       fromOffsets = checkupValidOffsets(consumer, lastCheckpointStr, topicPartitions);
       metrics.updateDeltaStreamerKafkaDelayCountMetrics(delayOffsetCalculation(lastCheckpointStr, topicPartitions, consumer));
     } else {
-      KafkaResetOffsetStrategies autoResetValue = KafkaResetOffsetStrategies
-          .valueOf(props.getString("auto.offset.reset", Config.DEFAULT_AUTO_RESET_OFFSET.toString()).toUpperCase());
       switch (autoResetValue) {
         case EARLIEST:
           fromOffsets = consumer.beginningOffsets(topicPartitions);
@@ -227,12 +249,23 @@ public class KafkaOffsetGen {
   // else return earliest offsets
   private Map<TopicPartition, Long> checkupValidOffsets(KafkaConsumer consumer,
                                                         Option<String> lastCheckpointStr, Set<TopicPartition> topicPartitions) {
-    Map<TopicPartition, Long> checkpointOffsets = CheckpointUtils.strToOffsets(lastCheckpointStr.get());
     Map<TopicPartition, Long> earliestOffsets = consumer.beginningOffsets(topicPartitions);
+    if (checkTopicCheckpoint(lastCheckpointStr)) {
+      Map<TopicPartition, Long> checkpointOffsets = CheckpointUtils.strToOffsets(lastCheckpointStr.get());
+      boolean checkpointOffsetReseter = checkpointOffsets.entrySet().stream()
+          .anyMatch(offset -> offset.getValue() < earliestOffsets.get(offset.getKey()));
+      return checkpointOffsetReseter ? earliestOffsets : checkpointOffsets;
+    }
+
+    switch (autoResetValue) {
+      case EARLIEST:
+        return earliestOffsets;
+      case LATEST:
+        return consumer.endOffsets(topicPartitions);
+      default:
+        throw new HoodieNotSupportedException("Auto reset value must be one of 'earliest' or 'latest' ");
+    }
-    boolean checkpointOffsetReseter = checkpointOffsets.entrySet().stream()
-        .anyMatch(offset -> offset.getValue() < earliestOffsets.get(offset.getKey()));
-    return checkpointOffsetReseter ? earliestOffsets : checkpointOffsets;
   }
 
   private Long delayOffsetCalculation(Option<String> lastCheckpointStr, Set<TopicPartition> topicPartitions, KafkaConsumer consumer) {
@@ -257,6 +290,11 @@ public class KafkaOffsetGen {
     return result.containsKey(topicName);
   }
 
+  private boolean checkTopicCheckpoint(Option<String> lastCheckpointStr) {
+    Matcher matcher = pattern.matcher(lastCheckpointStr.get());
+    return matcher.matches();
+  }
+
   public String getTopicName() {
     return topicName;
   }
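For reference, a minimal standalone sketch of the checkpoint handling this patch enables: a Kafka checkpoint in the format documented above matches the pattern and can be parsed into per-partition offsets, while a DFS-source checkpoint (a plain commit time) does not match, which is what lets the deltastreamer fall back to the auto-reset strategy instead of failing when a table transitions from a DFS source to a Kafka source. The parse helper below is a simplified stand-in for CheckpointUtils.strToOffsets, not the actual implementation:

    import java.util.HashMap;
    import java.util.Map;
    import java.util.regex.Pattern;

    public class CheckpointFormatSketch {

      // Same pattern as the patch: topic_name,partition_num:offset,partition_num:offset,....
      private static final Pattern KAFKA_CHECKPOINT_PATTERN = Pattern.compile(".*,.*:.*");

      // Simplified stand-in for CheckpointUtils.strToOffsets (illustration only).
      static Map<Integer, Long> strToOffsets(String checkpoint) {
        String[] splits = checkpoint.split(",");
        Map<Integer, Long> offsets = new HashMap<>();
        // splits[0] is the topic name; the remaining entries are partition:offset pairs.
        for (int i = 1; i < splits.length; i++) {
          String[] pair = splits[i].split(":");
          offsets.put(Integer.parseInt(pair[0]), Long.parseLong(pair[1]));
        }
        return offsets;
      }

      public static void main(String[] args) {
        String kafkaCheckpoint = "topic1,0:1050,1:980"; // written by a Kafka source
        String dfsCheckpoint = "00000000000010";        // written by a DFS source (a commit time)

        System.out.println(KAFKA_CHECKPOINT_PATTERN.matcher(kafkaCheckpoint).matches()); // true
        System.out.println(KAFKA_CHECKPOINT_PATTERN.matcher(dfsCheckpoint).matches());   // false -> use auto.reset.offsets
        System.out.println(strToOffsets(kafkaCheckpoint)); // {0=1050, 1=980}
      }
    }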
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
index 616d039..7fb5b18 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
@@ -48,6 +48,7 @@ import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
 import org.apache.hudi.utilities.sources.CsvDFSSource;
 import org.apache.hudi.utilities.sources.HoodieIncrSource;
 import org.apache.hudi.utilities.sources.InputBatch;
+import org.apache.hudi.utilities.sources.JsonKafkaSource;
 import org.apache.hudi.utilities.sources.ParquetDFSSource;
 import org.apache.hudi.utilities.sources.TestDataSource;
 import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
@@ -56,10 +57,12 @@ import org.apache.hudi.utilities.testutils.sources.config.SourceConfigs;
 import org.apache.hudi.utilities.transform.SqlQueryBasedTransformer;
 import org.apache.hudi.utilities.transform.Transformer;
 
+import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.kafka.common.errors.TopicExistsException;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.apache.spark.api.java.JavaRDD;
@@ -116,9 +119,12 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
   private static final String PROPS_FILENAME_TEST_INVALID = "test-invalid.properties";
   private static final String PROPS_FILENAME_TEST_CSV = "test-csv-dfs-source.properties";
   private static final String PROPS_FILENAME_TEST_PARQUET = "test-parquet-dfs-source.properties";
+  private static final String PROPS_FILENAME_TEST_JSON_KAFKA = "test-json-kafka-dfs-source.properties";
   private static String PARQUET_SOURCE_ROOT;
+  private static String JSON_KAFKA_SOURCE_ROOT;
   private static final int PARQUET_NUM_RECORDS = 5;
   private static final int CSV_NUM_RECORDS = 3;
+  private static final int JSON_KAFKA_NUM_RECORDS = 5;
   // Required fields
   private static final String TGT_BASE_PATH_PARAM = "--target-base-path";
   private static final String TGT_BASE_PATH_VALUE = "s3://mybucket/blah";
@@ -136,15 +142,18 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
   private static final String HOODIE_CONF_VALUE2 = "hoodie.datasource.write.recordkey.field=Field1,Field2,Field3";
   private static final Logger LOG = LogManager.getLogger(TestHoodieDeltaStreamer.class);
   public static KafkaTestUtils testUtils;
+  protected static String topicName;
 
-  private static int testNum = 1;
+  protected static int testNum = 1;
 
   @BeforeAll
   public static void initClass() throws Exception {
     UtilitiesTestBase.initClass(true);
     PARQUET_SOURCE_ROOT = dfsBasePath + "/parquetFiles";
+    JSON_KAFKA_SOURCE_ROOT = dfsBasePath + "/jsonKafkaFiles";
     testUtils = new KafkaTestUtils();
     testUtils.setup();
+    topicName = "topic" + testNum;
 
     // prepare the configs.
     UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/base.properties", dfs, dfsBasePath + "/base.properties");
@@ -236,7 +245,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
 
     //Kafka source properties
     props.setProperty("bootstrap.servers", testUtils.brokerAddress());
-    props.setProperty("auto.offset.reset", "earliest");
+    props.setProperty("hoodie.deltastreamer.source.kafka.auto.reset.offsets", "earliest");
     props.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
     props.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
     props.setProperty("hoodie.deltastreamer.kafka.source.maxEvents", String.valueOf(5000));
@@ -966,27 +975,56 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
   }
 
   private static void prepareParquetDFSFiles(int numRecords) throws IOException {
-    String path = PARQUET_SOURCE_ROOT + "/1.parquet";
+    prepareParquetDFSFiles(numRecords, "1.parquet", false, null, null);
+  }
+
+  private static void prepareParquetDFSFiles(int numRecords, String fileName, boolean useCustomSchema,
+                                             String schemaStr, Schema schema) throws IOException {
+    String path = PARQUET_SOURCE_ROOT + "/" + fileName;
     HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
-    Helpers.saveParquetToDFS(Helpers.toGenericRecords(
-        dataGenerator.generateInserts("000", numRecords)), new Path(path));
+    if (useCustomSchema) {
+      Helpers.saveParquetToDFS(Helpers.toGenericRecords(
+          dataGenerator.generateInsertsAsPerSchema("000", numRecords, schemaStr),
+          schema), new Path(path), HoodieTestDataGenerator.AVRO_TRIP_SCHEMA);
+    } else {
+      Helpers.saveParquetToDFS(Helpers.toGenericRecords(
+          dataGenerator.generateInserts("000", numRecords)), new Path(path));
+    }
+  }
+
+  private static void prepareJsonKafkaDFSFiles(int numRecords, boolean createTopic, String topicName) throws IOException {
+    if (createTopic) {
+      try {
+        testUtils.createTopic(topicName, 2);
+      } catch (TopicExistsException e) {
+        // no op
+      }
+    }
+    HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
+    testUtils.sendMessages(topicName, Helpers.jsonifyRecords(dataGenerator.generateInsertsAsPerSchema("000", numRecords, HoodieTestDataGenerator.TRIP_SCHEMA)));
   }
 
   private void prepareParquetDFSSource(boolean useSchemaProvider, boolean hasTransformer) throws IOException {
+    prepareParquetDFSSource(useSchemaProvider, hasTransformer, "source.avsc", "target.avsc",
+        PROPS_FILENAME_TEST_PARQUET, PARQUET_SOURCE_ROOT);
+  }
+
+  private void prepareParquetDFSSource(boolean useSchemaProvider, boolean hasTransformer, String sourceSchemaFile, String targetSchemaFile,
+                                       String propsFileName, String parquetSourceRoot) throws IOException {
     // Properties used for testing delta-streamer with Parquet source
     TypedProperties parquetProps = new TypedProperties();
     parquetProps.setProperty("include", "base.properties");
+    parquetProps.setProperty("hoodie.embed.timeline.server", "false");
     parquetProps.setProperty("hoodie.datasource.write.recordkey.field", "_row_key");
     parquetProps.setProperty("hoodie.datasource.write.partitionpath.field", "not_there");
     if (useSchemaProvider) {
-      parquetProps.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", dfsBasePath + "/source.avsc");
+      parquetProps.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", dfsBasePath + "/" + sourceSchemaFile);
       if (hasTransformer) {
-        parquetProps.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file", dfsBasePath + "/target.avsc");
+        parquetProps.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file", dfsBasePath + "/" + targetSchemaFile);
       }
     }
-    parquetProps.setProperty("hoodie.deltastreamer.source.dfs.root", PARQUET_SOURCE_ROOT);
-
-    UtilitiesTestBase.Helpers.savePropsToDFS(parquetProps, dfs, dfsBasePath + "/" + PROPS_FILENAME_TEST_PARQUET);
+    parquetProps.setProperty("hoodie.deltastreamer.source.dfs.root", parquetSourceRoot);
+    UtilitiesTestBase.Helpers.savePropsToDFS(parquetProps, dfs, dfsBasePath + "/" + propsFileName);
   }
 
   private void testParquetDFSSource(boolean useSchemaProvider, List<String> transformerClassNames) throws Exception {
@@ -1001,6 +1039,99 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
     testNum++;
   }
 
+  private void prepareJsonKafkaDFSSource(String propsFileName, String autoResetValue, String topicName) throws IOException {
+    // Properties used for testing delta-streamer with JsonKafka source
+    TypedProperties props = new TypedProperties();
+    populateCommonProps(props);
+    props.setProperty("include", "base.properties");
+    props.setProperty("hoodie.embed.timeline.server", "false");
+    props.setProperty("hoodie.datasource.write.recordkey.field", "_row_key");
+    props.setProperty("hoodie.datasource.write.partitionpath.field", "not_there");
+    props.setProperty("hoodie.deltastreamer.source.dfs.root", JSON_KAFKA_SOURCE_ROOT);
+    props.setProperty("hoodie.deltastreamer.source.kafka.topic", topicName);
+    props.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", dfsBasePath + "/source_uber.avsc");
+    props.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file", dfsBasePath + "/target_uber.avsc");
+    props.setProperty("hoodie.deltastreamer.source.kafka.auto.reset.offsets", autoResetValue);
+
+    UtilitiesTestBase.Helpers.savePropsToDFS(props, dfs, dfsBasePath + "/" + propsFileName);
+  }
+
+  /**
+   * Tests Deltastreamer with a parquet dfs source that transitions to JsonKafkaSource.
+   * @param autoResetToLatest true if auto reset value is to be set to LATEST; false to leave it as the default (i.e. EARLIEST)
+   * @throws Exception
+   */
+  private void testDeltaStreamerTransitionFromParquetToKafkaSource(boolean autoResetToLatest) throws Exception {
+    // prep parquet source
+    PARQUET_SOURCE_ROOT = dfsBasePath + "/parquetFilesDfsToKafka" + testNum;
+    int parquetRecords = 10;
+    prepareParquetDFSFiles(parquetRecords, "1.parquet", true, HoodieTestDataGenerator.TRIP_SCHEMA, HoodieTestDataGenerator.AVRO_TRIP_SCHEMA);
+
+    prepareParquetDFSSource(true, false, "source_uber.avsc", "target_uber.avsc", PROPS_FILENAME_TEST_PARQUET,
+        PARQUET_SOURCE_ROOT);
+    // delta streamer w/ parquet source
+    String tableBasePath = dfsBasePath + "/test_dfs_to_kakfa" + testNum;
+    HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(
+        TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT, ParquetDFSSource.class.getName(),
+            Collections.EMPTY_LIST, PROPS_FILENAME_TEST_PARQUET, false,
+            false, 100000, false, null, null, "timestamp"), jsc);
+    deltaStreamer.sync();
+    TestHelpers.assertRecordCount(parquetRecords, tableBasePath + "/*/*.parquet", sqlContext);
+    deltaStreamer.shutdownGracefully();
+
+    // prep json kafka source
+    topicName = "topic" + testNum;
+    prepareJsonKafkaDFSFiles(JSON_KAFKA_NUM_RECORDS, true, topicName);
+    prepareJsonKafkaDFSSource(PROPS_FILENAME_TEST_JSON_KAFKA, autoResetToLatest ? "latest" : "earliest", topicName);
+    // delta streamer w/ json kafka source
+    deltaStreamer = new HoodieDeltaStreamer(
+        TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT, JsonKafkaSource.class.getName(),
+            Collections.EMPTY_LIST, PROPS_FILENAME_TEST_JSON_KAFKA, false,
+            true, 100000, false, null, null, "timestamp"), jsc);
+    deltaStreamer.sync();
+    // if auto reset value is set to LATEST, all kafka records so far may not be synced.
+    int totalExpectedRecords = parquetRecords + ((autoResetToLatest) ? 0 : JSON_KAFKA_NUM_RECORDS);
+    TestHelpers.assertRecordCount(totalExpectedRecords, tableBasePath + "/*/*.parquet", sqlContext);
+
+    // verify 2nd batch to test LATEST auto reset value.
+    prepareJsonKafkaDFSFiles(20, false, topicName);
+    totalExpectedRecords += 20;
+    deltaStreamer.sync();
+    TestHelpers.assertRecordCount(totalExpectedRecords, tableBasePath + "/*/*.parquet", sqlContext);
+    testNum++;
+  }
+
+  @Test
+  public void testJsonKafkaDFSSource() throws Exception {
+    topicName = "topic" + testNum;
+    prepareJsonKafkaDFSFiles(JSON_KAFKA_NUM_RECORDS, true, topicName);
+    prepareJsonKafkaDFSSource(PROPS_FILENAME_TEST_JSON_KAFKA, "earliest", topicName);
+    String tableBasePath = dfsBasePath + "/test_json_kafka_table" + testNum;
+    HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(
+        TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT, JsonKafkaSource.class.getName(),
+            Collections.EMPTY_LIST, PROPS_FILENAME_TEST_JSON_KAFKA, false,
+            true, 100000, false, null, null, "timestamp"), jsc);
+    deltaStreamer.sync();
+    TestHelpers.assertRecordCount(JSON_KAFKA_NUM_RECORDS, tableBasePath + "/*/*.parquet", sqlContext);
+
+    int totalRecords = JSON_KAFKA_NUM_RECORDS;
+    int records = 10;
+    totalRecords += records;
+    prepareJsonKafkaDFSFiles(records, false, topicName);
+    deltaStreamer.sync();
+    TestHelpers.assertRecordCount(totalRecords, tableBasePath + "/*/*.parquet", sqlContext);
+  }
+
+  @Test
+  public void testParquetSourceToKafkaSourceEarliestAutoResetValue() throws Exception {
+    testDeltaStreamerTransitionFromParquetToKafkaSource(false);
+  }
+
+  @Test
+  public void testParquetSourceToKafkaSourceLatestAutoResetValue() throws Exception {
+    testDeltaStreamerTransitionFromParquetToKafkaSource(true);
+  }
+
   @Test
   public void testParquetDFSSourceWithoutSchemaProviderAndNoTransformer() throws Exception {
     testParquetDFSSource(false, null);
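The user-facing takeaway from the transition tests above is the handful of properties that switch an existing pipeline from ParquetDFSSource to JsonKafkaSource. A minimal sketch of those properties follows; the topic name and broker address are placeholders, not values from the patch:

    import org.apache.hudi.common.config.TypedProperties;

    public class KafkaSourceSwitchSketch {
      public static void main(String[] args) {
        TypedProperties props = new TypedProperties();
        // Point the existing table at the Kafka topic to continue ingesting from.
        props.setProperty("hoodie.deltastreamer.source.kafka.topic", "topic1");
        // New config introduced by this patch; "earliest" replays the whole topic,
        // while "latest" (the default) only picks up records produced after the switch.
        props.setProperty("hoodie.deltastreamer.source.kafka.auto.reset.offsets", "earliest");
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder broker address
        System.out.println(props);
      }
    }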
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieMultiTableDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieMultiTableDeltaStreamer.java
index a6f4edf..ad1b753 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieMultiTableDeltaStreamer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieMultiTableDeltaStreamer.java
@@ -119,12 +119,14 @@ public class TestHoodieMultiTableDeltaStreamer extends TestHoodieDeltaStreamer {
   @Test //0 corresponds to fg
   public void testMultiTableExecution() throws IOException {
     //create topics for each table
-    testUtils.createTopic("topic1", 2);
-    testUtils.createTopic("topic2", 2);
+    String topicName1 = "topic" + testNum++;
+    String topicName2 = "topic" + testNum;
+    testUtils.createTopic(topicName1, 2);
+    testUtils.createTopic(topicName2, 2);
     HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
-    testUtils.sendMessages("topic1", Helpers.jsonifyRecords(dataGenerator.generateInsertsAsPerSchema("000", 5, HoodieTestDataGenerator.TRIP_SCHEMA)));
-    testUtils.sendMessages("topic2", Helpers.jsonifyRecords(dataGenerator.generateInsertsAsPerSchema("000", 10, HoodieTestDataGenerator.SHORT_TRIP_SCHEMA)));
+    testUtils.sendMessages(topicName1, Helpers.jsonifyRecords(dataGenerator.generateInsertsAsPerSchema("000", 5, HoodieTestDataGenerator.TRIP_SCHEMA)));
+    testUtils.sendMessages(topicName2, Helpers.jsonifyRecords(dataGenerator.generateInsertsAsPerSchema("000", 10, HoodieTestDataGenerator.SHORT_TRIP_SCHEMA)));
 
     HoodieMultiTableDeltaStreamer.Config cfg = TestHelpers.getConfig(PROPS_FILENAME_TEST_SOURCE1,dfsBasePath + "/config", JsonKafkaSource.class.getName(), false);
     HoodieMultiTableDeltaStreamer streamer = new HoodieMultiTableDeltaStreamer(cfg, jsc);
@@ -132,21 +134,23 @@ public class TestHoodieMultiTableDeltaStreamer extends TestHoodieDeltaStreamer {
     TypedProperties properties = executionContexts.get(1).getProperties();
     properties.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", dfsBasePath + "/source_uber.avsc");
     properties.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file", dfsBasePath + "/target_uber.avsc");
+    properties.setProperty("hoodie.deltastreamer.source.kafka.topic", topicName2);
     executionContexts.get(1).setProperties(properties);
     TypedProperties properties1 = executionContexts.get(0).getProperties();
     properties1.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", dfsBasePath + "/source_short_trip_uber.avsc");
     properties1.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file", dfsBasePath + "/target_short_trip_uber.avsc");
+    properties1.setProperty("hoodie.deltastreamer.source.kafka.topic", topicName1);
     executionContexts.get(0).setProperties(properties1);
-    String targetBasePath1 = executionContexts.get(1).getConfig().targetBasePath;
-    String targetBasePath2 = executionContexts.get(0).getConfig().targetBasePath;
+    String targetBasePath1 = executionContexts.get(0).getConfig().targetBasePath;
+    String targetBasePath2 = executionContexts.get(1).getConfig().targetBasePath;
     streamer.sync();
 
     TestHoodieDeltaStreamer.TestHelpers.assertRecordCount(5, targetBasePath1 + "/*/*.parquet", sqlContext);
     TestHoodieDeltaStreamer.TestHelpers.assertRecordCount(10, targetBasePath2 + "/*/*.parquet", sqlContext);
 
     //insert updates for already existing records in kafka topics
-    testUtils.sendMessages("topic1", Helpers.jsonifyRecords(dataGenerator.generateUpdatesAsPerSchema("001", 5, HoodieTestDataGenerator.TRIP_SCHEMA)));
-    testUtils.sendMessages("topic2", Helpers.jsonifyRecords(dataGenerator.generateUpdatesAsPerSchema("001", 10, HoodieTestDataGenerator.SHORT_TRIP_SCHEMA)));
+    testUtils.sendMessages(topicName1, Helpers.jsonifyRecords(dataGenerator.generateUpdatesAsPerSchema("001", 5, HoodieTestDataGenerator.TRIP_SCHEMA)));
+    testUtils.sendMessages(topicName2, Helpers.jsonifyRecords(dataGenerator.generateUpdatesAsPerSchema("001", 10, HoodieTestDataGenerator.SHORT_TRIP_SCHEMA)));
     streamer.sync();
     assertEquals(2, streamer.getSuccessTables().size());
     assertTrue(streamer.getFailedTables().isEmpty());
@@ -154,5 +158,6 @@ public class TestHoodieMultiTableDeltaStreamer extends TestHoodieDeltaStreamer {
     //assert the record count matches now
     TestHoodieDeltaStreamer.TestHelpers.assertRecordCount(5, targetBasePath1 + "/*/*.parquet", sqlContext);
     TestHoodieDeltaStreamer.TestHelpers.assertRecordCount(10, targetBasePath2 + "/*/*.parquet", sqlContext);
+    testNum++;
   }
 }
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestKafkaSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestKafkaSource.java
index e8cb2a6..9004c66 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestKafkaSource.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestKafkaSource.java
@@ -88,7 +88,7 @@ public class TestKafkaSource extends UtilitiesTestBase {
     TypedProperties props = new TypedProperties();
     props.setProperty("hoodie.deltastreamer.source.kafka.topic", TEST_TOPIC_NAME);
     props.setProperty("bootstrap.servers", testUtils.brokerAddress());
-    props.setProperty("auto.offset.reset", resetStrategy);
+    props.setProperty("hoodie.deltastreamer.source.kafka.auto.reset.offsets", resetStrategy);
     props.setProperty("hoodie.deltastreamer.kafka.source.maxEvents",
         maxEventsToReadFromKafkaSource != null ? String.valueOf(maxEventsToReadFromKafkaSource) :
             String.valueOf(Config.maxEventsFromKafkaSource));
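The tests above now configure the reset strategy through the new hoodie-scoped key rather than the raw Kafka "auto.offset.reset" property. A standalone sketch of the lookup the new KafkaOffsetGen constructor performs follows; note that, as written in the patch, the configured value is compared against the lowercased enum names, so "earliest"/"latest" are expected in lowercase. The exception type here is a plain stand-in for HoodieDeltaStreamerException:

    import java.util.Arrays;

    public class AutoResetLookupSketch {

      enum KafkaResetOffsetStrategies { LATEST, EARLIEST }

      // Mirrors the constructor loop in KafkaOffsetGen: match the configured
      // string against each lowercased enum name, else fail fast.
      static KafkaResetOffsetStrategies resolve(String configured) {
        return Arrays.stream(KafkaResetOffsetStrategies.values())
            .filter(entry -> entry.name().toLowerCase().equals(configured))
            .findFirst()
            .orElseThrow(() -> new IllegalArgumentException(
                "hoodie.deltastreamer.source.kafka.auto.reset.offsets config set to unknown value " + configured));
      }

      public static void main(String[] args) {
        System.out.println(resolve("earliest")); // EARLIEST
        System.out.println(resolve("latest"));   // LATEST
        System.out.println(resolve("none"));     // throws IllegalArgumentException
      }
    }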
props.setProperty("hoodie.deltastreamer.kafka.source.maxEvents", maxEventsToReadFromKafkaSource != null ? String.valueOf(maxEventsToReadFromKafkaSource) : String.valueOf(Config.maxEventsFromKafkaSource)); diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/UtilitiesTestBase.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/UtilitiesTestBase.java index 0bbdb23..b83fa78 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/UtilitiesTestBase.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/UtilitiesTestBase.java @@ -46,6 +46,7 @@ import com.fasterxml.jackson.databind.node.ObjectNode; import com.fasterxml.jackson.dataformat.csv.CsvMapper; import com.fasterxml.jackson.dataformat.csv.CsvSchema; import com.fasterxml.jackson.dataformat.csv.CsvSchema.Builder; +import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; import org.apache.avro.generic.IndexedRecord; import org.apache.hadoop.fs.FileSystem; @@ -279,8 +280,12 @@ public class UtilitiesTestBase { } public static void saveParquetToDFS(List<GenericRecord> records, Path targetFile) throws IOException { + saveParquetToDFS(records, targetFile, HoodieTestDataGenerator.AVRO_SCHEMA); + } + + public static void saveParquetToDFS(List<GenericRecord> records, Path targetFile, Schema schema) throws IOException { try (ParquetWriter<GenericRecord> writer = AvroParquetWriter.<GenericRecord>builder(targetFile) - .withSchema(HoodieTestDataGenerator.AVRO_SCHEMA) + .withSchema(schema) .withConf(HoodieTestUtils.getDefaultHadoopConf()) .withWriteMode(Mode.OVERWRITE) .build()) { @@ -308,9 +313,9 @@ public class UtilitiesTestBase { return props; } - public static GenericRecord toGenericRecord(HoodieRecord hoodieRecord) { + public static GenericRecord toGenericRecord(HoodieRecord hoodieRecord, Schema schema) { try { - Option<IndexedRecord> recordOpt = hoodieRecord.getData().getInsertValue(HoodieTestDataGenerator.AVRO_SCHEMA); + Option<IndexedRecord> recordOpt = hoodieRecord.getData().getInsertValue(schema); return (GenericRecord) recordOpt.get(); } catch (IOException e) { return null; @@ -318,9 +323,13 @@ public class UtilitiesTestBase { } public static List<GenericRecord> toGenericRecords(List<HoodieRecord> hoodieRecords) { + return toGenericRecords(hoodieRecords, HoodieTestDataGenerator.AVRO_SCHEMA); + } + + public static List<GenericRecord> toGenericRecords(List<HoodieRecord> hoodieRecords, Schema schema) { List<GenericRecord> records = new ArrayList<>(); for (HoodieRecord hoodieRecord : hoodieRecords) { - records.add(toGenericRecord(hoodieRecord)); + records.add(toGenericRecord(hoodieRecord, schema)); } return records; }