This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
     new af837d2  [HUDI-1447] DeltaStreamer kafka source supports consuming from specified timestamp (#2438)
af837d2 is described below

commit af837d2f1825d14ae8403b2290cf5eab39780343
Author: liujinhui <965147...@qq.com>
AuthorDate: Sat Jul 17 12:31:06 2021 +0800

    [HUDI-1447] DeltaStreamer kafka source supports consuming from specified timestamp (#2438)
---
 .../hudi/utilities/deltastreamer/DeltaSync.java    |  15 ++-
 .../hudi/utilities/sources/AvroKafkaSource.java    |   5 +-
 .../hudi/utilities/sources/JsonKafkaSource.java    |   5 +-
 .../utilities/sources/helpers/KafkaOffsetGen.java  | 103 ++++++++++++++++++---
 .../functional/TestHoodieDeltaStreamer.java        |  56 ++++++++---
 .../hudi/utilities/sources/TestKafkaSource.java    |  15 +--
 .../sources/helpers/TestKafkaOffsetGen.java        |  35 +++++--
 7 files changed, 172 insertions(+), 62 deletions(-)

diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
index 7742e8e..9d445dc 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
@@ -38,6 +38,7 @@ import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ReflectionUtils;
+import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieClusteringConfig;
@@ -59,6 +60,7 @@ import org.apache.hudi.utilities.schema.DelegatingSchemaProvider;
 import org.apache.hudi.utilities.schema.SchemaProvider;
 import org.apache.hudi.utilities.schema.SchemaSet;
 import org.apache.hudi.utilities.sources.InputBatch;
+import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen;
 import org.apache.hudi.utilities.transform.Transformer;

 import com.codahale.metrics.Timer;
@@ -318,13 +320,12 @@ public class DeltaSync implements Serializable {
       if (lastCommit.isPresent()) {
         HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
             .fromBytes(commitTimelineOpt.get().getInstantDetails(lastCommit.get()).get(), HoodieCommitMetadata.class);
-        if (cfg.checkpoint != null && !cfg.checkpoint.equals(commitMetadata.getMetadata(CHECKPOINT_RESET_KEY))) {
+        if (cfg.checkpoint != null && (StringUtils.isNullOrEmpty(commitMetadata.getMetadata(CHECKPOINT_RESET_KEY))
+            || !cfg.checkpoint.equals(commitMetadata.getMetadata(CHECKPOINT_RESET_KEY)))) {
           resumeCheckpointStr = Option.of(cfg.checkpoint);
-        } else if (commitMetadata.getMetadata(CHECKPOINT_KEY) != null) {
+        } else if (!StringUtils.isNullOrEmpty(commitMetadata.getMetadata(CHECKPOINT_KEY))) {
           //if previous checkpoint is an empty string, skip resume use Option.empty()
-          if (!commitMetadata.getMetadata(CHECKPOINT_KEY).isEmpty()) {
-            resumeCheckpointStr = Option.of(commitMetadata.getMetadata(CHECKPOINT_KEY));
-          }
+          resumeCheckpointStr = Option.of(commitMetadata.getMetadata(CHECKPOINT_KEY));
         } else if (commitMetadata.getOperationType() == WriteOperationType.CLUSTER) {
           // incase of CLUSTER commit, no checkpoint will be available in metadata.
resumeCheckpointStr = Option.empty(); @@ -336,6 +337,10 @@ public class DeltaSync implements Serializable { + commitTimelineOpt.get().getInstants().collect(Collectors.toList()) + ", CommitMetadata=" + commitMetadata.toJsonString()); } + // KAFKA_CHECKPOINT_TYPE will be honored only for first batch. + if (!StringUtils.isNullOrEmpty(commitMetadata.getMetadata(CHECKPOINT_RESET_KEY))) { + props.remove(KafkaOffsetGen.Config.KAFKA_CHECKPOINT_TYPE.key()); + } } } else { String partitionColumns = HoodieWriterUtils.getPartitionColumns(keyGenerator); diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java index 4cea13d..500c412 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java @@ -40,9 +40,6 @@ import org.apache.spark.streaming.kafka010.KafkaUtils; import org.apache.spark.streaming.kafka010.LocationStrategies; import org.apache.spark.streaming.kafka010.OffsetRange; -import static org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.Config.ENABLE_KAFKA_COMMIT_OFFSET; -import static org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.Config.DEFAULT_ENABLE_KAFKA_COMMIT_OFFSET; - /** * Reads avro serialized Kafka data, based on the confluent schema-registry. */ @@ -104,7 +101,7 @@ public class AvroKafkaSource extends AvroSource { @Override public void onCommit(String lastCkptStr) { - if (this.props.getBoolean(ENABLE_KAFKA_COMMIT_OFFSET, DEFAULT_ENABLE_KAFKA_COMMIT_OFFSET)) { + if (this.props.getBoolean(KafkaOffsetGen.Config.ENABLE_KAFKA_COMMIT_OFFSET.key(), KafkaOffsetGen.Config.ENABLE_KAFKA_COMMIT_OFFSET.defaultValue())) { offsetGen.commitOffsetToKafka(lastCkptStr); } } diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java index c1e2e3d..cf9e905 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java @@ -35,9 +35,6 @@ import org.apache.spark.streaming.kafka010.KafkaUtils; import org.apache.spark.streaming.kafka010.LocationStrategies; import org.apache.spark.streaming.kafka010.OffsetRange; -import static org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.Config.DEFAULT_ENABLE_KAFKA_COMMIT_OFFSET; -import static org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.Config.ENABLE_KAFKA_COMMIT_OFFSET; - /** * Read json kafka data. 
*/ @@ -77,7 +74,7 @@ public class JsonKafkaSource extends JsonSource { @Override public void onCommit(String lastCkptStr) { - if (this.props.getBoolean(ENABLE_KAFKA_COMMIT_OFFSET, DEFAULT_ENABLE_KAFKA_COMMIT_OFFSET)) { + if (this.props.getBoolean(KafkaOffsetGen.Config.ENABLE_KAFKA_COMMIT_OFFSET.key(), KafkaOffsetGen.Config.ENABLE_KAFKA_COMMIT_OFFSET.defaultValue())) { offsetGen.commitOffsetToKafka(lastCkptStr); } } diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java index 4378cb1..a7b983a 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java @@ -19,6 +19,7 @@ package org.apache.hudi.utilities.sources.helpers; import org.apache.hudi.DataSourceUtils; +import org.apache.hudi.common.config.ConfigProperty; import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.util.Option; import org.apache.hudi.exception.HoodieDeltaStreamerException; @@ -30,6 +31,7 @@ import org.apache.hudi.utilities.sources.AvroKafkaSource; import org.apache.kafka.clients.consumer.CommitFailedException; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.consumer.OffsetAndTimestamp; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; @@ -46,6 +48,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.function.Function; import java.util.regex.Matcher; import java.util.regex.Pattern; import java.util.stream.Collectors; @@ -160,28 +163,48 @@ public class KafkaOffsetGen { */ public static class Config { - private static final String KAFKA_TOPIC_NAME = "hoodie.deltastreamer.source.kafka.topic"; - private static final String MAX_EVENTS_FROM_KAFKA_SOURCE_PROP = "hoodie.deltastreamer.kafka.source.maxEvents"; - public static final String ENABLE_KAFKA_COMMIT_OFFSET = "hoodie.deltastreamer.source.kafka.enable.commit.offset"; - public static final Boolean DEFAULT_ENABLE_KAFKA_COMMIT_OFFSET = false; + private static final ConfigProperty<String> KAFKA_TOPIC_NAME = ConfigProperty + .key("hoodie.deltastreamer.source.kafka.topic") + .noDefaultValue() + .withDocumentation("Kafka topic name."); + + public static final ConfigProperty<String> KAFKA_CHECKPOINT_TYPE = ConfigProperty + .key("hoodie.deltastreamer.source.kafka.checkpoint.type") + .defaultValue("string") + .withDocumentation("Kafka chepoint type."); + + public static final ConfigProperty<Boolean> ENABLE_KAFKA_COMMIT_OFFSET = ConfigProperty + .key("hoodie.deltastreamer.source.kafka.enable.commit.offset") + .defaultValue(false) + .withDocumentation("Automatically submits offset to kafka."); + + public static final ConfigProperty<Long> MAX_EVENTS_FROM_KAFKA_SOURCE_PROP = ConfigProperty + .key("hoodie.deltastreamer.kafka.source.maxEvents") + .defaultValue(5000000L) + .withDocumentation("Maximum number of records obtained in each batch."); + // "auto.offset.reset" is kafka native config param. Do not change the config param name. 
- public static final String KAFKA_AUTO_OFFSET_RESET = "auto.offset.reset"; - private static final KafkaResetOffsetStrategies DEFAULT_KAFKA_AUTO_OFFSET_RESET = KafkaResetOffsetStrategies.LATEST; - public static final long DEFAULT_MAX_EVENTS_FROM_KAFKA_SOURCE = 5000000; - public static long maxEventsFromKafkaSource = DEFAULT_MAX_EVENTS_FROM_KAFKA_SOURCE; + private static final ConfigProperty<KafkaResetOffsetStrategies> KAFKA_AUTO_OFFSET_RESET = ConfigProperty + .key("auto.offset.reset") + .defaultValue(KafkaResetOffsetStrategies.LATEST) + .withDocumentation("Kafka consumer strategy for reading data."); + + public static final String KAFKA_CHECKPOINT_TYPE_TIMESTAMP = "timestamp"; } private final Map<String, Object> kafkaParams; private final TypedProperties props; protected final String topicName; private KafkaResetOffsetStrategies autoResetValue; + private final String kafkaCheckpointType; public KafkaOffsetGen(TypedProperties props) { this.props = props; kafkaParams = excludeHoodieConfigs(props); - DataSourceUtils.checkRequiredProperties(props, Collections.singletonList(Config.KAFKA_TOPIC_NAME)); - topicName = props.getString(Config.KAFKA_TOPIC_NAME); - String kafkaAutoResetOffsetsStr = props.getString(Config.KAFKA_AUTO_OFFSET_RESET, Config.DEFAULT_KAFKA_AUTO_OFFSET_RESET.name().toLowerCase()); + DataSourceUtils.checkRequiredProperties(props, Collections.singletonList(Config.KAFKA_TOPIC_NAME.key())); + topicName = props.getString(Config.KAFKA_TOPIC_NAME.key()); + kafkaCheckpointType = props.getString(Config.KAFKA_CHECKPOINT_TYPE.key(), Config.KAFKA_CHECKPOINT_TYPE.defaultValue()); + String kafkaAutoResetOffsetsStr = props.getString(Config.KAFKA_AUTO_OFFSET_RESET.key(), Config.KAFKA_AUTO_OFFSET_RESET.defaultValue().name().toLowerCase()); boolean found = false; for (KafkaResetOffsetStrategies entry: KafkaResetOffsetStrategies.values()) { if (entry.name().toLowerCase().equals(kafkaAutoResetOffsetsStr)) { @@ -194,7 +217,7 @@ public class KafkaOffsetGen { throw new HoodieDeltaStreamerException(Config.KAFKA_AUTO_OFFSET_RESET + " config set to unknown value " + kafkaAutoResetOffsetsStr); } if (autoResetValue.equals(KafkaResetOffsetStrategies.GROUP)) { - this.kafkaParams.put(Config.KAFKA_AUTO_OFFSET_RESET, Config.DEFAULT_KAFKA_AUTO_OFFSET_RESET.name().toLowerCase()); + this.kafkaParams.put(Config.KAFKA_AUTO_OFFSET_RESET.key(), Config.KAFKA_AUTO_OFFSET_RESET.defaultValue().name().toLowerCase()); } } @@ -212,6 +235,9 @@ public class KafkaOffsetGen { Set<TopicPartition> topicPartitions = partitionInfoList.stream() .map(x -> new TopicPartition(x.topic(), x.partition())).collect(Collectors.toSet()); + if (Config.KAFKA_CHECKPOINT_TYPE_TIMESTAMP.equals(kafkaCheckpointType) && isValidTimestampCheckpointType(lastCheckpointStr)) { + lastCheckpointStr = getOffsetsByTimestamp(consumer, partitionInfoList, topicPartitions, topicName, Long.parseLong(lastCheckpointStr.get())); + } // Determine the offset ranges to read from if (lastCheckpointStr.isPresent() && !lastCheckpointStr.get().isEmpty() && checkTopicCheckpoint(lastCheckpointStr)) { fromOffsets = fetchValidOffsets(consumer, lastCheckpointStr, topicPartitions); @@ -237,8 +263,8 @@ public class KafkaOffsetGen { } // Come up with final set of OffsetRanges to read (account for new partitions, limit number of events) - long maxEventsToReadFromKafka = props.getLong(Config.MAX_EVENTS_FROM_KAFKA_SOURCE_PROP, - Config.maxEventsFromKafkaSource); + long maxEventsToReadFromKafka = props.getLong(Config.MAX_EVENTS_FROM_KAFKA_SOURCE_PROP.key(), + 
Config.MAX_EVENTS_FROM_KAFKA_SOURCE_PROP.defaultValue()); long numEvents; if (sourceLimit == Long.MAX_VALUE) { @@ -271,6 +297,20 @@ public class KafkaOffsetGen { return checkpointOffsetReseter ? earliestOffsets : checkpointOffsets; } + /** + * Check if the checkpoint is a timestamp. + * @param lastCheckpointStr + * @return + */ + private Boolean isValidTimestampCheckpointType(Option<String> lastCheckpointStr) { + if (!lastCheckpointStr.isPresent()) { + return false; + } + Pattern pattern = Pattern.compile("[-+]?[0-9]+(\\.[0-9]+)?"); + Matcher isNum = pattern.matcher(lastCheckpointStr.get()); + return isNum.matches() && (lastCheckpointStr.get().length() == 13 || lastCheckpointStr.get().length() == 10); + } + private Long delayOffsetCalculation(Option<String> lastCheckpointStr, Set<TopicPartition> topicPartitions, KafkaConsumer consumer) { Long delayCount = 0L; Map<TopicPartition, Long> checkpointOffsets = CheckpointUtils.strToOffsets(lastCheckpointStr.get()); @@ -284,6 +324,41 @@ public class KafkaOffsetGen { } /** + * Get the checkpoint by timestamp. + * This method returns the checkpoint format based on the timestamp. + * example: + * 1. input: timestamp, etc. + * 2. output: topicName,partition_num_0:100,partition_num_1:101,partition_num_2:102. + * + * @param consumer + * @param topicName + * @param timestamp + * @return + */ + private Option<String> getOffsetsByTimestamp(KafkaConsumer consumer, List<PartitionInfo> partitionInfoList, Set<TopicPartition> topicPartitions, + String topicName, Long timestamp) { + + Map<TopicPartition, Long> topicPartitionsTimestamp = partitionInfoList.stream() + .map(x -> new TopicPartition(x.topic(), x.partition())) + .collect(Collectors.toMap(Function.identity(), x -> timestamp)); + + Map<TopicPartition, Long> earliestOffsets = consumer.beginningOffsets(topicPartitions); + Map<TopicPartition, OffsetAndTimestamp> offsetAndTimestamp = consumer.offsetsForTimes(topicPartitionsTimestamp); + + StringBuilder sb = new StringBuilder(); + sb.append(topicName + ","); + for (Map.Entry<TopicPartition, OffsetAndTimestamp> map : offsetAndTimestamp.entrySet()) { + if (map.getValue() != null) { + sb.append(map.getKey().partition()).append(":").append(map.getValue().offset()).append(","); + } else { + sb.append(map.getKey().partition()).append(":").append(earliestOffsets.get(map.getKey())).append(","); + } + } + return Option.of(sb.deleteCharAt(sb.length() - 1).toString()); + } + + + /** * Check if topic exists. 
* @param consumer kafka consumer * @return diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java index 642c666..db0ab19 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java @@ -58,7 +58,6 @@ import org.apache.hudi.utilities.sources.JdbcSource; import org.apache.hudi.utilities.sources.JsonKafkaSource; import org.apache.hudi.utilities.sources.ParquetDFSSource; import org.apache.hudi.utilities.sources.TestDataSource; -import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.Config; import org.apache.hudi.utilities.testutils.JdbcTestUtils; import org.apache.hudi.utilities.testutils.UtilitiesTestBase; import org.apache.hudi.utilities.testutils.sources.DistributedTestDataSource; @@ -139,6 +138,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase { private static final int PARQUET_NUM_RECORDS = 5; private static final int CSV_NUM_RECORDS = 3; private static final int JSON_KAFKA_NUM_RECORDS = 5; + private String kafkaCheckpointType = "string"; // Required fields private static final String TGT_BASE_PATH_PARAM = "--target-base-path"; private static final String TGT_BASE_PATH_VALUE = "s3://mybucket/blah"; @@ -274,7 +274,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase { protected static void populateCommonKafkaProps(TypedProperties props) { //Kafka source properties props.setProperty("bootstrap.servers", testUtils.brokerAddress()); - props.setProperty(Config.KAFKA_AUTO_OFFSET_RESET, "earliest"); + props.setProperty("auto.offset.reset", "earliest"); props.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.setProperty("hoodie.deltastreamer.kafka.source.maxEvents", String.valueOf(5000)); @@ -360,12 +360,13 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase { String propsFilename, boolean enableHiveSync, boolean useSchemaProviderClass, boolean updatePayloadClass, String payloadClassName, String tableType) { return makeConfig(basePath, op, TestDataSource.class.getName(), transformerClassNames, propsFilename, enableHiveSync, - useSchemaProviderClass, 1000, updatePayloadClass, payloadClassName, tableType, "timestamp"); + useSchemaProviderClass, 1000, updatePayloadClass, payloadClassName, tableType, "timestamp", null); } static HoodieDeltaStreamer.Config makeConfig(String basePath, WriteOperationType op, String sourceClassName, List<String> transformerClassNames, String propsFilename, boolean enableHiveSync, boolean useSchemaProviderClass, - int sourceLimit, boolean updatePayloadClass, String payloadClassName, String tableType, String sourceOrderingField) { + int sourceLimit, boolean updatePayloadClass, String payloadClassName, String tableType, String sourceOrderingField, + String checkpoint) { HoodieDeltaStreamer.Config cfg = new HoodieDeltaStreamer.Config(); cfg.targetBasePath = basePath; cfg.targetTableName = "hoodie_trips"; @@ -377,6 +378,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase { cfg.sourceOrderingField = sourceOrderingField; cfg.propsFilePath = dfsBasePath + "/" + propsFilename; cfg.sourceLimit = sourceLimit; + cfg.checkpoint = checkpoint; if (updatePayloadClass) { 
cfg.payloadClassName = payloadClassName; } @@ -601,7 +603,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase { Arguments.of(allConfig, conf) ); } - + @ParameterizedTest @MethodSource("provideValidCliArgs") public void testValidCommandLineArgs(String[] args, HoodieDeltaStreamer.Config expected) { @@ -1399,7 +1401,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase { HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer( TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT, ParquetDFSSource.class.getName(), transformerClassNames, PROPS_FILENAME_TEST_PARQUET, false, - useSchemaProvider, 100000, false, null, null, "timestamp"), jsc); + useSchemaProvider, 100000, false, null, null, "timestamp", null), jsc); deltaStreamer.sync(); TestHelpers.assertRecordCount(PARQUET_NUM_RECORDS, tableBasePath + "/*/*.parquet", sqlContext); testNum++; @@ -1414,10 +1416,11 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase { props.setProperty("hoodie.datasource.write.recordkey.field", "_row_key"); props.setProperty("hoodie.datasource.write.partitionpath.field", "not_there"); props.setProperty("hoodie.deltastreamer.source.dfs.root", JSON_KAFKA_SOURCE_ROOT); - props.setProperty("hoodie.deltastreamer.source.kafka.topic",topicName); + props.setProperty("hoodie.deltastreamer.source.kafka.topic", topicName); + props.setProperty("hoodie.deltastreamer.source.kafka.checkpoint.type", kafkaCheckpointType); props.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", dfsBasePath + "/source_uber.avsc"); props.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file", dfsBasePath + "/target_uber.avsc"); - props.setProperty(Config.KAFKA_AUTO_OFFSET_RESET, autoResetValue); + props.setProperty("auto.offset.reset", autoResetValue); UtilitiesTestBase.Helpers.savePropsToDFS(props, dfs, dfsBasePath + "/" + propsFileName); } @@ -1440,7 +1443,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase { HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer( TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT, ParquetDFSSource.class.getName(), Collections.EMPTY_LIST, PROPS_FILENAME_TEST_PARQUET, false, - false, 100000, false, null, null, "timestamp"), jsc); + false, 100000, false, null, null, "timestamp", null), jsc); deltaStreamer.sync(); TestHelpers.assertRecordCount(parquetRecords, tableBasePath + "/*/*.parquet", sqlContext); deltaStreamer.shutdownGracefully(); @@ -1453,7 +1456,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase { deltaStreamer = new HoodieDeltaStreamer( TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT, JsonKafkaSource.class.getName(), Collections.EMPTY_LIST, PROPS_FILENAME_TEST_JSON_KAFKA, false, - true, 100000, false, null, null, "timestamp"), jsc); + true, 100000, false, null, null, "timestamp", null), jsc); deltaStreamer.sync(); // if auto reset value is set to LATEST, this all kafka records so far may not be synced. int totalExpectedRecords = parquetRecords + ((autoResetToLatest) ? 
0 : JSON_KAFKA_NUM_RECORDS); @@ -1471,12 +1474,12 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase { public void testJsonKafkaDFSSource() throws Exception { topicName = "topic" + testNum; prepareJsonKafkaDFSFiles(JSON_KAFKA_NUM_RECORDS, true, topicName); - prepareJsonKafkaDFSSource(PROPS_FILENAME_TEST_JSON_KAFKA, "earliest",topicName); + prepareJsonKafkaDFSSource(PROPS_FILENAME_TEST_JSON_KAFKA, "earliest", topicName); String tableBasePath = dfsBasePath + "/test_json_kafka_table" + testNum; HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer( TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT, JsonKafkaSource.class.getName(), Collections.EMPTY_LIST, PROPS_FILENAME_TEST_JSON_KAFKA, false, - true, 100000, false, null, null, "timestamp"), jsc); + true, 100000, false, null, null, "timestamp", null), jsc); deltaStreamer.sync(); TestHelpers.assertRecordCount(JSON_KAFKA_NUM_RECORDS, tableBasePath + "/*/*.parquet", sqlContext); @@ -1489,6 +1492,31 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase { } @Test + public void testKafkaTimestampType() throws Exception { + topicName = "topic" + testNum; + kafkaCheckpointType = "timestamp"; + prepareJsonKafkaDFSFiles(JSON_KAFKA_NUM_RECORDS, true, topicName); + prepareJsonKafkaDFSSource(PROPS_FILENAME_TEST_JSON_KAFKA, "earliest", topicName); + String tableBasePath = dfsBasePath + "/test_json_kafka_table" + testNum; + HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer( + TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT, JsonKafkaSource.class.getName(), + Collections.EMPTY_LIST, PROPS_FILENAME_TEST_JSON_KAFKA, false, + true, 100000, false, null, + null, "timestamp", String.valueOf(System.currentTimeMillis())), jsc); + deltaStreamer.sync(); + TestHelpers.assertRecordCount(JSON_KAFKA_NUM_RECORDS, tableBasePath + "/*/*.parquet", sqlContext); + + prepareJsonKafkaDFSFiles(JSON_KAFKA_NUM_RECORDS, false, topicName); + deltaStreamer = new HoodieDeltaStreamer( + TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT, JsonKafkaSource.class.getName(), + Collections.EMPTY_LIST, PROPS_FILENAME_TEST_JSON_KAFKA, false, + true, 100000, false, null, null, + "timestamp", String.valueOf(System.currentTimeMillis())), jsc); + deltaStreamer.sync(); + TestHelpers.assertRecordCount(JSON_KAFKA_NUM_RECORDS * 2, tableBasePath + "/*/*.parquet", sqlContext); + } + + @Test public void testParquetSourceToKafkaSourceEarliestAutoResetValue() throws Exception { testDeltaStreamerTransitionFromParquetToKafkaSource(false); } @@ -1566,7 +1594,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase { new HoodieDeltaStreamer(TestHelpers.makeConfig( tableBasePath, WriteOperationType.INSERT, CsvDFSSource.class.getName(), transformerClassNames, PROPS_FILENAME_TEST_CSV, false, - useSchemaProvider, 1000, false, null, null, sourceOrderingField), jsc); + useSchemaProvider, 1000, false, null, null, sourceOrderingField, null), jsc); deltaStreamer.sync(); TestHelpers.assertRecordCount(CSV_NUM_RECORDS, tableBasePath + "/*/*.parquet", sqlContext); testNum++; @@ -1679,7 +1707,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase { String tableBasePath = dfsBasePath + "/triprec"; HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT, JdbcSource.class.getName(), null, "test-jdbc-source.properties", false, - false, sourceLimit, false, null, null, "timestamp"); + false, sourceLimit, false, null, null, "timestamp", null); cfg.continuousMode = true; // Add 1000 
records JdbcTestUtils.clearAndInsert("000", numRecords, connection, new HoodieTestDataGenerator(), props); diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestKafkaSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestKafkaSource.java index a1a00fa..aa25446 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestKafkaSource.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestKafkaSource.java @@ -94,11 +94,11 @@ public class TestKafkaSource extends UtilitiesTestBase { TypedProperties props = new TypedProperties(); props.setProperty("hoodie.deltastreamer.source.kafka.topic", TEST_TOPIC_NAME); props.setProperty("bootstrap.servers", testUtils.brokerAddress()); - props.setProperty(Config.KAFKA_AUTO_OFFSET_RESET, resetStrategy); + props.setProperty("auto.offset.reset", resetStrategy); props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); props.setProperty("hoodie.deltastreamer.kafka.source.maxEvents", maxEventsToReadFromKafkaSource != null ? String.valueOf(maxEventsToReadFromKafkaSource) : - String.valueOf(Config.maxEventsFromKafkaSource)); + String.valueOf(Config.MAX_EVENTS_FROM_KAFKA_SOURCE_PROP.defaultValue())); props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString()); return props; } @@ -193,7 +193,6 @@ public class TestKafkaSource extends UtilitiesTestBase { Source jsonSource = new JsonKafkaSource(props, jsc, sparkSession, schemaProvider, metrics); SourceFormatAdapter kafkaSource = new SourceFormatAdapter(jsonSource); - Config.maxEventsFromKafkaSource = 500; /* 1. Extract without any checkpoint => get all the data, respecting default upper cap since both sourceLimit and @@ -208,9 +207,6 @@ public class TestKafkaSource extends UtilitiesTestBase { InputBatch<Dataset<Row>> fetch2 = kafkaSource.fetchNewDataInRowFormat(Option.of(fetch1.getCheckpointForNextBatch()), 1500); assertEquals(1000, fetch2.getBatch().get().count()); - - //reset the value back since it is a static variable - Config.maxEventsFromKafkaSource = Config.DEFAULT_MAX_EVENTS_FROM_KAFKA_SOURCE; } @Test @@ -222,7 +218,7 @@ public class TestKafkaSource extends UtilitiesTestBase { Source jsonSource = new JsonKafkaSource(props, jsc, sparkSession, schemaProvider, metrics); SourceFormatAdapter kafkaSource = new SourceFormatAdapter(jsonSource); - Config.maxEventsFromKafkaSource = 500; + props.setProperty("hoodie.deltastreamer.kafka.source.maxEvents", "500"); /* 1. 
maxEventsFromKafkaSourceProp set to more than generated insert records @@ -240,9 +236,6 @@ public class TestKafkaSource extends UtilitiesTestBase { InputBatch<Dataset<Row>> fetch2 = kafkaSource.fetchNewDataInRowFormat(Option.of(fetch1.getCheckpointForNextBatch()), 300); assertEquals(300, fetch2.getBatch().get().count()); - - //reset the value back since it is a static variable - Config.maxEventsFromKafkaSource = Config.DEFAULT_MAX_EVENTS_FROM_KAFKA_SOURCE; } @Test @@ -300,7 +293,7 @@ public class TestKafkaSource extends UtilitiesTestBase { HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator(); TypedProperties props = createPropsForJsonSource(null, "earliest"); - props.put(ENABLE_KAFKA_COMMIT_OFFSET, "true"); + props.put(ENABLE_KAFKA_COMMIT_OFFSET.key(), "true"); Source jsonSource = new JsonKafkaSource(props, jsc, sparkSession, schemaProvider, metrics); SourceFormatAdapter kafkaSource = new SourceFormatAdapter(jsonSource); diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestKafkaOffsetGen.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestKafkaOffsetGen.java index ccc141b..eff9b24 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestKafkaOffsetGen.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestKafkaOffsetGen.java @@ -23,8 +23,8 @@ import org.apache.hudi.common.testutils.HoodieTestDataGenerator; import org.apache.hudi.common.util.Option; import org.apache.hudi.exception.HoodieNotSupportedException; import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamerMetrics; -import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.Config; import org.apache.hudi.utilities.testutils.UtilitiesTestBase.Helpers; + import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.serialization.StringDeserializer; @@ -62,9 +62,10 @@ public class TestKafkaOffsetGen { testUtils.teardown(); } - private TypedProperties getConsumerConfigs(String autoOffsetReset) { + private TypedProperties getConsumerConfigs(String autoOffsetReset, String kafkaCheckpointType) { TypedProperties props = new TypedProperties(); - props.put(Config.KAFKA_AUTO_OFFSET_RESET, autoOffsetReset); + props.put("hoodie.deltastreamer.source.kafka.checkpoint.type", kafkaCheckpointType); + props.put("auto.offset.reset", autoOffsetReset); props.put("hoodie.deltastreamer.source.kafka.topic", TEST_TOPIC_NAME); props.setProperty("bootstrap.servers", testUtils.brokerAddress()); props.setProperty("key.deserializer", StringDeserializer.class.getName()); @@ -79,7 +80,7 @@ public class TestKafkaOffsetGen { testUtils.createTopic(TEST_TOPIC_NAME, 1); testUtils.sendMessages(TEST_TOPIC_NAME, Helpers.jsonifyRecords(dataGenerator.generateInserts("000", 1000))); - KafkaOffsetGen kafkaOffsetGen = new KafkaOffsetGen(getConsumerConfigs("earliest")); + KafkaOffsetGen kafkaOffsetGen = new KafkaOffsetGen(getConsumerConfigs("earliest", "string")); OffsetRange[] nextOffsetRanges = kafkaOffsetGen.getNextOffsetRanges(Option.empty(), 500, metrics); assertEquals(1, nextOffsetRanges.length); assertEquals(0, nextOffsetRanges[0].fromOffset()); @@ -96,7 +97,7 @@ public class TestKafkaOffsetGen { HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator(); testUtils.createTopic(TEST_TOPIC_NAME, 1); testUtils.sendMessages(TEST_TOPIC_NAME, Helpers.jsonifyRecords(dataGenerator.generateInserts("000", 1000))); - KafkaOffsetGen 
kafkaOffsetGen = new KafkaOffsetGen(getConsumerConfigs("latest")); + KafkaOffsetGen kafkaOffsetGen = new KafkaOffsetGen(getConsumerConfigs("latest", "string")); OffsetRange[] nextOffsetRanges = kafkaOffsetGen.getNextOffsetRanges(Option.empty(), 500, metrics); assertEquals(1, nextOffsetRanges.length); assertEquals(1000, nextOffsetRanges[0].fromOffset()); @@ -109,7 +110,7 @@ public class TestKafkaOffsetGen { HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator(); testUtils.createTopic(TEST_TOPIC_NAME, 1); testUtils.sendMessages(TEST_TOPIC_NAME, Helpers.jsonifyRecords(dataGenerator.generateInserts("000", 1000))); - KafkaOffsetGen kafkaOffsetGen = new KafkaOffsetGen(getConsumerConfigs("latest")); + KafkaOffsetGen kafkaOffsetGen = new KafkaOffsetGen(getConsumerConfigs("latest", "string")); OffsetRange[] nextOffsetRanges = kafkaOffsetGen.getNextOffsetRanges(Option.of(lastCheckpointString), 500, metrics); assertEquals(1, nextOffsetRanges.length); @@ -118,11 +119,25 @@ public class TestKafkaOffsetGen { } @Test + public void testGetNextOffsetRangesFromTimestampCheckpointType() { + HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator(); + testUtils.createTopic(TEST_TOPIC_NAME, 1); + testUtils.sendMessages(TEST_TOPIC_NAME, Helpers.jsonifyRecords(dataGenerator.generateInserts("000", 1000))); + + KafkaOffsetGen kafkaOffsetGen = new KafkaOffsetGen(getConsumerConfigs("latest", "timestamp")); + + OffsetRange[] nextOffsetRanges = kafkaOffsetGen.getNextOffsetRanges(Option.of(String.valueOf(System.currentTimeMillis() - 100000)), 500, metrics); + assertEquals(1, nextOffsetRanges.length); + assertEquals(0, nextOffsetRanges[0].fromOffset()); + assertEquals(500, nextOffsetRanges[0].untilOffset()); + } + + @Test public void testGetNextOffsetRangesFromMultiplePartitions() { HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator(); testUtils.createTopic(TEST_TOPIC_NAME, 2); testUtils.sendMessages(TEST_TOPIC_NAME, Helpers.jsonifyRecords(dataGenerator.generateInserts("000", 1000))); - KafkaOffsetGen kafkaOffsetGen = new KafkaOffsetGen(getConsumerConfigs("earliest")); + KafkaOffsetGen kafkaOffsetGen = new KafkaOffsetGen(getConsumerConfigs("earliest", "string")); OffsetRange[] nextOffsetRanges = kafkaOffsetGen.getNextOffsetRanges(Option.empty(), 499, metrics); assertEquals(2, nextOffsetRanges.length); assertEquals(0, nextOffsetRanges[0].fromOffset()); @@ -136,7 +151,7 @@ public class TestKafkaOffsetGen { HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator(); testUtils.createTopic(TEST_TOPIC_NAME, 2); testUtils.sendMessages(TEST_TOPIC_NAME, Helpers.jsonifyRecords(dataGenerator.generateInserts("000", 1000))); - KafkaOffsetGen kafkaOffsetGen = new KafkaOffsetGen(getConsumerConfigs("group")); + KafkaOffsetGen kafkaOffsetGen = new KafkaOffsetGen(getConsumerConfigs("group", "string")); String lastCheckpointString = TEST_TOPIC_NAME + ",0:250,1:249"; kafkaOffsetGen.commitOffsetToKafka(lastCheckpointString); // don't pass lastCheckpointString as we want to read from group committed offset @@ -147,7 +162,7 @@ public class TestKafkaOffsetGen { assertEquals(399, nextOffsetRanges[1].untilOffset()); // committed offsets are not present for the consumer group - kafkaOffsetGen = new KafkaOffsetGen(getConsumerConfigs("group")); + kafkaOffsetGen = new KafkaOffsetGen(getConsumerConfigs("group", "string")); nextOffsetRanges = kafkaOffsetGen.getNextOffsetRanges(Option.empty(), 300, metrics); assertEquals(500, nextOffsetRanges[0].fromOffset()); assertEquals(500, 
nextOffsetRanges[0].untilOffset()); @@ -157,7 +172,7 @@ public class TestKafkaOffsetGen { @Test public void testCheckTopicExists() { - TypedProperties props = getConsumerConfigs("latest"); + TypedProperties props = getConsumerConfigs("latest", "string"); KafkaOffsetGen kafkaOffsetGen = new KafkaOffsetGen(props); testUtils.createTopic(TEST_TOPIC_NAME, 1); boolean topicExists = kafkaOffsetGen.checkTopicExists(new KafkaConsumer(props));
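
Editor's note on the new code path: KafkaOffsetGen#getOffsetsByTimestamp (added in this commit) resolves a millisecond timestamp to per-partition starting offsets with the Kafka consumer's offsetsForTimes API, falls back to beginningOffsets for partitions that have no record at or after that timestamp, and serializes the result into the usual "topic,partition:offset,..." checkpoint string. The standalone sketch below reproduces that lookup outside of Hudi as a rough illustration; the broker address, topic name, and group id are placeholder values, not anything taken from the commit.

import java.util.Map;
import java.util.Properties;
import java.util.function.Function;
import java.util.stream.Collectors;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

public class TimestampToOffsetsSketch {

  public static void main(String[] args) {
    // Placeholder consumer settings; any reachable Kafka cluster will do.
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("key.deserializer", StringDeserializer.class.getName());
    props.put("value.deserializer", StringDeserializer.class.getName());
    props.put("group.id", "timestamp-checkpoint-demo");

    String topicName = "demo_topic";                           // placeholder topic
    long timestamp = System.currentTimeMillis() - 3_600_000L;  // "start from one hour ago"

    try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
      // Ask every partition for the earliest offset whose record timestamp is >= the requested timestamp.
      Map<TopicPartition, Long> timestampsToSearch = consumer.partitionsFor(topicName).stream()
          .map(info -> new TopicPartition(info.topic(), info.partition()))
          .collect(Collectors.toMap(Function.identity(), tp -> timestamp));

      Map<TopicPartition, Long> earliestOffsets = consumer.beginningOffsets(timestampsToSearch.keySet());
      Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes = consumer.offsetsForTimes(timestampsToSearch);

      // Build the "topic,partition:offset,partition:offset" checkpoint string, falling back to the
      // beginning offset for partitions that have no record at or after the timestamp.
      StringBuilder checkpoint = new StringBuilder(topicName);
      offsetsForTimes.forEach((tp, offsetAndTimestamp) -> {
        long offset = offsetAndTimestamp != null ? offsetAndTimestamp.offset() : earliestOffsets.get(tp);
        checkpoint.append(",").append(tp.partition()).append(":").append(offset);
      });
      System.out.println(checkpoint);
    }
  }
}

To exercise the feature end to end (as the new testKafkaTimestampType test does), set hoodie.deltastreamer.source.kafka.checkpoint.type=timestamp in the source properties and supply an epoch-millisecond (13-digit) or epoch-second (10-digit) value as the DeltaStreamer checkpoint (cfg.checkpoint, i.e. the --checkpoint command-line option). Per the DeltaSync change above, the checkpoint-type property is dropped once a checkpoint reset key has been recorded, so the timestamp interpretation applies only to the first batch after a checkpoint is explicitly provided.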