scwhittle commented on code in PR #32456:
URL: https://github.com/apache/beam/pull/32456#discussion_r1764564156


##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java:
##########
@@ -460,7 +460,25 @@ public ProcessContinuation processElement(
           }
           return ProcessContinuation.resume();
         }
+        long skippedRecords = 0L;
+        final Stopwatch sw = Stopwatch.createStarted();
         for (ConsumerRecord<byte[], byte[]> rawRecord : rawRecords) {
+          // If the Kafka consumer returns a record with an offset that is 
already processed
+          // the record can be safely skipped. This is needed because there is 
a possibility
+          // that the seek() above fails to move the offset to the desired 
position. In which
+          // case poll() would return records that are already cnsumed.
+          if (rawRecord.offset() < startOffset) {
+            // If the start offset is not reached even after skipping the 
records for 10 seconds
+            // then the processing is stopped with a backoff to give the Kakfa 
server some time
+            // catch up.
+            if (sw.elapsed().toMillis() > 10000L) {

Review Comment:
   nit: use getSeconds() to help readability



##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java:
##########
@@ -460,7 +460,25 @@ public ProcessContinuation processElement(
           }
           return ProcessContinuation.resume();
         }
+        long skippedRecords = 0L;
+        final Stopwatch sw = Stopwatch.createStarted();
         for (ConsumerRecord<byte[], byte[]> rawRecord : rawRecords) {
+          // If the Kafka consumer returns a record with an offset that is 
already processed
+          // the record can be safely skipped. This is needed because there is 
a possibility
+          // that the seek() above fails to move the offset to the desired 
position. In which
+          // case poll() would return records that are already cnsumed.
+          if (rawRecord.offset() < startOffset) {
+            // If the start offset is not reached even after skipping the 
records for 10 seconds
+            // then the processing is stopped with a backoff to give the Kakfa 
server some time
+            // catch up.
+            if (sw.elapsed().toMillis() > 10000L) {
+              return ProcessContinuation.resume()

Review Comment:
   add an error log here that offset was not reached within 10 seconds and 
processing will be retried with backoff



##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java:
##########
@@ -460,7 +460,25 @@ public ProcessContinuation processElement(
           }
           return ProcessContinuation.resume();
         }
+        long skippedRecords = 0L;
+        final Stopwatch sw = Stopwatch.createStarted();
         for (ConsumerRecord<byte[], byte[]> rawRecord : rawRecords) {
+          // If the Kafka consumer returns a record with an offset that is 
already processed
+          // the record can be safely skipped. This is needed because there is 
a possibility
+          // that the seek() above fails to move the offset to the desired 
position. In which
+          // case poll() would return records that are already cnsumed.
+          if (rawRecord.offset() < startOffset) {
+            // If the start offset is not reached even after skipping the 
records for 10 seconds
+            // then the processing is stopped with a backoff to give the Kakfa 
server some time
+            // catch up.
+            if (sw.elapsed().toMillis() > 10000L) {
+              return ProcessContinuation.resume()
+                  .withResumeDelay(org.joda.time.Duration.standardMinutes(5L));

Review Comment:
   How about 10 seconds? If there is other work it will be processed and we 
might not respect the 10 seconds anyway. And if there isn't other work we might 
as well poll sooner to reduce latency.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to