scwhittle commented on code in PR #32456:
URL: https://github.com/apache/beam/pull/32456#discussion_r1768193743


##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java:
##########
@@ -511,6 +532,9 @@ public ProcessContinuation processElement(
             }
           }
         }
+        LOG.warn(
+            "{} records were skipped because of seek returning an earlier position.",

Review Comment:
   How about:
   {} records were skipped due to seek returning an earlier position than the requested position of {}
   



##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java:
##########
@@ -460,7 +460,28 @@ public ProcessContinuation processElement(
           }
           return ProcessContinuation.resume();
         }
+        long skippedRecords = 0L;
+        final Stopwatch sw = Stopwatch.createStarted();
         for (ConsumerRecord<byte[], byte[]> rawRecord : rawRecords) {
+          // If the Kafka consumer returns a record with an offset that is already processed
+          // the record can be safely skipped. This is needed because there is a possibility
+          // that the seek() above fails to move the offset to the desired position. In which
+          // case poll() would return records that are already consumed.
+          if (rawRecord.offset() < startOffset) {
+            // If the start offset is not reached even after skipping the records for 10 seconds
+            // then the processing is stopped with a backoff to give the Kafka server some time
+            // to catch up.
+            if (sw.elapsed().getSeconds() > 10L) {
+              LOG.error(
+                  "The expected offset was not reached even after skipping consumed records for 10 seconds,"
+                      + " the processing of this bundle will be attempted at a later time.");
+              return ProcessContinuation.resume()
+                  .withResumeDelay(org.joda.time.Duration.standardSeconds(10L));
+            } else {
+              skippedRecords++;
+              continue;
+            }
+          }
           if (!tracker.tryClaim(rawRecord.offset())) {

Review Comment:
   I think you should have 
   if (skippedRecords > 0) {
     LOG.warn(....) // about skipping
   }
   here instead.
   
   Where the warning is placed below, it is logged separately for every batch of rawRecords, and it is also logged when 0 records were skipped.
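
As a rough illustration of the placement suggested above, here is a minimal, self-contained sketch. It does not use Beam, Kafka, or SLF4J: `SkipLoggingSketch`, `Result`, and `processBatch` are hypothetical names, plain `long` offsets stand in for `ConsumerRecord`s, and collected strings stand in for `LOG.warn`. Resetting the counter after the warning is an assumption added here so that the message is emitted once rather than for every later record.

```java
import java.util.ArrayList;
import java.util.List;

public class SkipLoggingSketch {

  /** Hypothetical holder for what one batch produced; not part of Beam. */
  public static final class Result {
    public final List<Long> processed = new ArrayList<>();
    public final List<String> warnings = new ArrayList<>();
  }

  /** Processes one batch of offsets, skipping those below startOffset. */
  public static Result processBatch(long[] offsets, long startOffset) {
    Result result = new Result();
    long skippedRecords = 0L;
    for (long offset : offsets) {
      if (offset < startOffset) {
        // Already-consumed record: count it and move on.
        skippedRecords++;
        continue;
      }
      // First record at or past startOffset: report skips only if there were any.
      if (skippedRecords > 0) {
        // Stand-in for LOG.warn(...) in the real DoFn.
        result.warnings.add(
            String.format(
                "%d records were skipped due to seek returning an earlier position"
                    + " than the requested position of %d",
                skippedRecords, startOffset));
        // Assumption: reset so the warning is not repeated for later records.
        skippedRecords = 0L;
      }
      result.processed.add(offset);
    }
    return result;
  }
}
```

In this sketch a batch with no stale records produces no warning at all, and a batch that contains only stale records also produces none, because the guard is only reached once a record at or past `startOffset` arrives.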



##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java:
##########
@@ -460,7 +460,28 @@ public ProcessContinuation processElement(
           }
           return ProcessContinuation.resume();
         }
+        long skippedRecords = 0L;
+        final Stopwatch sw = Stopwatch.createStarted();
         for (ConsumerRecord<byte[], byte[]> rawRecord : rawRecords) {
+          // If the Kafka consumer returns a record with an offset that is already processed
+          // the record can be safely skipped. This is needed because there is a possibility
+          // that the seek() above fails to move the offset to the desired position. In which
+          // case poll() would return records that are already consumed.
+          if (rawRecord.offset() < startOffset) {
+            // If the start offset is not reached even after skipping the records for 10 seconds
+            // then the processing is stopped with a backoff to give the Kafka server some time
+            // to catch up.
+            if (sw.elapsed().getSeconds() > 10L) {
+              LOG.error(
+                  "The expected offset was not reached even after skipping consumed records for 10 seconds,"

Review Comment:
   It would be good to include the offset reached and the startOffset in the message to help with debugging.
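
One possible shape for such a message, as a hedged sketch only: `OffsetErrorMessageSketch` and `timeoutMessage` are hypothetical names, and `String.format` stands in for the `{}` placeholders that the real `LOG.error` call would use with `rawRecord.offset()` and `startOffset`.

```java
public class OffsetErrorMessageSketch {

  /**
   * Builds the timeout message with both offsets included.
   * reachedOffset is the offset of the last record seen; startOffset is the requested one.
   */
  public static String timeoutMessage(long reachedOffset, long startOffset, long timeoutSeconds) {
    return String.format(
        "The expected offset %d was not reached even after skipping consumed records for"
            + " %d seconds (last offset seen: %d); the processing of this bundle will be"
            + " attempted at a later time.",
        startOffset, timeoutSeconds, reachedOffset);
  }
}
```

For example, `timeoutMessage(40L, 42L, 10L)` yields a message that names 42 as the expected offset and 40 as the last offset seen.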



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
