sv2000 commented on a change in pull request #3323:
URL: https://github.com/apache/gobblin/pull/3323#discussion_r660196262



##########
File path: 
gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaStreamingExtractor.java
##########
@@ -350,20 +351,33 @@ public S getSchema() {
     this.readStartTime = System.nanoTime();
     long fetchStartTime = System.nanoTime();
     try {
-      while (this.messageIterator == null || !this.messageIterator.hasNext()) {
-        Long currentTime = System.currentTimeMillis();
-        //it's time to flush, so break the while loop and directly return null
-        if ((currentTime - timeOfLastFlush) > this.flushIntervalMillis) {
-          return new FlushRecordEnvelope();
+      DecodeableKafkaRecord kafkaConsumerRecord;
+      while(true) {

Review comment:
      Each call to consume() returns an iterator over a new batch of records. 
Suppose the returned records are R1, R2, R3, and only R2 is null-valued. With a 
single while loop, we would call consume() again the moment we encounter R2, so 
R3 would be skipped. With the current implementation, R2 is skipped because it 
is null, and the next iteration of the inner while loop correctly returns R3, 
which causes the outer while loop to exit.
   
   In theory, this could be made to work with a single while loop plus if 
conditions inside it to handle null-valued records as a special case. I am not 
sure that would add more clarity than the current implementation does.
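   To make the control flow concrete, here is a minimal, self-contained sketch of the nested-loop pattern described above. It is not the actual KafkaStreamingExtractor code: consume(), readRecord(), and the String records are hypothetical stand-ins, and the flush-timeout logic is omitted. The point is that a null record only advances the current batch's iterator; consume() is called again only when the batch is exhausted, so records after a null one (R3 after R2) are not skipped.

```java
import java.util.Arrays;
import java.util.Iterator;

public class NestedLoopSketch {
    // Hypothetical stand-in for the extractor's consume(): returns an
    // iterator over the next batch of records. Here it just hands back
    // the single pre-built batch for demonstration.
    private static Iterator<String> batch;

    static Iterator<String> consume() {
        return batch;
    }

    // Returns the next non-null record, fetching a new batch only when
    // the current iterator is exhausted.
    static String readRecord(Iterator<String> messageIterator) {
        while (true) {
            // Inner loop: refill the iterator ONLY when the current
            // batch has no more records.
            while (messageIterator == null || !messageIterator.hasNext()) {
                messageIterator = consume();
            }
            String record = messageIterator.next();
            if (record != null) {
                return record; // exits the outer loop
            }
            // record was null: loop again WITHOUT calling consume(),
            // so the remaining records in this batch are preserved.
        }
    }

    public static void main(String[] args) {
        // Batch R1, R2, R3 where only R2 is null-valued.
        batch = Arrays.asList("R1", null, "R3").iterator();
        Iterator<String> it = batch;
        System.out.println(readRecord(it)); // R1
        System.out.println(readRecord(it)); // R3 (null R2 skipped in place)
    }
}
```

   A single-loop version would need an extra if/continue branch to distinguish "batch exhausted" from "record is null"; the nested form encodes that distinction structurally.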




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
