garyli1019 commented on a change in pull request #1719:
URL: https://github.com/apache/hudi/pull/1719#discussion_r437841744



##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java
##########
@@ -57,10 +57,10 @@ public AvroKafkaSource(TypedProperties props, JavaSparkContext sparkContext, Spa
   protected InputBatch<JavaRDD<GenericRecord>> fetchNewData(Option<String> lastCheckpointStr, long sourceLimit) {
     OffsetRange[] offsetRanges = offsetGen.getNextOffsetRanges(lastCheckpointStr, sourceLimit);
     long totalNewMsgs = CheckpointUtils.totalNewMessages(offsetRanges);
+    LOG.info("About to read " + totalNewMsgs + " from Kafka for topic :" + offsetGen.getTopicName());
     if (totalNewMsgs <= 0) {
-      return new InputBatch<>(Option.empty(), lastCheckpointStr.isPresent() ? lastCheckpointStr.get() : "");

Review comment:
       hmmm interesting... so right now, if we use `LATEST` as the reset key, we will fall into an infinite loop unless we are lucky enough to have a message land between two `consumer.endOffsets(topicPartitions)` calls.
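To make the loop concrete, here is a small, self-contained model of the checkpoint fallback. It is a hypothetical sketch, not Hudi code: one Kafka partition is reduced to a single end offset, `LatestResetLoopDemo`, `fetch` and `simulate` are illustrative names, and it assumes (the unlucky case above) that new messages only arrive between rounds, never between the two end-offset lookups. With the old `""` fallback every round restarts at the latest offset and reads nothing; persisting the computed offsets lets the next round start from them.

```java
import java.util.Optional;

// Hypothetical model of the fetchNewData() checkpoint fallback; no Hudi or Kafka classes are used.
// One Kafka partition is reduced to a single end offset.
public class LatestResetLoopDemo {

    // Result of one ingestion round: messages read and the checkpoint to persist.
    static final class Round {
        final long messagesRead;
        final String checkpoint;

        Round(long messagesRead, String checkpoint) {
            this.messagesRead = messagesRead;
            this.checkpoint = checkpoint;
        }
    }

    // One round. Without a usable checkpoint, a LATEST reset starts at the current end offset,
    // so "from" equals "to" and nothing is read.
    static Round fetch(Optional<String> lastCheckpoint, long endOffset, boolean fixed) {
        long from = lastCheckpoint.filter(s -> !s.isEmpty()).map(Long::parseLong).orElse(endOffset);
        long to = endOffset;
        long totalNewMsgs = to - from;
        if (totalNewMsgs <= 0) {
            // Old behaviour: fall back to "". Fixed behaviour: persist the computed range end.
            String fallback = fixed ? Long.toString(to) : "";
            return new Round(0, lastCheckpoint.orElse(fallback));
        }
        return new Round(totalNewMsgs, Long.toString(to));
    }

    // Runs one round per entry; arrivals[i] messages land only after round i computed its offsets.
    static long simulate(boolean fixed, long[] arrivals) {
        long endOffset = 100;
        Optional<String> checkpoint = Optional.empty();
        long totalRead = 0;
        for (long arrived : arrivals) {
            Round r = fetch(checkpoint, endOffset, fixed);
            totalRead += r.messagesRead;
            // Assumption: an empty persisted checkpoint comes back as "no checkpoint".
            checkpoint = r.checkpoint.isEmpty() ? Optional.empty() : Optional.of(r.checkpoint);
            endOffset += arrived;
        }
        return totalRead;
    }

    public static void main(String[] args) {
        long[] arrivals = {5, 5, 5, 0};
        System.out.println("old fallback read: " + simulate(false, arrivals)); // 0
        System.out.println("new fallback read: " + simulate(true, arrivals));  // 15
    }
}
```

In this model the first round with the new fallback still reads nothing, but it persists offset 100, so the following rounds pick up the 15 messages that arrived afterwards.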

##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java
##########
@@ -57,10 +57,10 @@ public AvroKafkaSource(TypedProperties props, JavaSparkContext sparkContext, Spa
   protected InputBatch<JavaRDD<GenericRecord>> fetchNewData(Option<String> lastCheckpointStr, long sourceLimit) {
     OffsetRange[] offsetRanges = offsetGen.getNextOffsetRanges(lastCheckpointStr, sourceLimit);
     long totalNewMsgs = CheckpointUtils.totalNewMessages(offsetRanges);
+    LOG.info("About to read " + totalNewMsgs + " from Kafka for topic :" + offsetGen.getTopicName());
     if (totalNewMsgs <= 0) {
-      return new InputBatch<>(Option.empty(), lastCheckpointStr.isPresent() ? lastCheckpointStr.get() : "");
-    } else {
-      LOG.info("About to read " + totalNewMsgs + " from Kafka for topic :" + offsetGen.getTopicName());
+      return new InputBatch<>(Option.empty(),
+              lastCheckpointStr.isPresent() ? lastCheckpointStr.get() : CheckpointUtils.offsetsToStr(offsetRanges));

Review comment:
       Could `lastCheckpointStr` be `""` here?
   Also, can we add a test for this case?
   
https://github.com/apache/hudi/blob/master/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestKafkaSource.java#L107
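If `lastCheckpointStr` can arrive as a present-but-empty value, the new fallback only checks presence and would still hand back `""`, which leaves the loop in place. The hypothetical sketch below contrasts that presence-only check with one that also treats an empty string as "no checkpoint", and its assertions suggest what a test for this case might check. It uses `java.util.Optional` as a stand-in for Hudi's `Option` so it compiles on its own; `EmptyCheckpointDemo`, both helper names, and the `"demo-topic,0:100"` string are illustrative assumptions, not the actual `CheckpointUtils.offsetsToStr` output format.

```java
import java.util.Optional;

// Hypothetical sketch: java.util.Optional stands in for Hudi's Option so this compiles standalone.
public class EmptyCheckpointDemo {

    // Mirrors the fallback in the PR: only presence is checked, so Optional.of("") yields "".
    static String presenceOnly(Optional<String> lastCheckpoint, String computedOffsets) {
        return lastCheckpoint.isPresent() ? lastCheckpoint.get() : computedOffsets;
    }

    // Alternative: an empty string is treated the same as a missing checkpoint.
    static String nonEmptyOrComputed(Optional<String> lastCheckpoint, String computedOffsets) {
        return lastCheckpoint.filter(s -> !s.isEmpty()).orElse(computedOffsets);
    }

    public static void main(String[] args) {
        // Illustrative value only; not necessarily the format produced by CheckpointUtils.offsetsToStr().
        String computed = "demo-topic,0:100";
        System.out.println("presence only: [" + presenceOnly(Optional.of(""), computed) + "]");
        System.out.println("non-empty:     [" + nonEmptyOrComputed(Optional.of(""), computed) + "]");
    }
}
```

Running `main` prints `[]` for the presence-only check and `[demo-topic,0:100]` for the non-empty check.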




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

