This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 617cc24  [HUDI-1367] Make deltaStreamer transition from dfsSource to kafkaSource (#2227)
617cc24 is described below

commit 617cc24ad1a28196b872df5663e9e0f48cd7f0fa
Author: liujinhui <965147...@qq.com>
AuthorDate: Thu Feb 25 20:08:13 2021 +0800

    [HUDI-1367] Make deltaStreamer transition from dfsSource to kafkaSource (#2227)
    
    
    Co-authored-by: Sivabalan Narayanan <sivab...@uber.com>
---
 .../utilities/sources/helpers/KafkaOffsetGen.java  |  52 ++++++-
 .../functional/TestHoodieDeltaStreamer.java        | 151 +++++++++++++++++++--
 .../TestHoodieMultiTableDeltaStreamer.java         |  21 +--
 .../hudi/utilities/sources/TestKafkaSource.java    |   2 +-
 .../utilities/testutils/UtilitiesTestBase.java     |  17 ++-
 5 files changed, 213 insertions(+), 30 deletions(-)
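
For readers skimming the patch: the main behavioural change is a new Hudi-level config, hoodie.deltastreamer.source.kafka.auto.reset.offsets, which KafkaOffsetGen now validates and uses instead of the raw Kafka auto.offset.reset property when no usable Kafka checkpoint exists. A minimal sketch of wiring it up through TypedProperties follows; the topic name and broker address are illustrative, not taken from the patch.

    import org.apache.hudi.common.config.TypedProperties;
    import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen;

    public class KafkaOffsetGenExample {
      public static void main(String[] args) {
        TypedProperties props = new TypedProperties();
        // Required by KafkaOffsetGen: the topic the DeltaStreamer source reads from (illustrative value).
        props.setProperty("hoodie.deltastreamer.source.kafka.topic", "my_topic");
        // New in this patch: Hudi-level reset strategy, validated against KafkaResetOffsetStrategies.
        // The constructor compares the lowercased enum name, so "earliest" / "latest" are the expected spellings.
        props.setProperty("hoodie.deltastreamer.source.kafka.auto.reset.offsets", "earliest");
        props.setProperty("bootstrap.servers", "localhost:9092");

        // An unknown reset value now fails fast with a HoodieDeltaStreamerException.
        KafkaOffsetGen offsetGen = new KafkaOffsetGen(props);
        System.out.println("Reading from topic: " + offsetGen.getTopicName());
      }
    }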

diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java
index fc7ba79..e37ec0a 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java
@@ -21,6 +21,7 @@ package org.apache.hudi.utilities.sources.helpers;
 import org.apache.hudi.DataSourceUtils;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieDeltaStreamerException;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieNotSupportedException;
 
@@ -40,6 +41,8 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 
 /**
@@ -49,6 +52,12 @@ public class KafkaOffsetGen {
 
   private static final Logger LOG = LogManager.getLogger(KafkaOffsetGen.class);
 
+  /**
+   * kafka checkpoint Pattern.
+   * Format: topic_name,partition_num:offset,partition_num:offset,....
+   */
+  private final Pattern pattern = Pattern.compile(".*,.*:.*");
+
   public static class CheckpointUtils {
 
     /**
@@ -148,7 +157,8 @@ public class KafkaOffsetGen {
 
     private static final String KAFKA_TOPIC_NAME = "hoodie.deltastreamer.source.kafka.topic";
     private static final String MAX_EVENTS_FROM_KAFKA_SOURCE_PROP = "hoodie.deltastreamer.kafka.source.maxEvents";
-    private static final KafkaResetOffsetStrategies DEFAULT_AUTO_RESET_OFFSET = KafkaResetOffsetStrategies.LATEST;
+    private static final String KAFKA_AUTO_RESET_OFFSETS = "hoodie.deltastreamer.source.kafka.auto.reset.offsets";
+    private static final KafkaResetOffsetStrategies DEFAULT_KAFKA_AUTO_RESET_OFFSETS = KafkaResetOffsetStrategies.LATEST;
     public static final long DEFAULT_MAX_EVENTS_FROM_KAFKA_SOURCE = 5000000;
     public static long maxEventsFromKafkaSource = DEFAULT_MAX_EVENTS_FROM_KAFKA_SOURCE;
   }
@@ -156,15 +166,29 @@ public class KafkaOffsetGen {
   private final HashMap<String, Object> kafkaParams;
   private final TypedProperties props;
   protected final String topicName;
+  private KafkaResetOffsetStrategies autoResetValue;
 
   public KafkaOffsetGen(TypedProperties props) {
     this.props = props;
+
     kafkaParams = new HashMap<>();
     for (Object prop : props.keySet()) {
       kafkaParams.put(prop.toString(), props.get(prop.toString()));
     }
     DataSourceUtils.checkRequiredProperties(props, Collections.singletonList(Config.KAFKA_TOPIC_NAME));
     topicName = props.getString(Config.KAFKA_TOPIC_NAME);
+    String kafkaAutoResetOffsetsStr = props.getString(Config.KAFKA_AUTO_RESET_OFFSETS, Config.DEFAULT_KAFKA_AUTO_RESET_OFFSETS.name());
+    boolean found = false;
+    for (KafkaResetOffsetStrategies entry: KafkaResetOffsetStrategies.values()) {
+      if (entry.name().toLowerCase().equals(kafkaAutoResetOffsetsStr)) {
+        found = true;
+        autoResetValue = entry;
+        break;
+      }
+    }
+    if (!found) {
+      throw new HoodieDeltaStreamerException(Config.KAFKA_AUTO_RESET_OFFSETS + " config set to unknown value " + kafkaAutoResetOffsetsStr);
+    }
   }
 
   public OffsetRange[] getNextOffsetRanges(Option<String> lastCheckpointStr, long sourceLimit, HoodieDeltaStreamerMetrics metrics) {
@@ -186,8 +210,6 @@ public class KafkaOffsetGen {
         fromOffsets = checkupValidOffsets(consumer, lastCheckpointStr, topicPartitions);
         metrics.updateDeltaStreamerKafkaDelayCountMetrics(delayOffsetCalculation(lastCheckpointStr, topicPartitions, consumer));
       } else {
-        KafkaResetOffsetStrategies autoResetValue = KafkaResetOffsetStrategies
-                .valueOf(props.getString("auto.offset.reset", Config.DEFAULT_AUTO_RESET_OFFSET.toString()).toUpperCase());
         switch (autoResetValue) {
           case EARLIEST:
             fromOffsets = consumer.beginningOffsets(topicPartitions);
@@ -227,12 +249,23 @@ public class KafkaOffsetGen {
   // else return earliest offsets
   private Map<TopicPartition, Long> checkupValidOffsets(KafkaConsumer consumer,
                                                         Option<String> lastCheckpointStr, Set<TopicPartition> topicPartitions) {
-    Map<TopicPartition, Long> checkpointOffsets = CheckpointUtils.strToOffsets(lastCheckpointStr.get());
     Map<TopicPartition, Long> earliestOffsets = consumer.beginningOffsets(topicPartitions);
+    if (checkTopicCheckpoint(lastCheckpointStr)) {
+      Map<TopicPartition, Long> checkpointOffsets = CheckpointUtils.strToOffsets(lastCheckpointStr.get());
+      boolean checkpointOffsetReseter = checkpointOffsets.entrySet().stream()
+              .anyMatch(offset -> offset.getValue() < earliestOffsets.get(offset.getKey()));
+      return checkpointOffsetReseter ? earliestOffsets : checkpointOffsets;
+    }
+
+    switch (autoResetValue) {
+      case EARLIEST:
+        return earliestOffsets;
+      case LATEST:
+        return consumer.endOffsets(topicPartitions);
+      default:
+        throw new HoodieNotSupportedException("Auto reset value must be one of 'earliest' or 'latest' ");
+    }
 
-    boolean checkpointOffsetReseter = checkpointOffsets.entrySet().stream()
-            .anyMatch(offset -> offset.getValue() < earliestOffsets.get(offset.getKey()));
-    return checkpointOffsetReseter ? earliestOffsets : checkpointOffsets;
   }
 
   private Long delayOffsetCalculation(Option<String> lastCheckpointStr, Set<TopicPartition> topicPartitions, KafkaConsumer consumer) {
@@ -257,6 +290,11 @@ public class KafkaOffsetGen {
     return result.containsKey(topicName);
   }
 
+  private boolean checkTopicCheckpoint(Option<String> lastCheckpointStr) {
+    Matcher matcher = pattern.matcher(lastCheckpointStr.get());
+    return matcher.matches();
+  }
+
   public String getTopicName() {
     return topicName;
   }
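
To illustrate the checkpoint check introduced above: a checkpoint written by a Kafka source has the form topic_name,partition:offset[,partition:offset...], while a checkpoint carried over from a DFS source (for example a plain commit time or timestamp) does not match the pattern, which is what lets the reader fall back to the configured reset strategy. A small standalone sketch of that matching, with illustrative checkpoint strings:

    import java.util.regex.Pattern;

    public class CheckpointFormatCheck {
      // Same pattern as KafkaOffsetGen: topic_name,partition:offset,partition:offset,...
      private static final Pattern KAFKA_CHECKPOINT = Pattern.compile(".*,.*:.*");

      public static void main(String[] args) {
        // Checkpoint previously written by a Kafka source: matches, so stored offsets are reused.
        System.out.println(KAFKA_CHECKPOINT.matcher("impressions,0:100,1:250").matches()); // true
        // Checkpoint left behind by a DFS source (e.g. a timestamp): no match, so
        // KafkaOffsetGen falls back to the configured auto.reset.offsets strategy.
        System.out.println(KAFKA_CHECKPOINT.matcher("20210225200813").matches());          // false
      }
    }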
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
index 616d039..7fb5b18 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
@@ -48,6 +48,7 @@ import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
 import org.apache.hudi.utilities.sources.CsvDFSSource;
 import org.apache.hudi.utilities.sources.HoodieIncrSource;
 import org.apache.hudi.utilities.sources.InputBatch;
+import org.apache.hudi.utilities.sources.JsonKafkaSource;
 import org.apache.hudi.utilities.sources.ParquetDFSSource;
 import org.apache.hudi.utilities.sources.TestDataSource;
 import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
@@ -56,10 +57,12 @@ import org.apache.hudi.utilities.testutils.sources.config.SourceConfigs;
 import org.apache.hudi.utilities.transform.SqlQueryBasedTransformer;
 import org.apache.hudi.utilities.transform.Transformer;
 
+import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.kafka.common.errors.TopicExistsException;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.apache.spark.api.java.JavaRDD;
@@ -116,9 +119,12 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
   private static final String PROPS_FILENAME_TEST_INVALID = "test-invalid.properties";
   private static final String PROPS_FILENAME_TEST_CSV = "test-csv-dfs-source.properties";
   private static final String PROPS_FILENAME_TEST_PARQUET = "test-parquet-dfs-source.properties";
+  private static final String PROPS_FILENAME_TEST_JSON_KAFKA = "test-json-kafka-dfs-source.properties";
   private static String PARQUET_SOURCE_ROOT;
+  private static String JSON_KAFKA_SOURCE_ROOT;
   private static final int PARQUET_NUM_RECORDS = 5;
   private static final int CSV_NUM_RECORDS = 3;
+  private static final int JSON_KAFKA_NUM_RECORDS = 5;
   // Required fields
   private static final String TGT_BASE_PATH_PARAM = "--target-base-path";
   private static final String TGT_BASE_PATH_VALUE = "s3://mybucket/blah";
@@ -136,15 +142,18 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
   private static final String HOODIE_CONF_VALUE2 = "hoodie.datasource.write.recordkey.field=Field1,Field2,Field3";
   private static final Logger LOG = LogManager.getLogger(TestHoodieDeltaStreamer.class);
   public static KafkaTestUtils testUtils;
+  protected static String topicName;
 
-  private static int testNum = 1;
+  protected static int testNum = 1;
 
   @BeforeAll
   public static void initClass() throws Exception {
     UtilitiesTestBase.initClass(true);
     PARQUET_SOURCE_ROOT = dfsBasePath + "/parquetFiles";
+    JSON_KAFKA_SOURCE_ROOT = dfsBasePath + "/jsonKafkaFiles";
     testUtils = new KafkaTestUtils();
     testUtils.setup();
+    topicName = "topic" + testNum;
 
     // prepare the configs.
     UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/base.properties", dfs, dfsBasePath + "/base.properties");
@@ -236,7 +245,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
 
     //Kafka source properties
     props.setProperty("bootstrap.servers", testUtils.brokerAddress());
-    props.setProperty("auto.offset.reset", "earliest");
+    props.setProperty("hoodie.deltastreamer.source.kafka.auto.reset.offsets", "earliest");
     props.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
     props.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
     props.setProperty("hoodie.deltastreamer.kafka.source.maxEvents", String.valueOf(5000));
@@ -966,27 +975,56 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
   }
 
   private static void prepareParquetDFSFiles(int numRecords) throws IOException {
-    String path = PARQUET_SOURCE_ROOT + "/1.parquet";
+    prepareParquetDFSFiles(numRecords, "1.parquet", false, null, null);
+  }
+
+  private static void prepareParquetDFSFiles(int numRecords, String fileName, boolean useCustomSchema,
+      String schemaStr, Schema schema) throws IOException {
+    String path = PARQUET_SOURCE_ROOT + "/" + fileName;
     HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
-    Helpers.saveParquetToDFS(Helpers.toGenericRecords(
-        dataGenerator.generateInserts("000", numRecords)), new Path(path));
+    if (useCustomSchema) {
+      Helpers.saveParquetToDFS(Helpers.toGenericRecords(
+          dataGenerator.generateInsertsAsPerSchema("000", numRecords, schemaStr),
+          schema), new Path(path), HoodieTestDataGenerator.AVRO_TRIP_SCHEMA);
+    } else {
+      Helpers.saveParquetToDFS(Helpers.toGenericRecords(
+          dataGenerator.generateInserts("000", numRecords)), new Path(path));
+    }
+  }
+
+  private static void prepareJsonKafkaDFSFiles(int numRecords, boolean createTopic, String topicName) throws IOException {
+    if (createTopic) {
+      try {
+        testUtils.createTopic(topicName, 2);
+      } catch (TopicExistsException e) {
+        // no op
+      }
+    }
+    HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
+    testUtils.sendMessages(topicName, Helpers.jsonifyRecords(dataGenerator.generateInsertsAsPerSchema("000", numRecords, HoodieTestDataGenerator.TRIP_SCHEMA)));
   }
 
   private void prepareParquetDFSSource(boolean useSchemaProvider, boolean hasTransformer) throws IOException {
+    prepareParquetDFSSource(useSchemaProvider, hasTransformer, "source.avsc", "target.avsc",
+        PROPS_FILENAME_TEST_PARQUET, PARQUET_SOURCE_ROOT);
+  }
+
+  private void prepareParquetDFSSource(boolean useSchemaProvider, boolean hasTransformer, String sourceSchemaFile, String targetSchemaFile,
+      String propsFileName, String parquetSourceRoot) throws IOException {
     // Properties used for testing delta-streamer with Parquet source
     TypedProperties parquetProps = new TypedProperties();
     parquetProps.setProperty("include", "base.properties");
+    parquetProps.setProperty("hoodie.embed.timeline.server","false");
     parquetProps.setProperty("hoodie.datasource.write.recordkey.field", "_row_key");
     parquetProps.setProperty("hoodie.datasource.write.partitionpath.field", "not_there");
     if (useSchemaProvider) {
-      parquetProps.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", dfsBasePath + "/source.avsc");
+      parquetProps.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", dfsBasePath + "/" + sourceSchemaFile);
       if (hasTransformer) {
-        parquetProps.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file", dfsBasePath + "/target.avsc");
+        parquetProps.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file", dfsBasePath + "/" + targetSchemaFile);
       }
     }
-    parquetProps.setProperty("hoodie.deltastreamer.source.dfs.root", PARQUET_SOURCE_ROOT);
-
-    UtilitiesTestBase.Helpers.savePropsToDFS(parquetProps, dfs, dfsBasePath + "/" + PROPS_FILENAME_TEST_PARQUET);
+    parquetProps.setProperty("hoodie.deltastreamer.source.dfs.root", parquetSourceRoot);
+    UtilitiesTestBase.Helpers.savePropsToDFS(parquetProps, dfs, dfsBasePath + "/" + propsFileName);
   }
 
   private void testParquetDFSSource(boolean useSchemaProvider, List<String> transformerClassNames) throws Exception {
@@ -1001,6 +1039,99 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
     testNum++;
   }
 
+  private void prepareJsonKafkaDFSSource(String propsFileName, String autoResetValue, String topicName) throws IOException {
+    // Properties used for testing delta-streamer with JsonKafka source
+    TypedProperties props = new TypedProperties();
+    populateCommonProps(props);
+    props.setProperty("include", "base.properties");
+    props.setProperty("hoodie.embed.timeline.server","false");
+    props.setProperty("hoodie.datasource.write.recordkey.field", "_row_key");
+    props.setProperty("hoodie.datasource.write.partitionpath.field", "not_there");
+    props.setProperty("hoodie.deltastreamer.source.dfs.root", JSON_KAFKA_SOURCE_ROOT);
+    props.setProperty("hoodie.deltastreamer.source.kafka.topic",topicName);
+    props.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", dfsBasePath + "/source_uber.avsc");
+    props.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file", dfsBasePath + "/target_uber.avsc");
+    props.setProperty("hoodie.deltastreamer.source.kafka.auto.reset.offsets", autoResetValue);
+
+    UtilitiesTestBase.Helpers.savePropsToDFS(props, dfs, dfsBasePath + "/" + propsFileName);
+  }
+
+  /**
+   * Tests Deltastreamer with parquet dfs source and transitions to JsonKafkaSource.
+   * @param autoResetToLatest true if auto reset value to be set to LATEST. false to leave it as default(i.e. EARLIEST)
+   * @throws Exception
+   */
+  private void testDeltaStreamerTransitionFromParquetToKafkaSource(boolean autoResetToLatest) throws Exception {
+    // prep parquet source
+    PARQUET_SOURCE_ROOT = dfsBasePath + "/parquetFilesDfsToKafka" + testNum;
+    int parquetRecords = 10;
+    prepareParquetDFSFiles(parquetRecords,"1.parquet", true, HoodieTestDataGenerator.TRIP_SCHEMA, HoodieTestDataGenerator.AVRO_TRIP_SCHEMA);
+
+    prepareParquetDFSSource(true, false,"source_uber.avsc", "target_uber.avsc", PROPS_FILENAME_TEST_PARQUET,
+        PARQUET_SOURCE_ROOT);
+    // delta streamer w/ parquest source
+    String tableBasePath = dfsBasePath + "/test_dfs_to_kakfa" + testNum;
+    HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(
+        TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT, ParquetDFSSource.class.getName(),
+            Collections.EMPTY_LIST, PROPS_FILENAME_TEST_PARQUET, false,
+            false, 100000, false, null, null, "timestamp"), jsc);
+    deltaStreamer.sync();
+    TestHelpers.assertRecordCount(parquetRecords, tableBasePath + "/*/*.parquet", sqlContext);
+    deltaStreamer.shutdownGracefully();
+
+    // prep json kafka source
+    topicName = "topic" + testNum;
+    prepareJsonKafkaDFSFiles(JSON_KAFKA_NUM_RECORDS, true, topicName);
+    prepareJsonKafkaDFSSource(PROPS_FILENAME_TEST_JSON_KAFKA, autoResetToLatest ? "latest" : "earliest", topicName);
+    // delta streamer w/ json kafka source
+    deltaStreamer = new HoodieDeltaStreamer(
+        TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT, JsonKafkaSource.class.getName(),
+            Collections.EMPTY_LIST, PROPS_FILENAME_TEST_JSON_KAFKA, false,
+            true, 100000, false, null, null, "timestamp"), jsc);
+    deltaStreamer.sync();
+    // if auto reset value is set to LATEST, this all kafka records so far may not be synced.
+    int totalExpectedRecords = parquetRecords + ((autoResetToLatest) ? 0 : JSON_KAFKA_NUM_RECORDS);
+    TestHelpers.assertRecordCount(totalExpectedRecords, tableBasePath + "/*/*.parquet", sqlContext);
+
+    // verify 2nd batch to test LATEST auto reset value.
+    prepareJsonKafkaDFSFiles(20, false, topicName);
+    totalExpectedRecords += 20;
+    deltaStreamer.sync();
+    TestHelpers.assertRecordCount(totalExpectedRecords, tableBasePath + "/*/*.parquet", sqlContext);
+    testNum++;
+  }
+
+  @Test
+  public void testJsonKafkaDFSSource() throws Exception {
+    topicName = "topic" + testNum;
+    prepareJsonKafkaDFSFiles(JSON_KAFKA_NUM_RECORDS, true, topicName);
+    prepareJsonKafkaDFSSource(PROPS_FILENAME_TEST_JSON_KAFKA, "earliest",topicName);
+    String tableBasePath = dfsBasePath + "/test_json_kafka_table" + testNum;
+    HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(
+        TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT, JsonKafkaSource.class.getName(),
+            Collections.EMPTY_LIST, PROPS_FILENAME_TEST_JSON_KAFKA, false,
+            true, 100000, false, null, null, "timestamp"), jsc);
+    deltaStreamer.sync();
+    TestHelpers.assertRecordCount(JSON_KAFKA_NUM_RECORDS, tableBasePath + "/*/*.parquet", sqlContext);
+
+    int totalRecords = JSON_KAFKA_NUM_RECORDS;
+    int records = 10;
+    totalRecords += records;
+    prepareJsonKafkaDFSFiles(records, false, topicName);
+    deltaStreamer.sync();
+    TestHelpers.assertRecordCount(totalRecords, tableBasePath + "/*/*.parquet", sqlContext);
+  }
+
+  @Test
+  public void testParquetSourceToKafkaSourceEarliestAutoResetValue() throws Exception {
+    testDeltaStreamerTransitionFromParquetToKafkaSource(false);
+  }
+
+  @Test
+  public void testParquetSourceToKafkaSourceLatestAutoResetValue() throws Exception {
+    testDeltaStreamerTransitionFromParquetToKafkaSource(true);
+  }
+
   @Test
   public void testParquetDFSSourceWithoutSchemaProviderAndNoTransformer() throws Exception {
     testParquetDFSSource(false, null);
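
A quick restatement of the record counts the new transition tests assert, using the constants above (parquetRecords = 10, JSON_KAFKA_NUM_RECORDS = 5, then a second Kafka batch of 20): with "earliest" the first Kafka sync also ingests the 5 pre-published records, with "latest" it starts at the end of the topic and skips them; the second batch is read from the stored Kafka checkpoint in both modes. The small sketch below only restates that arithmetic and is not code from the patch.

    public class TransitionExpectedCounts {
      public static void main(String[] args) {
        int parquetRecords = 10;      // first sync via ParquetDFSSource
        int jsonKafkaRecords = 5;     // records published before the first Kafka sync
        boolean autoResetToLatest = false;

        // "earliest" picks up the pre-published records; "latest" does not.
        int afterFirstKafkaSync = parquetRecords + (autoResetToLatest ? 0 : jsonKafkaRecords);
        // The second batch of 20 arrives after a Kafka checkpoint exists, so both modes ingest it.
        int afterSecondKafkaSync = afterFirstKafkaSync + 20;

        System.out.println(afterFirstKafkaSync);  // 15 with earliest, 10 with latest
        System.out.println(afterSecondKafkaSync); // 35 with earliest, 30 with latest
      }
    }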
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieMultiTableDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieMultiTableDeltaStreamer.java
index a6f4edf..ad1b753 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieMultiTableDeltaStreamer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieMultiTableDeltaStreamer.java
@@ -119,12 +119,14 @@ public class TestHoodieMultiTableDeltaStreamer extends TestHoodieDeltaStreamer {
   @Test //0 corresponds to fg
   public void testMultiTableExecution() throws IOException {
     //create topics for each table
-    testUtils.createTopic("topic1", 2);
-    testUtils.createTopic("topic2", 2);
+    String topicName1 = "topic" + testNum++;
+    String topicName2 = "topic" + testNum;
+    testUtils.createTopic(topicName1, 2);
+    testUtils.createTopic(topicName2, 2);
 
     HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
-    testUtils.sendMessages("topic1", Helpers.jsonifyRecords(dataGenerator.generateInsertsAsPerSchema("000", 5, HoodieTestDataGenerator.TRIP_SCHEMA)));
-    testUtils.sendMessages("topic2", Helpers.jsonifyRecords(dataGenerator.generateInsertsAsPerSchema("000", 10, HoodieTestDataGenerator.SHORT_TRIP_SCHEMA)));
+    testUtils.sendMessages(topicName1, Helpers.jsonifyRecords(dataGenerator.generateInsertsAsPerSchema("000", 5, HoodieTestDataGenerator.TRIP_SCHEMA)));
+    testUtils.sendMessages(topicName2, Helpers.jsonifyRecords(dataGenerator.generateInsertsAsPerSchema("000", 10, HoodieTestDataGenerator.SHORT_TRIP_SCHEMA)));
 
     HoodieMultiTableDeltaStreamer.Config cfg = TestHelpers.getConfig(PROPS_FILENAME_TEST_SOURCE1,dfsBasePath + "/config", JsonKafkaSource.class.getName(), false);
     HoodieMultiTableDeltaStreamer streamer = new HoodieMultiTableDeltaStreamer(cfg, jsc);
@@ -132,21 +134,23 @@ public class TestHoodieMultiTableDeltaStreamer extends TestHoodieDeltaStreamer {
     TypedProperties properties = executionContexts.get(1).getProperties();
     properties.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", dfsBasePath + "/source_uber.avsc");
     properties.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file", dfsBasePath + "/target_uber.avsc");
+    properties.setProperty("hoodie.deltastreamer.source.kafka.topic", topicName2);
     executionContexts.get(1).setProperties(properties);
     TypedProperties properties1 = executionContexts.get(0).getProperties();
     properties1.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", dfsBasePath + "/source_short_trip_uber.avsc");
     properties1.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file", dfsBasePath + "/target_short_trip_uber.avsc");
+    properties1.setProperty("hoodie.deltastreamer.source.kafka.topic", topicName1);
     executionContexts.get(0).setProperties(properties1);
-    String targetBasePath1 = executionContexts.get(1).getConfig().targetBasePath;
-    String targetBasePath2 = executionContexts.get(0).getConfig().targetBasePath;
+    String targetBasePath1 = executionContexts.get(0).getConfig().targetBasePath;
+    String targetBasePath2 = executionContexts.get(1).getConfig().targetBasePath;
     streamer.sync();
 
     TestHoodieDeltaStreamer.TestHelpers.assertRecordCount(5, targetBasePath1 + "/*/*.parquet", sqlContext);
     TestHoodieDeltaStreamer.TestHelpers.assertRecordCount(10, targetBasePath2 + "/*/*.parquet", sqlContext);
 
     //insert updates for already existing records in kafka topics
-    testUtils.sendMessages("topic1", Helpers.jsonifyRecords(dataGenerator.generateUpdatesAsPerSchema("001", 5, HoodieTestDataGenerator.TRIP_SCHEMA)));
-    testUtils.sendMessages("topic2", Helpers.jsonifyRecords(dataGenerator.generateUpdatesAsPerSchema("001", 10, HoodieTestDataGenerator.SHORT_TRIP_SCHEMA)));
+    testUtils.sendMessages(topicName1, Helpers.jsonifyRecords(dataGenerator.generateUpdatesAsPerSchema("001", 5, HoodieTestDataGenerator.TRIP_SCHEMA)));
+    testUtils.sendMessages(topicName2, Helpers.jsonifyRecords(dataGenerator.generateUpdatesAsPerSchema("001", 10, HoodieTestDataGenerator.SHORT_TRIP_SCHEMA)));
     streamer.sync();
     assertEquals(2, streamer.getSuccessTables().size());
     assertTrue(streamer.getFailedTables().isEmpty());
@@ -154,5 +158,6 @@ public class TestHoodieMultiTableDeltaStreamer extends TestHoodieDeltaStreamer {
     //assert the record count matches now
     TestHoodieDeltaStreamer.TestHelpers.assertRecordCount(5, targetBasePath1 + "/*/*.parquet", sqlContext);
     TestHoodieDeltaStreamer.TestHelpers.assertRecordCount(10, targetBasePath2 + "/*/*.parquet", sqlContext);
+    testNum++;
   }
 }
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestKafkaSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestKafkaSource.java
index e8cb2a6..9004c66 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestKafkaSource.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestKafkaSource.java
@@ -88,7 +88,7 @@ public class TestKafkaSource extends UtilitiesTestBase {
     TypedProperties props = new TypedProperties();
     props.setProperty("hoodie.deltastreamer.source.kafka.topic", TEST_TOPIC_NAME);
     props.setProperty("bootstrap.servers", testUtils.brokerAddress());
-    props.setProperty("auto.offset.reset", resetStrategy);
+    props.setProperty("hoodie.deltastreamer.source.kafka.auto.reset.offsets", resetStrategy);
     props.setProperty("hoodie.deltastreamer.kafka.source.maxEvents",
         maxEventsToReadFromKafkaSource != null ? String.valueOf(maxEventsToReadFromKafkaSource) :
             String.valueOf(Config.maxEventsFromKafkaSource));
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/UtilitiesTestBase.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/UtilitiesTestBase.java
index 0bbdb23..b83fa78 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/UtilitiesTestBase.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/UtilitiesTestBase.java
@@ -46,6 +46,7 @@ import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.fasterxml.jackson.dataformat.csv.CsvMapper;
 import com.fasterxml.jackson.dataformat.csv.CsvSchema;
 import com.fasterxml.jackson.dataformat.csv.CsvSchema.Builder;
+import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.IndexedRecord;
 import org.apache.hadoop.fs.FileSystem;
@@ -279,8 +280,12 @@ public class UtilitiesTestBase {
     }
 
     public static void saveParquetToDFS(List<GenericRecord> records, Path targetFile) throws IOException {
+      saveParquetToDFS(records, targetFile, HoodieTestDataGenerator.AVRO_SCHEMA);
+    }
+
+    public static void saveParquetToDFS(List<GenericRecord> records, Path targetFile, Schema schema) throws IOException {
       try (ParquetWriter<GenericRecord> writer = AvroParquetWriter.<GenericRecord>builder(targetFile)
-          .withSchema(HoodieTestDataGenerator.AVRO_SCHEMA)
+          .withSchema(schema)
           .withConf(HoodieTestUtils.getDefaultHadoopConf())
           .withWriteMode(Mode.OVERWRITE)
           .build()) {
@@ -308,9 +313,9 @@ public class UtilitiesTestBase {
       return props;
     }
 
-    public static GenericRecord toGenericRecord(HoodieRecord hoodieRecord) {
+    public static GenericRecord toGenericRecord(HoodieRecord hoodieRecord, Schema schema) {
       try {
-        Option<IndexedRecord> recordOpt = hoodieRecord.getData().getInsertValue(HoodieTestDataGenerator.AVRO_SCHEMA);
+        Option<IndexedRecord> recordOpt = hoodieRecord.getData().getInsertValue(schema);
         return (GenericRecord) recordOpt.get();
       } catch (IOException e) {
         return null;
@@ -318,9 +323,13 @@ public class UtilitiesTestBase {
     }
 
     public static List<GenericRecord> toGenericRecords(List<HoodieRecord> hoodieRecords) {
+      return toGenericRecords(hoodieRecords, HoodieTestDataGenerator.AVRO_SCHEMA);
+    }
+
+    public static List<GenericRecord> toGenericRecords(List<HoodieRecord> hoodieRecords, Schema schema) {
       List<GenericRecord> records = new ArrayList<>();
       for (HoodieRecord hoodieRecord : hoodieRecords) {
-        records.add(toGenericRecord(hoodieRecord));
+        records.add(toGenericRecord(hoodieRecord, schema));
       }
       return records;
     }
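
For test authors, the overloads added above let fixtures be generated and persisted with a schema other than the default AVRO_SCHEMA. A hedged sketch of how they compose follows; the output path, the main-method wrapper, and the exact import locations of the test utilities are assumptions, while the method names and schema constants come from the patch.

    import java.util.List;

    import org.apache.avro.generic.GenericRecord;
    import org.apache.hadoop.fs.Path;
    import org.apache.hudi.common.model.HoodieRecord;
    import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
    import org.apache.hudi.utilities.testutils.UtilitiesTestBase.Helpers;

    public class CustomSchemaFixtureExample {
      public static void main(String[] args) throws Exception {
        HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
        // Generate records against the TRIP schema instead of the default test schema.
        List<HoodieRecord> inserts =
            dataGenerator.generateInsertsAsPerSchema("000", 5, HoodieTestDataGenerator.TRIP_SCHEMA);
        // Convert and persist with the matching Avro schema via the new overloads.
        List<GenericRecord> avroRecords =
            Helpers.toGenericRecords(inserts, HoodieTestDataGenerator.AVRO_TRIP_SCHEMA);
        Helpers.saveParquetToDFS(avroRecords, new Path("/tmp/parquetFiles/1.parquet"),
            HoodieTestDataGenerator.AVRO_TRIP_SCHEMA);
      }
    }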
