garyli1019 commented on a change in pull request #1719:
URL: https://github.com/apache/hudi/pull/1719#discussion_r437841744

##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java
##########
@@ -57,10 +57,10 @@ public AvroKafkaSource(TypedProperties props, JavaSparkContext sparkContext, Spa
   protected InputBatch<JavaRDD<GenericRecord>> fetchNewData(Option<String> lastCheckpointStr, long sourceLimit) {
     OffsetRange[] offsetRanges = offsetGen.getNextOffsetRanges(lastCheckpointStr, sourceLimit);
     long totalNewMsgs = CheckpointUtils.totalNewMessages(offsetRanges);
+    LOG.info("About to read " + totalNewMsgs + " from Kafka for topic :" + offsetGen.getTopicName());
     if (totalNewMsgs <= 0) {
-      return new InputBatch<>(Option.empty(), lastCheckpointStr.isPresent() ? lastCheckpointStr.get() : "");

Review comment:
hmmm interesting... so right now, if we use `LATEST` as the reset key, we will fall into a dead loop unless we are lucky enough to have a message land between two `consumer.endOffsets(topicPartitions)` calls.

##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java
##########
@@ -57,10 +57,10 @@ public AvroKafkaSource(TypedProperties props, JavaSparkContext sparkContext, Spa
   protected InputBatch<JavaRDD<GenericRecord>> fetchNewData(Option<String> lastCheckpointStr, long sourceLimit) {
     OffsetRange[] offsetRanges = offsetGen.getNextOffsetRanges(lastCheckpointStr, sourceLimit);
     long totalNewMsgs = CheckpointUtils.totalNewMessages(offsetRanges);
+    LOG.info("About to read " + totalNewMsgs + " from Kafka for topic :" + offsetGen.getTopicName());
     if (totalNewMsgs <= 0) {
-      return new InputBatch<>(Option.empty(), lastCheckpointStr.isPresent() ? lastCheckpointStr.get() : "");
-    } else {
-      LOG.info("About to read " + totalNewMsgs + " from Kafka for topic :" + offsetGen.getTopicName());
+      return new InputBatch<>(Option.empty(),
+          lastCheckpointStr.isPresent() ? lastCheckpointStr.get() : CheckpointUtils.offsetsToStr(offsetRanges));

Review comment:
could `lastCheckpointStr` be `""` here?
Also, can we add a test for this case? https://github.com/apache/hudi/blob/master/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestKafkaSource.java#L107
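
To make the `""` concern concrete, here is a minimal sketch of the fallback for the empty-batch branch. It is not the code in this PR: `checkpointForEmptyBatch` and the class name are hypothetical, `java.util.Optional` stands in for Hudi's `Option`, and the offsets string is purely illustrative. The assumption is that an empty checkpoint string should be handled the same way as a missing one, so the freshly computed offsets are returned instead of `""`.

```java
import java.util.Optional;

// Hypothetical sketch, not part of Hudi. Optional stands in for Hudi's Option.
class CheckpointFallbackSketch {

  // Picks the checkpoint to hand back when a fetch finds no new messages.
  // Assumption: an empty string carries no offsets, so it is treated like an
  // absent checkpoint and the currently resolved offsets are used instead.
  static String checkpointForEmptyBatch(Optional<String> lastCheckpoint, String currentOffsets) {
    return lastCheckpoint.filter(s -> !s.isEmpty()).orElse(currentOffsets);
  }

  public static void main(String[] args) {
    String current = "topic,0:100,1:250"; // illustrative value only

    // No previous checkpoint: fall back to the current offsets.
    System.out.println(checkpointForEmptyBatch(Optional.empty(), current));
    // Previous checkpoint is "": also fall back to the current offsets.
    System.out.println(checkpointForEmptyBatch(Optional.of(""), current));
    // A real previous checkpoint is kept unchanged.
    System.out.println(checkpointForEmptyBatch(Optional.of("topic,0:90,1:240"), current));
  }
}
```

A test for this case would presumably cover the same three inputs: no checkpoint, an empty checkpoint, and a populated one, each with zero new messages.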