This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new af837d2  [HUDI-1447] DeltaStreamer kafka source supports consuming from specified timestamp (#2438)
af837d2 is described below

commit af837d2f1825d14ae8403b2290cf5eab39780343
Author: liujinhui <965147...@qq.com>
AuthorDate: Sat Jul 17 12:31:06 2021 +0800

    [HUDI-1447] DeltaStreamer kafka source supports consuming from specified timestamp (#2438)
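
    A minimal usage sketch for the new behaviour (the topic name, properties file
    name, and timestamp value below are illustrative; the checkpoint-type property
    key is introduced by this change, while --props, --source-class and --checkpoint
    are existing HoodieDeltaStreamer options): set the checkpoint type to "timestamp"
    and pass an epoch-millis value as the initial checkpoint, which is honored only
    for the first batch.

        # kafka-source.properties
        hoodie.deltastreamer.source.kafka.topic=impressions
        hoodie.deltastreamer.source.kafka.checkpoint.type=timestamp

        spark-submit --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer ... \
          --source-class org.apache.hudi.utilities.sources.JsonKafkaSource \
          --props kafka-source.properties \
          --checkpoint 1626453066000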
---
 .../hudi/utilities/deltastreamer/DeltaSync.java    |  15 ++-
 .../hudi/utilities/sources/AvroKafkaSource.java    |   5 +-
 .../hudi/utilities/sources/JsonKafkaSource.java    |   5 +-
 .../utilities/sources/helpers/KafkaOffsetGen.java  | 103 ++++++++++++++++++---
 .../functional/TestHoodieDeltaStreamer.java        |  56 ++++++++---
 .../hudi/utilities/sources/TestKafkaSource.java    |  15 +--
 .../sources/helpers/TestKafkaOffsetGen.java        |  35 +++++--
 7 files changed, 172 insertions(+), 62 deletions(-)

diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
index 7742e8e..9d445dc 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
@@ -38,6 +38,7 @@ import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ReflectionUtils;
+import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieClusteringConfig;
@@ -59,6 +60,7 @@ import org.apache.hudi.utilities.schema.DelegatingSchemaProvider;
 import org.apache.hudi.utilities.schema.SchemaProvider;
 import org.apache.hudi.utilities.schema.SchemaSet;
 import org.apache.hudi.utilities.sources.InputBatch;
+import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen;
 import org.apache.hudi.utilities.transform.Transformer;
 
 import com.codahale.metrics.Timer;
@@ -318,13 +320,12 @@ public class DeltaSync implements Serializable {
       if (lastCommit.isPresent()) {
         HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
             .fromBytes(commitTimelineOpt.get().getInstantDetails(lastCommit.get()).get(), HoodieCommitMetadata.class);
-        if (cfg.checkpoint != null && !cfg.checkpoint.equals(commitMetadata.getMetadata(CHECKPOINT_RESET_KEY))) {
+        if (cfg.checkpoint != null && (StringUtils.isNullOrEmpty(commitMetadata.getMetadata(CHECKPOINT_RESET_KEY))
+                || !cfg.checkpoint.equals(commitMetadata.getMetadata(CHECKPOINT_RESET_KEY)))) {
           resumeCheckpointStr = Option.of(cfg.checkpoint);
-        } else if (commitMetadata.getMetadata(CHECKPOINT_KEY) != null) {
+        } else if (!StringUtils.isNullOrEmpty(commitMetadata.getMetadata(CHECKPOINT_KEY))) {
           //if previous checkpoint is an empty string, skip resume use Option.empty()
-          if (!commitMetadata.getMetadata(CHECKPOINT_KEY).isEmpty()) {
-            resumeCheckpointStr = Option.of(commitMetadata.getMetadata(CHECKPOINT_KEY));
-          }
+          resumeCheckpointStr = Option.of(commitMetadata.getMetadata(CHECKPOINT_KEY));
         } else if (commitMetadata.getOperationType() == WriteOperationType.CLUSTER) {
           // incase of CLUSTER commit, no checkpoint will be available in metadata.
           resumeCheckpointStr = Option.empty();
@@ -336,6 +337,10 @@ public class DeltaSync implements Serializable {
                  + commitTimelineOpt.get().getInstants().collect(Collectors.toList()) + ", CommitMetadata="
                   + commitMetadata.toJsonString());
         }
+        // KAFKA_CHECKPOINT_TYPE will be honored only for first batch.
+        if (!StringUtils.isNullOrEmpty(commitMetadata.getMetadata(CHECKPOINT_RESET_KEY))) {
+          props.remove(KafkaOffsetGen.Config.KAFKA_CHECKPOINT_TYPE.key());
+        }
       }
     } else {
      String partitionColumns = HoodieWriterUtils.getPartitionColumns(keyGenerator);
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java
index 4cea13d..500c412 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java
@@ -40,9 +40,6 @@ import org.apache.spark.streaming.kafka010.KafkaUtils;
 import org.apache.spark.streaming.kafka010.LocationStrategies;
 import org.apache.spark.streaming.kafka010.OffsetRange;
 
-import static org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.Config.ENABLE_KAFKA_COMMIT_OFFSET;
-import static org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.Config.DEFAULT_ENABLE_KAFKA_COMMIT_OFFSET;
-
 /**
  * Reads avro serialized Kafka data, based on the confluent schema-registry.
  */
@@ -104,7 +101,7 @@ public class AvroKafkaSource extends AvroSource {
 
   @Override
   public void onCommit(String lastCkptStr) {
-    if (this.props.getBoolean(ENABLE_KAFKA_COMMIT_OFFSET, DEFAULT_ENABLE_KAFKA_COMMIT_OFFSET)) {
+    if (this.props.getBoolean(KafkaOffsetGen.Config.ENABLE_KAFKA_COMMIT_OFFSET.key(), KafkaOffsetGen.Config.ENABLE_KAFKA_COMMIT_OFFSET.defaultValue())) {
       offsetGen.commitOffsetToKafka(lastCkptStr);
     }
   }
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java
index c1e2e3d..cf9e905 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java
@@ -35,9 +35,6 @@ import org.apache.spark.streaming.kafka010.KafkaUtils;
 import org.apache.spark.streaming.kafka010.LocationStrategies;
 import org.apache.spark.streaming.kafka010.OffsetRange;
 
-import static org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.Config.DEFAULT_ENABLE_KAFKA_COMMIT_OFFSET;
-import static org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.Config.ENABLE_KAFKA_COMMIT_OFFSET;
-
 /**
  * Read json kafka data.
  */
@@ -77,7 +74,7 @@ public class JsonKafkaSource extends JsonSource {
 
   @Override
   public void onCommit(String lastCkptStr) {
-    if (this.props.getBoolean(ENABLE_KAFKA_COMMIT_OFFSET, DEFAULT_ENABLE_KAFKA_COMMIT_OFFSET)) {
+    if (this.props.getBoolean(KafkaOffsetGen.Config.ENABLE_KAFKA_COMMIT_OFFSET.key(), KafkaOffsetGen.Config.ENABLE_KAFKA_COMMIT_OFFSET.defaultValue())) {
       offsetGen.commitOffsetToKafka(lastCkptStr);
     }
   }
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java
index 4378cb1..a7b983a 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java
@@ -19,6 +19,7 @@
 package org.apache.hudi.utilities.sources.helpers;
 
 import org.apache.hudi.DataSourceUtils;
+import org.apache.hudi.common.config.ConfigProperty;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.exception.HoodieDeltaStreamerException;
@@ -30,6 +31,7 @@ import org.apache.hudi.utilities.sources.AvroKafkaSource;
 import org.apache.kafka.clients.consumer.CommitFailedException;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
@@ -46,6 +48,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.function.Function;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
@@ -160,28 +163,48 @@ public class KafkaOffsetGen {
    */
   public static class Config {
 
-    private static final String KAFKA_TOPIC_NAME = "hoodie.deltastreamer.source.kafka.topic";
-    private static final String MAX_EVENTS_FROM_KAFKA_SOURCE_PROP = "hoodie.deltastreamer.kafka.source.maxEvents";
-    public static final String ENABLE_KAFKA_COMMIT_OFFSET = "hoodie.deltastreamer.source.kafka.enable.commit.offset";
-    public static final Boolean DEFAULT_ENABLE_KAFKA_COMMIT_OFFSET = false;
+    private static final ConfigProperty<String> KAFKA_TOPIC_NAME = ConfigProperty
+            .key("hoodie.deltastreamer.source.kafka.topic")
+            .noDefaultValue()
+            .withDocumentation("Kafka topic name.");
+
+    public static final ConfigProperty<String> KAFKA_CHECKPOINT_TYPE = ConfigProperty
+            .key("hoodie.deltastreamer.source.kafka.checkpoint.type")
+            .defaultValue("string")
+            .withDocumentation("Kafka chepoint type.");
+
+    public static final ConfigProperty<Boolean> ENABLE_KAFKA_COMMIT_OFFSET = ConfigProperty
+            .key("hoodie.deltastreamer.source.kafka.enable.commit.offset")
+            .defaultValue(false)
+            .withDocumentation("Automatically submits offset to kafka.");
+
+    public static final ConfigProperty<Long> MAX_EVENTS_FROM_KAFKA_SOURCE_PROP = ConfigProperty
+            .key("hoodie.deltastreamer.kafka.source.maxEvents")
+            .defaultValue(5000000L)
+            .withDocumentation("Maximum number of records obtained in each 
batch.");
+
     // "auto.offset.reset" is kafka native config param. Do not change the 
config param name.
-    public static final String KAFKA_AUTO_OFFSET_RESET = "auto.offset.reset";
-    private static final KafkaResetOffsetStrategies DEFAULT_KAFKA_AUTO_OFFSET_RESET = KafkaResetOffsetStrategies.LATEST;
-    public static final long DEFAULT_MAX_EVENTS_FROM_KAFKA_SOURCE = 5000000;
-    public static long maxEventsFromKafkaSource = DEFAULT_MAX_EVENTS_FROM_KAFKA_SOURCE;
+    private static final ConfigProperty<KafkaResetOffsetStrategies> KAFKA_AUTO_OFFSET_RESET = ConfigProperty
+            .key("auto.offset.reset")
+            .defaultValue(KafkaResetOffsetStrategies.LATEST)
+            .withDocumentation("Kafka consumer strategy for reading data.");
+
+    public static final String KAFKA_CHECKPOINT_TYPE_TIMESTAMP = "timestamp";
   }
 
   private final Map<String, Object> kafkaParams;
   private final TypedProperties props;
   protected final String topicName;
   private KafkaResetOffsetStrategies autoResetValue;
+  private final String kafkaCheckpointType;
 
   public KafkaOffsetGen(TypedProperties props) {
     this.props = props;
     kafkaParams = excludeHoodieConfigs(props);
-    DataSourceUtils.checkRequiredProperties(props, Collections.singletonList(Config.KAFKA_TOPIC_NAME));
-    topicName = props.getString(Config.KAFKA_TOPIC_NAME);
-    String kafkaAutoResetOffsetsStr = props.getString(Config.KAFKA_AUTO_OFFSET_RESET, Config.DEFAULT_KAFKA_AUTO_OFFSET_RESET.name().toLowerCase());
+    DataSourceUtils.checkRequiredProperties(props, Collections.singletonList(Config.KAFKA_TOPIC_NAME.key()));
+    topicName = props.getString(Config.KAFKA_TOPIC_NAME.key());
+    kafkaCheckpointType = props.getString(Config.KAFKA_CHECKPOINT_TYPE.key(), Config.KAFKA_CHECKPOINT_TYPE.defaultValue());
+    String kafkaAutoResetOffsetsStr = props.getString(Config.KAFKA_AUTO_OFFSET_RESET.key(), Config.KAFKA_AUTO_OFFSET_RESET.defaultValue().name().toLowerCase());
     boolean found = false;
     for (KafkaResetOffsetStrategies entry: KafkaResetOffsetStrategies.values()) {
       if (entry.name().toLowerCase().equals(kafkaAutoResetOffsetsStr)) {
@@ -194,7 +217,7 @@ public class KafkaOffsetGen {
       throw new HoodieDeltaStreamerException(Config.KAFKA_AUTO_OFFSET_RESET + " config set to unknown value " + kafkaAutoResetOffsetsStr);
     }
     if (autoResetValue.equals(KafkaResetOffsetStrategies.GROUP)) {
-      this.kafkaParams.put(Config.KAFKA_AUTO_OFFSET_RESET, Config.DEFAULT_KAFKA_AUTO_OFFSET_RESET.name().toLowerCase());
+      this.kafkaParams.put(Config.KAFKA_AUTO_OFFSET_RESET.key(), Config.KAFKA_AUTO_OFFSET_RESET.defaultValue().name().toLowerCase());
     }
   }
 
@@ -212,6 +235,9 @@ public class KafkaOffsetGen {
       Set<TopicPartition> topicPartitions = partitionInfoList.stream()
               .map(x -> new TopicPartition(x.topic(), x.partition())).collect(Collectors.toSet());
 
+      if (Config.KAFKA_CHECKPOINT_TYPE_TIMESTAMP.equals(kafkaCheckpointType) && isValidTimestampCheckpointType(lastCheckpointStr)) {
+        lastCheckpointStr = getOffsetsByTimestamp(consumer, partitionInfoList, topicPartitions, topicName, Long.parseLong(lastCheckpointStr.get()));
+      }
       // Determine the offset ranges to read from
      if (lastCheckpointStr.isPresent() && !lastCheckpointStr.get().isEmpty() && checkTopicCheckpoint(lastCheckpointStr)) {
        fromOffsets = fetchValidOffsets(consumer, lastCheckpointStr, topicPartitions);
@@ -237,8 +263,8 @@ public class KafkaOffsetGen {
     }
 
    // Come up with final set of OffsetRanges to read (account for new partitions, limit number of events)
-    long maxEventsToReadFromKafka = props.getLong(Config.MAX_EVENTS_FROM_KAFKA_SOURCE_PROP,
-        Config.maxEventsFromKafkaSource);
+    long maxEventsToReadFromKafka = props.getLong(Config.MAX_EVENTS_FROM_KAFKA_SOURCE_PROP.key(),
+            Config.MAX_EVENTS_FROM_KAFKA_SOURCE_PROP.defaultValue());
 
     long numEvents;
     if (sourceLimit == Long.MAX_VALUE) {
@@ -271,6 +297,20 @@ public class KafkaOffsetGen {
     return checkpointOffsetReseter ? earliestOffsets : checkpointOffsets;
   }
 
+  /**
+   * Check if the checkpoint is a timestamp.
+   * @param lastCheckpointStr last checkpoint value, expected to be an epoch timestamp in seconds or milliseconds
+   * @return true if the checkpoint can be interpreted as such a timestamp
+   */
+  private Boolean isValidTimestampCheckpointType(Option<String> lastCheckpointStr) {
+    if (!lastCheckpointStr.isPresent()) {
+      return false;
+    }
+    Pattern pattern = Pattern.compile("[-+]?[0-9]+(\\.[0-9]+)?");
+    Matcher isNum = pattern.matcher(lastCheckpointStr.get());
+    return isNum.matches() && (lastCheckpointStr.get().length() == 13 || lastCheckpointStr.get().length() == 10);
+  }
+
   private Long delayOffsetCalculation(Option<String> lastCheckpointStr, Set<TopicPartition> topicPartitions, KafkaConsumer consumer) {
     Long delayCount = 0L;
     Map<TopicPartition, Long> checkpointOffsets = CheckpointUtils.strToOffsets(lastCheckpointStr.get());
@@ -284,6 +324,41 @@ public class KafkaOffsetGen {
   }
 
   /**
+   * Get the checkpoint by timestamp.
+   * This method returns the checkpoint format based on the timestamp.
+   * example:
+   * 1. input: timestamp, etc.
+   * 2. output: topicName,partition_num_0:100,partition_num_1:101,partition_num_2:102.
+   *
+   * @param consumer
+   * @param topicName
+   * @param timestamp
+   * @return
+   */
+  private Option<String> getOffsetsByTimestamp(KafkaConsumer consumer, List<PartitionInfo> partitionInfoList, Set<TopicPartition> topicPartitions,
+                                               String topicName, Long timestamp) {
+
+    Map<TopicPartition, Long> topicPartitionsTimestamp = partitionInfoList.stream()
+                                                    .map(x -> new TopicPartition(x.topic(), x.partition()))
+                                                    .collect(Collectors.toMap(Function.identity(), x -> timestamp));
+
+    Map<TopicPartition, Long> earliestOffsets = consumer.beginningOffsets(topicPartitions);
+    Map<TopicPartition, OffsetAndTimestamp> offsetAndTimestamp = consumer.offsetsForTimes(topicPartitionsTimestamp);
+
+    StringBuilder sb = new StringBuilder();
+    sb.append(topicName + ",");
+    for (Map.Entry<TopicPartition, OffsetAndTimestamp> map : offsetAndTimestamp.entrySet()) {
+      if (map.getValue() != null) {
+        sb.append(map.getKey().partition()).append(":").append(map.getValue().offset()).append(",");
+      } else {
+        sb.append(map.getKey().partition()).append(":").append(earliestOffsets.get(map.getKey())).append(",");
+      }
+    }
+    return Option.of(sb.deleteCharAt(sb.length() - 1).toString());
+  }
+
+
+  /**
    * Check if topic exists.
    * @param consumer kafka consumer
    * @return
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
index 642c666..db0ab19 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
@@ -58,7 +58,6 @@ import org.apache.hudi.utilities.sources.JdbcSource;
 import org.apache.hudi.utilities.sources.JsonKafkaSource;
 import org.apache.hudi.utilities.sources.ParquetDFSSource;
 import org.apache.hudi.utilities.sources.TestDataSource;
-import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.Config;
 import org.apache.hudi.utilities.testutils.JdbcTestUtils;
 import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
 import org.apache.hudi.utilities.testutils.sources.DistributedTestDataSource;
@@ -139,6 +138,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
   private static final int PARQUET_NUM_RECORDS = 5;
   private static final int CSV_NUM_RECORDS = 3;
   private static final int JSON_KAFKA_NUM_RECORDS = 5;
+  private String kafkaCheckpointType = "string";
   // Required fields
   private static final String TGT_BASE_PATH_PARAM = "--target-base-path";
   private static final String TGT_BASE_PATH_VALUE = "s3://mybucket/blah";
@@ -274,7 +274,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
   protected static void populateCommonKafkaProps(TypedProperties props) {
     //Kafka source properties
     props.setProperty("bootstrap.servers", testUtils.brokerAddress());
-    props.setProperty(Config.KAFKA_AUTO_OFFSET_RESET, "earliest");
+    props.setProperty("auto.offset.reset", "earliest");
     props.setProperty("key.serializer", 
"org.apache.kafka.common.serialization.StringSerializer");
     props.setProperty("value.serializer", 
"org.apache.kafka.common.serialization.StringSerializer");
     props.setProperty("hoodie.deltastreamer.kafka.source.maxEvents", 
String.valueOf(5000));
@@ -360,12 +360,13 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
                                                  String propsFilename, boolean enableHiveSync, boolean useSchemaProviderClass, boolean updatePayloadClass,
                                                  String payloadClassName, String tableType) {
       return makeConfig(basePath, op, TestDataSource.class.getName(), transformerClassNames, propsFilename, enableHiveSync,
-          useSchemaProviderClass, 1000, updatePayloadClass, payloadClassName, tableType, "timestamp");
+          useSchemaProviderClass, 1000, updatePayloadClass, payloadClassName, tableType, "timestamp", null);
     }
 
     static HoodieDeltaStreamer.Config makeConfig(String basePath, WriteOperationType op, String sourceClassName,
                                                  List<String> transformerClassNames, String propsFilename, boolean enableHiveSync, boolean useSchemaProviderClass,
-                                                 int sourceLimit, boolean updatePayloadClass, String payloadClassName, String tableType, String sourceOrderingField) {
+                                                 int sourceLimit, boolean updatePayloadClass, String payloadClassName, String tableType, String sourceOrderingField,
+                                                 String checkpoint) {
       HoodieDeltaStreamer.Config cfg = new HoodieDeltaStreamer.Config();
       cfg.targetBasePath = basePath;
       cfg.targetTableName = "hoodie_trips";
@@ -377,6 +378,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
       cfg.sourceOrderingField = sourceOrderingField;
       cfg.propsFilePath = dfsBasePath + "/" + propsFilename;
       cfg.sourceLimit = sourceLimit;
+      cfg.checkpoint = checkpoint;
       if (updatePayloadClass) {
         cfg.payloadClassName = payloadClassName;
       }
@@ -601,7 +603,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
             Arguments.of(allConfig, conf)
     );
   }
-  
+
   @ParameterizedTest
   @MethodSource("provideValidCliArgs")
   public void testValidCommandLineArgs(String[] args, HoodieDeltaStreamer.Config expected) {
@@ -1399,7 +1401,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
     HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(
         TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT, ParquetDFSSource.class.getName(),
             transformerClassNames, PROPS_FILENAME_TEST_PARQUET, false,
-            useSchemaProvider, 100000, false, null, null, "timestamp"), jsc);
+            useSchemaProvider, 100000, false, null, null, "timestamp", null), 
jsc);
     deltaStreamer.sync();
     TestHelpers.assertRecordCount(PARQUET_NUM_RECORDS, tableBasePath + "/*/*.parquet", sqlContext);
     testNum++;
@@ -1414,10 +1416,11 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
     props.setProperty("hoodie.datasource.write.recordkey.field", "_row_key");
     props.setProperty("hoodie.datasource.write.partitionpath.field", 
"not_there");
     props.setProperty("hoodie.deltastreamer.source.dfs.root", 
JSON_KAFKA_SOURCE_ROOT);
-    props.setProperty("hoodie.deltastreamer.source.kafka.topic",topicName);
+    props.setProperty("hoodie.deltastreamer.source.kafka.topic", topicName);
+    props.setProperty("hoodie.deltastreamer.source.kafka.checkpoint.type", 
kafkaCheckpointType);
     
props.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", 
dfsBasePath + "/source_uber.avsc");
     
props.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file", 
dfsBasePath + "/target_uber.avsc");
-    props.setProperty(Config.KAFKA_AUTO_OFFSET_RESET, autoResetValue);
+    props.setProperty("auto.offset.reset", autoResetValue);
 
     UtilitiesTestBase.Helpers.savePropsToDFS(props, dfs, dfsBasePath + "/" + propsFileName);
   }
@@ -1440,7 +1443,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
     HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(
         TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT, ParquetDFSSource.class.getName(),
             Collections.EMPTY_LIST, PROPS_FILENAME_TEST_PARQUET, false,
-            false, 100000, false, null, null, "timestamp"), jsc);
+            false, 100000, false, null, null, "timestamp", null), jsc);
     deltaStreamer.sync();
     TestHelpers.assertRecordCount(parquetRecords, tableBasePath + "/*/*.parquet", sqlContext);
     deltaStreamer.shutdownGracefully();
@@ -1453,7 +1456,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
     deltaStreamer = new HoodieDeltaStreamer(
         TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT, JsonKafkaSource.class.getName(),
             Collections.EMPTY_LIST, PROPS_FILENAME_TEST_JSON_KAFKA, false,
-            true, 100000, false, null, null, "timestamp"), jsc);
+            true, 100000, false, null, null, "timestamp", null), jsc);
     deltaStreamer.sync();
     // if auto reset value is set to LATEST, this all kafka records so far may not be synced.
     int totalExpectedRecords = parquetRecords + ((autoResetToLatest) ? 0 : JSON_KAFKA_NUM_RECORDS);
@@ -1471,12 +1474,12 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
   public void testJsonKafkaDFSSource() throws Exception {
     topicName = "topic" + testNum;
     prepareJsonKafkaDFSFiles(JSON_KAFKA_NUM_RECORDS, true, topicName);
-    prepareJsonKafkaDFSSource(PROPS_FILENAME_TEST_JSON_KAFKA, "earliest",topicName);
+    prepareJsonKafkaDFSSource(PROPS_FILENAME_TEST_JSON_KAFKA, "earliest", topicName);
     String tableBasePath = dfsBasePath + "/test_json_kafka_table" + testNum;
     HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(
         TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT, JsonKafkaSource.class.getName(),
             Collections.EMPTY_LIST, PROPS_FILENAME_TEST_JSON_KAFKA, false,
-            true, 100000, false, null, null, "timestamp"), jsc);
+            true, 100000, false, null, null, "timestamp", null), jsc);
     deltaStreamer.sync();
     TestHelpers.assertRecordCount(JSON_KAFKA_NUM_RECORDS, tableBasePath + "/*/*.parquet", sqlContext);
 
@@ -1489,6 +1492,31 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
   }
 
   @Test
+  public void testKafkaTimestampType() throws Exception {
+    topicName = "topic" + testNum;
+    kafkaCheckpointType = "timestamp";
+    prepareJsonKafkaDFSFiles(JSON_KAFKA_NUM_RECORDS, true, topicName);
+    prepareJsonKafkaDFSSource(PROPS_FILENAME_TEST_JSON_KAFKA, "earliest", 
topicName);
+    String tableBasePath = dfsBasePath + "/test_json_kafka_table" + testNum;
+    HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(
+            TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT, JsonKafkaSource.class.getName(),
+                    Collections.EMPTY_LIST, PROPS_FILENAME_TEST_JSON_KAFKA, false,
+                    true, 100000, false, null,
+                    null, "timestamp", 
String.valueOf(System.currentTimeMillis())), jsc);
+    deltaStreamer.sync();
+    TestHelpers.assertRecordCount(JSON_KAFKA_NUM_RECORDS, tableBasePath + "/*/*.parquet", sqlContext);
+
+    prepareJsonKafkaDFSFiles(JSON_KAFKA_NUM_RECORDS, false, topicName);
+    deltaStreamer = new HoodieDeltaStreamer(
+            TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT, JsonKafkaSource.class.getName(),
+                    Collections.EMPTY_LIST, PROPS_FILENAME_TEST_JSON_KAFKA, false,
+                    true, 100000, false, null, null,
+                    "timestamp", String.valueOf(System.currentTimeMillis())), 
jsc);
+    deltaStreamer.sync();
+    TestHelpers.assertRecordCount(JSON_KAFKA_NUM_RECORDS * 2, tableBasePath + "/*/*.parquet", sqlContext);
+  }
+
+  @Test
  public void testParquetSourceToKafkaSourceEarliestAutoResetValue() throws Exception {
     testDeltaStreamerTransitionFromParquetToKafkaSource(false);
   }
@@ -1566,7 +1594,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
         new HoodieDeltaStreamer(TestHelpers.makeConfig(
             tableBasePath, WriteOperationType.INSERT, CsvDFSSource.class.getName(),
             transformerClassNames, PROPS_FILENAME_TEST_CSV, false,
-            useSchemaProvider, 1000, false, null, null, sourceOrderingField), jsc);
+            useSchemaProvider, 1000, false, null, null, sourceOrderingField, null), jsc);
     deltaStreamer.sync();
     TestHelpers.assertRecordCount(CSV_NUM_RECORDS, tableBasePath + "/*/*.parquet", sqlContext);
     testNum++;
@@ -1679,7 +1707,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
       String tableBasePath = dfsBasePath + "/triprec";
       HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT, JdbcSource.class.getName(),
           null, "test-jdbc-source.properties", false,
-          false, sourceLimit, false, null, null, "timestamp");
+          false, sourceLimit, false, null, null, "timestamp", null);
       cfg.continuousMode = true;
       // Add 1000 records
       JdbcTestUtils.clearAndInsert("000", numRecords, connection, new 
HoodieTestDataGenerator(), props);
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestKafkaSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestKafkaSource.java
index a1a00fa..aa25446 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestKafkaSource.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestKafkaSource.java
@@ -94,11 +94,11 @@ public class TestKafkaSource extends UtilitiesTestBase {
     TypedProperties props = new TypedProperties();
     props.setProperty("hoodie.deltastreamer.source.kafka.topic", 
TEST_TOPIC_NAME);
     props.setProperty("bootstrap.servers", testUtils.brokerAddress());
-    props.setProperty(Config.KAFKA_AUTO_OFFSET_RESET, resetStrategy);
+    props.setProperty("auto.offset.reset", resetStrategy);
     props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
     props.setProperty("hoodie.deltastreamer.kafka.source.maxEvents",
         maxEventsToReadFromKafkaSource != null ? String.valueOf(maxEventsToReadFromKafkaSource) :
-            String.valueOf(Config.maxEventsFromKafkaSource));
+            String.valueOf(Config.MAX_EVENTS_FROM_KAFKA_SOURCE_PROP.defaultValue()));
     props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
     return props;
   }
@@ -193,7 +193,6 @@ public class TestKafkaSource extends UtilitiesTestBase {
 
     Source jsonSource = new JsonKafkaSource(props, jsc, sparkSession, schemaProvider, metrics);
     SourceFormatAdapter kafkaSource = new SourceFormatAdapter(jsonSource);
-    Config.maxEventsFromKafkaSource = 500;
 
     /*
     1. Extract without any checkpoint => get all the data, respecting default upper cap since both sourceLimit and
@@ -208,9 +207,6 @@ public class TestKafkaSource extends UtilitiesTestBase {
     InputBatch<Dataset<Row>> fetch2 =
         kafkaSource.fetchNewDataInRowFormat(Option.of(fetch1.getCheckpointForNextBatch()), 1500);
     assertEquals(1000, fetch2.getBatch().get().count());
-
-    //reset the value back since it is a static variable
-    Config.maxEventsFromKafkaSource = Config.DEFAULT_MAX_EVENTS_FROM_KAFKA_SOURCE;
   }
 
   @Test
@@ -222,7 +218,7 @@ public class TestKafkaSource extends UtilitiesTestBase {
 
     Source jsonSource = new JsonKafkaSource(props, jsc, sparkSession, schemaProvider, metrics);
     SourceFormatAdapter kafkaSource = new SourceFormatAdapter(jsonSource);
-    Config.maxEventsFromKafkaSource = 500;
+    props.setProperty("hoodie.deltastreamer.kafka.source.maxEvents", "500");
 
     /*
      1. maxEventsFromKafkaSourceProp set to more than generated insert records
@@ -240,9 +236,6 @@ public class TestKafkaSource extends UtilitiesTestBase {
     InputBatch<Dataset<Row>> fetch2 =
             kafkaSource.fetchNewDataInRowFormat(Option.of(fetch1.getCheckpointForNextBatch()), 300);
     assertEquals(300, fetch2.getBatch().get().count());
-
-    //reset the value back since it is a static variable
-    Config.maxEventsFromKafkaSource = Config.DEFAULT_MAX_EVENTS_FROM_KAFKA_SOURCE;
   }
 
   @Test
@@ -300,7 +293,7 @@ public class TestKafkaSource extends UtilitiesTestBase {
 
     HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
     TypedProperties props = createPropsForJsonSource(null, "earliest");
-    props.put(ENABLE_KAFKA_COMMIT_OFFSET, "true");
+    props.put(ENABLE_KAFKA_COMMIT_OFFSET.key(), "true");
     Source jsonSource = new JsonKafkaSource(props, jsc, sparkSession, schemaProvider, metrics);
     SourceFormatAdapter kafkaSource = new SourceFormatAdapter(jsonSource);
 
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestKafkaOffsetGen.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestKafkaOffsetGen.java
index ccc141b..eff9b24 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestKafkaOffsetGen.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestKafkaOffsetGen.java
@@ -23,8 +23,8 @@ import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.exception.HoodieNotSupportedException;
 import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamerMetrics;
-import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.Config;
 import org.apache.hudi.utilities.testutils.UtilitiesTestBase.Helpers;
+
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.serialization.StringDeserializer;
@@ -62,9 +62,10 @@ public class TestKafkaOffsetGen {
     testUtils.teardown();
   }
 
-  private TypedProperties getConsumerConfigs(String autoOffsetReset) {
+  private TypedProperties getConsumerConfigs(String autoOffsetReset, String kafkaCheckpointType) {
     TypedProperties props = new TypedProperties();
-    props.put(Config.KAFKA_AUTO_OFFSET_RESET, autoOffsetReset);
+    props.put("hoodie.deltastreamer.source.kafka.checkpoint.type", 
kafkaCheckpointType);
+    props.put("auto.offset.reset", autoOffsetReset);
     props.put("hoodie.deltastreamer.source.kafka.topic", TEST_TOPIC_NAME);
     props.setProperty("bootstrap.servers", testUtils.brokerAddress());
     props.setProperty("key.deserializer", StringDeserializer.class.getName());
@@ -79,7 +80,7 @@ public class TestKafkaOffsetGen {
     testUtils.createTopic(TEST_TOPIC_NAME, 1);
     testUtils.sendMessages(TEST_TOPIC_NAME, Helpers.jsonifyRecords(dataGenerator.generateInserts("000", 1000)));
 
-    KafkaOffsetGen kafkaOffsetGen = new KafkaOffsetGen(getConsumerConfigs("earliest"));
+    KafkaOffsetGen kafkaOffsetGen = new KafkaOffsetGen(getConsumerConfigs("earliest", "string"));
     OffsetRange[] nextOffsetRanges = kafkaOffsetGen.getNextOffsetRanges(Option.empty(), 500, metrics);
     assertEquals(1, nextOffsetRanges.length);
     assertEquals(0, nextOffsetRanges[0].fromOffset());
@@ -96,7 +97,7 @@ public class TestKafkaOffsetGen {
     HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
     testUtils.createTopic(TEST_TOPIC_NAME, 1);
     testUtils.sendMessages(TEST_TOPIC_NAME, Helpers.jsonifyRecords(dataGenerator.generateInserts("000", 1000)));
-    KafkaOffsetGen kafkaOffsetGen = new KafkaOffsetGen(getConsumerConfigs("latest"));
+    KafkaOffsetGen kafkaOffsetGen = new KafkaOffsetGen(getConsumerConfigs("latest", "string"));
     OffsetRange[] nextOffsetRanges = kafkaOffsetGen.getNextOffsetRanges(Option.empty(), 500, metrics);
     assertEquals(1, nextOffsetRanges.length);
     assertEquals(1000, nextOffsetRanges[0].fromOffset());
@@ -109,7 +110,7 @@ public class TestKafkaOffsetGen {
     HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
     testUtils.createTopic(TEST_TOPIC_NAME, 1);
     testUtils.sendMessages(TEST_TOPIC_NAME, Helpers.jsonifyRecords(dataGenerator.generateInserts("000", 1000)));
-    KafkaOffsetGen kafkaOffsetGen = new KafkaOffsetGen(getConsumerConfigs("latest"));
+    KafkaOffsetGen kafkaOffsetGen = new KafkaOffsetGen(getConsumerConfigs("latest", "string"));
 
     OffsetRange[] nextOffsetRanges = kafkaOffsetGen.getNextOffsetRanges(Option.of(lastCheckpointString), 500, metrics);
     assertEquals(1, nextOffsetRanges.length);
@@ -118,11 +119,25 @@ public class TestKafkaOffsetGen {
   }
 
   @Test
+  public void testGetNextOffsetRangesFromTimestampCheckpointType() {
+    HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
+    testUtils.createTopic(TEST_TOPIC_NAME, 1);
+    testUtils.sendMessages(TEST_TOPIC_NAME, Helpers.jsonifyRecords(dataGenerator.generateInserts("000", 1000)));
+
+    KafkaOffsetGen kafkaOffsetGen = new KafkaOffsetGen(getConsumerConfigs("latest", "timestamp"));
+
+    OffsetRange[] nextOffsetRanges = kafkaOffsetGen.getNextOffsetRanges(Option.of(String.valueOf(System.currentTimeMillis() - 100000)), 500, metrics);
+    assertEquals(1, nextOffsetRanges.length);
+    assertEquals(0, nextOffsetRanges[0].fromOffset());
+    assertEquals(500, nextOffsetRanges[0].untilOffset());
+  }
+
+  @Test
   public void testGetNextOffsetRangesFromMultiplePartitions() {
     HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
     testUtils.createTopic(TEST_TOPIC_NAME, 2);
     testUtils.sendMessages(TEST_TOPIC_NAME, Helpers.jsonifyRecords(dataGenerator.generateInserts("000", 1000)));
-    KafkaOffsetGen kafkaOffsetGen = new KafkaOffsetGen(getConsumerConfigs("earliest"));
+    KafkaOffsetGen kafkaOffsetGen = new KafkaOffsetGen(getConsumerConfigs("earliest", "string"));
     OffsetRange[] nextOffsetRanges = kafkaOffsetGen.getNextOffsetRanges(Option.empty(), 499, metrics);
     assertEquals(2, nextOffsetRanges.length);
     assertEquals(0, nextOffsetRanges[0].fromOffset());
@@ -136,7 +151,7 @@ public class TestKafkaOffsetGen {
     HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
     testUtils.createTopic(TEST_TOPIC_NAME, 2);
     testUtils.sendMessages(TEST_TOPIC_NAME, Helpers.jsonifyRecords(dataGenerator.generateInserts("000", 1000)));
-    KafkaOffsetGen kafkaOffsetGen = new KafkaOffsetGen(getConsumerConfigs("group"));
+    KafkaOffsetGen kafkaOffsetGen = new KafkaOffsetGen(getConsumerConfigs("group", "string"));
     String lastCheckpointString = TEST_TOPIC_NAME + ",0:250,1:249";
     kafkaOffsetGen.commitOffsetToKafka(lastCheckpointString);
     // don't pass lastCheckpointString as we want to read from group committed offset
@@ -147,7 +162,7 @@ public class TestKafkaOffsetGen {
     assertEquals(399, nextOffsetRanges[1].untilOffset());
 
     // committed offsets are not present for the consumer group
-    kafkaOffsetGen = new KafkaOffsetGen(getConsumerConfigs("group"));
+    kafkaOffsetGen = new KafkaOffsetGen(getConsumerConfigs("group", "string"));
     nextOffsetRanges = kafkaOffsetGen.getNextOffsetRanges(Option.empty(), 300, metrics);
     assertEquals(500, nextOffsetRanges[0].fromOffset());
     assertEquals(500, nextOffsetRanges[0].untilOffset());
@@ -157,7 +172,7 @@ public class TestKafkaOffsetGen {
 
   @Test
   public void testCheckTopicExists() {
-    TypedProperties props = getConsumerConfigs("latest");
+    TypedProperties props = getConsumerConfigs("latest", "string");
     KafkaOffsetGen kafkaOffsetGen = new KafkaOffsetGen(props);
     testUtils.createTopic(TEST_TOPIC_NAME, 1);
     boolean topicExists = kafkaOffsetGen.checkTopicExists(new KafkaConsumer(props));
