[GitHub] [hudi] nsivabalan commented on a change in pull request #2438: [HUDI-1447] DeltaStreamer kafka source supports consuming from specified timestamp
nsivabalan commented on a change in pull request #2438:
URL: https://github.com/apache/hudi/pull/2438#discussion_r671607330

File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java

```diff
@@ -212,6 +234,9 @@ public KafkaOffsetGen(TypedProperties props) {
     Set topicPartitions = partitionInfoList.stream()
         .map(x -> new TopicPartition(x.topic(), x.partition())).collect(Collectors.toSet());
+    if (Config.KAFKA_CHECKPOINT_TYPE_TIMESTAMP.equals(kafkaCheckpointType) && isValidCheckpointType(lastCheckpointStr)) {
+      lastCheckpointStr = getOffsetsByTimestamp(consumer, partitionInfoList, topicPartitions, topicName, Long.parseLong(lastCheckpointStr.get()));
+    }
```

Review comment: OK, I get it now. Makes sense.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
nsivabalan commented on a change in pull request #2438:
URL: https://github.com/apache/hudi/pull/2438#discussion_r671353717

File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java

```diff
@@ -212,6 +234,9 @@ public KafkaOffsetGen(TypedProperties props) {
     Set topicPartitions = partitionInfoList.stream()
         .map(x -> new TopicPartition(x.topic(), x.partition())).collect(Collectors.toSet());
+    if (Config.KAFKA_CHECKPOINT_TYPE_TIMESTAMP.equals(kafkaCheckpointType) && isValidCheckpointType(lastCheckpointStr)) {
+      lastCheckpointStr = getOffsetsByTimestamp(consumer, partitionInfoList, topicPartitions, topicName, Long.parseLong(lastCheckpointStr.get()));
+    }
```

Review comment: Not sure I understand. This is what I am thinking:

```
if (timestamp-based checkpoint)
    lastCheckpoint = getOffsetsByTimestamp()
else if (regular checkpoint type)
    lastCheckpoint = fetchValidOffsets()
else
    reset based on auto.offset.reset
```

Am I misunderstanding anything here? Can you help me understand, please?
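The three-way dispatch in the pseudocode above can be illustrated with a small, self-contained Java sketch. The class, the method `resolveStrategy`, and the strategy labels are hypothetical stand-ins for the logic being discussed, not the actual `KafkaOffsetGen` implementation.

```java
import java.util.Optional;

// Illustrative sketch of the checkpoint-resolution branching discussed in the
// review thread. Names here are assumptions, not the Hudi code.
class CheckpointResolution {

  static final String TYPE_TIMESTAMP = "timestamp";

  // Decide which strategy would turn the last checkpoint into concrete offsets.
  public static String resolveStrategy(String checkpointType, Optional<String> lastCheckpoint) {
    if (TYPE_TIMESTAMP.equals(checkpointType) && lastCheckpoint.isPresent()
        && lastCheckpoint.get().chars().allMatch(Character::isDigit)) {
      return "offsetsByTimestamp";      // translate the epoch timestamp into per-partition offsets
    } else if (lastCheckpoint.isPresent()) {
      return "validateExistingOffsets"; // regular "topic,partition:offset" checkpoint string
    } else {
      return "autoOffsetReset";         // no checkpoint: fall back to auto.offset.reset
    }
  }
}
```

The point of the sketch is that a timestamp checkpoint that fails validation still falls through to the regular-checkpoint branch rather than the reset branch.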
nsivabalan commented on a change in pull request #2438:
URL: https://github.com/apache/hudi/pull/2438#discussion_r670978737

File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java

```diff
@@ -212,6 +234,9 @@ public KafkaOffsetGen(TypedProperties props) {
     Set topicPartitions = partitionInfoList.stream()
         .map(x -> new TopicPartition(x.topic(), x.partition())).collect(Collectors.toSet());
+    if (Config.KAFKA_CHECKPOINT_TYPE_TIMESTAMP.equals(kafkaCheckpointType) && isValidCheckpointType(lastCheckpointStr)) {
+      lastCheckpointStr = getOffsetsByTimestamp(consumer, partitionInfoList, topicPartitions, topicName, Long.parseLong(lastCheckpointStr.get()));
+    }
```

Review comment: I was expecting an else-if block after this line. Can you clarify, please? If not, won't we fall into the else block?

File path: hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestKafkaOffsetGen.java

```diff
@@ -64,7 +63,7 @@ public void teardown() throws Exception {
   private TypedProperties getConsumerConfigs(String autoOffsetReset) {
     TypedProperties props = new TypedProperties();
-    props.put(Config.KAFKA_AUTO_OFFSET_RESET, autoOffsetReset);
+    props.put("auto.offset.reset", autoOffsetReset);
```

Review comment: Do you think we can add some tests to this class for the timestamp type?

File path: hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestKafkaSource.java

```diff
@@ -193,7 +193,7 @@ public void testJsonKafkaSourceWithDefaultUpperCap() {
     Source jsonSource = new JsonKafkaSource(props, jsc, sparkSession, schemaProvider, metrics);
     SourceFormatAdapter kafkaSource = new SourceFormatAdapter(jsonSource);
-    Config.maxEventsFromKafkaSource = 500;
+    //props.setProperty("hoodie.deltastreamer.kafka.source.maxEvents", "500");
```

Review comment: Why is this commented out?

Review comment (same hunk): I tried your patch locally; the test fails if I uncomment this line. I don't understand why.
nsivabalan commented on a change in pull request #2438:
URL: https://github.com/apache/hudi/pull/2438#discussion_r663377708

File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java

```diff
@@ -282,6 +301,36 @@ private Long delayOffsetCalculation(Option lastCheckpointStr, Set lastCheckpointStr) {
```

Review comment: Should we name this "isValidCheckpointType" or something similar? Also, can you add Javadoc explaining what validation we are doing here?
nsivabalan commented on a change in pull request #2438:
URL: https://github.com/apache/hudi/pull/2438#discussion_r657945070

File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java

```diff
@@ -312,13 +313,13 @@ public void refreshTimeline() throws IOException {
     if (lastCommit.isPresent()) {
       HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
           .fromBytes(commitTimelineOpt.get().getInstantDetails(lastCommit.get()).get(), HoodieCommitMetadata.class);
-      if (cfg.checkpoint != null && !cfg.checkpoint.equals(commitMetadata.getMetadata(CHECKPOINT_RESET_KEY))) {
-        resumeCheckpointStr = Option.of(cfg.checkpoint);
-      } else if (commitMetadata.getMetadata(CHECKPOINT_KEY) != null) {
-        // if previous checkpoint is an empty string, skip resume, use Option.empty()
-        if (!commitMetadata.getMetadata(CHECKPOINT_KEY).isEmpty()) {
-          resumeCheckpointStr = Option.of(commitMetadata.getMetadata(CHECKPOINT_KEY));
+      if (cfg.checkpoint != null) {
```

Review comment: We could club both of these into a single if condition:

```java
if (cfg.checkpoint != null && (StringUtils.isNullOrEmpty(commitMetadata.getMetadata(CHECKPOINT_RESET_KEY))
    || !cfg.checkpoint.equals(commitMetadata.getMetadata(CHECKPOINT_RESET_KEY)))) {
  resumeCheckpointStr = Option.of(cfg.checkpoint);
}
```

File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java

```diff
@@ -330,6 +331,9 @@ public void refreshTimeline() throws IOException {
           + commitTimelineOpt.get().getInstants().collect(Collectors.toList())
           + ", CommitMetadata=" + commitMetadata.toJsonString());
     }
+    if (!StringUtils.isNullOrEmpty(commitMetadata.getMetadata(CHECKPOINT_RESET_KEY))) {
+      props.put("hoodie.deltastreamer.source.kafka.checkpoint.type", "string");
```

Review comment: Actually, a better thing to do here is to remove the entry from props. Wdyt?

Review comment (same hunk): Rather than hardcoding the config here, can we use the variable, please?
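The two options debated above (overwrite the checkpoint-type property with "string" vs. remove the entry from props) can be sketched with plain `java.util.Properties` standing in for Hudi's `TypedProperties`. The class and helper name below are hypothetical, used only to contrast the two behaviors.

```java
import java.util.Properties;

// Sketch of the alternatives from the review thread: once a
// CHECKPOINT_RESET_KEY has been consumed, either overwrite the checkpoint
// type back to "string" or drop the property entirely.
class CheckpointTypeReset {
  static final String CHECKPOINT_TYPE_PROP = "hoodie.deltastreamer.source.kafka.checkpoint.type";

  public static void resetCheckpointType(Properties props, boolean removeInsteadOfOverwrite) {
    if (removeInsteadOfOverwrite) {
      props.remove(CHECKPOINT_TYPE_PROP);   // later runs fall back to the default type
    } else {
      props.setProperty(CHECKPOINT_TYPE_PROP, "string");
    }
  }
}
```

Removing the entry leaves later code free to apply whatever default it defines, instead of baking "string" in at this call site.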
nsivabalan commented on a change in pull request #2438:
URL: https://github.com/apache/hudi/pull/2438#discussion_r657546921

File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java

```diff
@@ -461,7 +465,7 @@ public void refreshTimeline() throws IOException {
     if (!hasErrors || cfg.commitOnErrors) {
       HashMap checkpointCommitMetadata = new HashMap<>();
       checkpointCommitMetadata.put(CHECKPOINT_KEY, checkpointStr);
-      if (cfg.checkpoint != null) {
+      if (cfg.checkpoint != null && !"timestamp".equals(props.getString("hoodie.deltastreamer.source.kafka.checkpoint.type"))) {
```

Review comment: Can you help me understand why we need this? My understanding is that the user will set cfg.checkpoint during the first batch and set the checkpoint type (to timestamp) as well. But for any checkpoint type, we should add the CHECKPOINT_RESET_KEY here at the end of the first batch. Am I missing something? Can you please help me understand?
nsivabalan commented on a change in pull request #2438:
URL: https://github.com/apache/hudi/pull/2438#discussion_r641925742

File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java

```diff
@@ -326,6 +328,16 @@ private boolean onDeltaSyncShutdown(boolean error) {
     @Parameter(names = {"--checkpoint"}, description = "Resume Delta Streamer from this checkpoint.")
     public String checkpoint = null;
+    /**
+     * 1. string: topicName,partition number 0:offset value,partition number 1:offset value
+     * 2. timestamp: kafka offset timestamp
+     * example
+     * 1. hudi_topic,0:100,1:101,2:201
+     * 2. 1621947081
+     */
+    @Parameter(names = {"--checkpoint-type"}, description = "Checkpoint type, divided into timestamp or string offset")
+    public String checkpointType = "string";
```

Review comment: This format is specific to Kafka; let's call that out. Other sources could represent their checkpoints differently.

Review comment (same hunk): I am contemplating between "string", "default", or "regular" as the default checkpoint type. @n3nash: any thoughts? We are looking to introduce a new config called checkpoint type, and by default we need to set some value. This patch adds a new checkpoint type, "timestamp", for the Kafka source.

Review comment (same hunk): Sorry, why do we have this config in two places? We have it defined as a top-level config in HoodieDeltaStreamer.Config, but in KafkaOffsetGen I see you are accessing it as "hoodie.deltastreamer.source.kafka.checkpoint.type". Maybe we should rely on that config param and remove it from the top level, since this is applicable just to Kafka for now.

File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java

```diff
@@ -38,6 +38,7 @@
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ReflectionUtils;
+import org.apache.hudi.common.util.StringUtils;
```

Review comment: Can we revert unintended changes in this file?
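To make the "string" checkpoint format documented in the Javadoc above concrete ("topicName,partition:offset,partition:offset"), here is a minimal parsing sketch. `CheckpointParser` and its methods are illustrative, not part of the Hudi codebase.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hedged sketch: parse a "string"-type checkpoint such as
// "hudi_topic,0:100,1:101,2:201" into a topic name plus a
// partition -> offset map.
class CheckpointParser {

  public static String parseTopic(String checkpoint) {
    return checkpoint.split(",")[0];   // first comma-separated token is the topic
  }

  // "hudi_topic,0:100,1:101,2:201" -> {0=100, 1=101, 2=201}
  public static Map<Integer, Long> parseOffsets(String checkpoint) {
    String[] parts = checkpoint.split(",");
    Map<Integer, Long> offsets = new LinkedHashMap<>();
    for (int i = 1; i < parts.length; i++) {      // parts[0] is the topic name
      String[] kv = parts[i].split(":");
      offsets.put(Integer.parseInt(kv[0]), Long.parseLong(kv[1]));
    }
    return offsets;
  }
}
```

A "timestamp"-type checkpoint, by contrast, is just the single epoch value (e.g. 1621947081) with no per-partition structure to parse.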
nsivabalan commented on a change in pull request #2438:
URL: https://github.com/apache/hudi/pull/2438#discussion_r604841403

File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java

```diff
@@ -247,6 +266,32 @@ private Long delayOffsetCalculation(Option lastCheckpointStr, Set partitionInfoList, String topicName, Long timestamp) {
```

Review comment: Can we add tests for the new code that is added? I don't see any tests.
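One way to unit-test the timestamp path without a live broker is to isolate its pure parts. The sketch below is an assumption-laden illustration, not the PR's `getOffsetsByTimestamp`: it shows the partition-to-timestamp map that would typically be handed to `KafkaConsumer.offsetsForTimes()`, and the serialization of resolved offsets back into the "string" checkpoint form; the actual consumer call is omitted so the example stays self-contained.

```java
import java.util.Map;
import java.util.TreeMap;

// Illustrative helpers around an offsets-by-timestamp lookup.
class TimestampOffsets {

  // Every partition is searched with the same target timestamp; in real code
  // this map (keyed by TopicPartition) would go to consumer.offsetsForTimes().
  public static Map<Integer, Long> timestampsToSearch(int numPartitions, long timestampMs) {
    Map<Integer, Long> search = new TreeMap<>();
    for (int p = 0; p < numPartitions; p++) {
      search.put(p, timestampMs);
    }
    return search;
  }

  // Serialize resolved per-partition offsets into the "string" checkpoint
  // form, e.g. "hudi_topic,0:100,1:101".
  public static String toCheckpointString(String topic, Map<Integer, Long> offsets) {
    StringBuilder sb = new StringBuilder(topic);
    for (Map.Entry<Integer, Long> e : offsets.entrySet()) {
      sb.append(',').append(e.getKey()).append(':').append(e.getValue());
    }
    return sb.toString();
  }
}
```

Helpers like these can be asserted on directly, which is one way to add the unit coverage the comment asks for without an embedded Kafka cluster.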
nsivabalan commented on a change in pull request #2438:
URL: https://github.com/apache/hudi/pull/2438#discussion_r604840433

File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java

```diff
@@ -553,6 +555,11 @@ public DeltaSyncService(Config cfg, JavaSparkContext jssc, FileSystem fs, Config
         "'--filter-dupes' needs to be disabled when '--op' is 'UPSERT' to ensure updates are not missed.");
     this.props = properties.get();
+    String kafkaCheckpointTimestamp = props.getString(KafkaOffsetGen.Config.KAFKA_CHECKPOINT_TIMESTAMP, "");
```

Review comment: Let me think more on this. Wondering if we should just rely on the existing HoodieDeltaStreamer.Config.checkpoint and add another config named "checkpoint.type" or something, which could be set to "timestamp" for this purpose.
nsivabalan commented on a change in pull request #2438:
URL: https://github.com/apache/hudi/pull/2438#discussion_r604828288

File path: hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieDeltaStreamerWrapper.java

```diff
@@ -65,7 +65,7 @@ public void scheduleCompact() throws Exception {
     return upsert(WriteOperationType.UPSERT);
   }
-  public Pair>> fetchSource() throws Exception {
+  public Pair>, Pair> fetchSource() throws Exception {
```

Review comment: Actually, my PR was closed as it was invalid. But [here](https://github.com/nsivabalan/hudi/blob/f7439e2e28748bf7b713fb72ba611f8af7bb97a1/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/ReadBatch.java) is the class that I added. Maybe you can add it in this patch.
nsivabalan commented on a change in pull request #2438:
URL: https://github.com/apache/hudi/pull/2438#discussion_r594471195

File path: hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieDeltaStreamerWrapper.java

```diff
@@ -65,7 +65,7 @@ public void scheduleCompact() throws Exception {
     return upsert(WriteOperationType.UPSERT);
   }
-  public Pair>> fetchSource() throws Exception {
+  public Pair>, Pair> fetchSource() throws Exception {
```

Review comment: This is getting out of hand (two Pairs within a Pair); we can't keep adding more Pairs here. I am adding a class to hold the return value in one of my PRs. Let's see if we can rebase once the other PR lands.
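The reviewer's suggestion of a dedicated return-value class (the linked ReadBatch) can be sketched as a simple value holder replacing the nested Pairs. The field names and types below are assumptions for illustration only, not the API that was eventually merged.

```java
// Hypothetical value class along the lines of the linked ReadBatch: one named
// holder for what fetchSource() returns, instead of Pairs nested in Pairs.
class SourceFetchResult<T> {
  private final String schemaProviderName; // stand-in for a SchemaProvider reference
  private final String checkpoint;         // checkpoint string for this batch
  private final T batch;                   // the fetched records, e.g. a JavaRDD in real code

  SourceFetchResult(String schemaProviderName, String checkpoint, T batch) {
    this.schemaProviderName = schemaProviderName;
    this.checkpoint = checkpoint;
    this.batch = batch;
  }

  public String getSchemaProviderName() { return schemaProviderName; }
  public String getCheckpoint() { return checkpoint; }
  public T getBatch() { return batch; }
}
```

Named accessors make call sites self-documenting, and adding a field later is a one-line change rather than another layer of Pair nesting.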