scwhittle commented on code in PR #32456:
URL: https://github.com/apache/beam/pull/32456#discussion_r1770968694
##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java:
##########
@@ -460,7 +460,28 @@ public ProcessContinuation processElement(
}
return ProcessContinuation.resume();
}
+ long skippedRecords = 0L;
+ final Stopwatch sw = Stopwatch.createStarted();
for (ConsumerRecord<byte[], byte[]> rawRecord : rawRecords) {
+ // If the Kafka consumer returns a record with an offset that is
already processed
+ // the record can be safely skipped. This is needed because there is
a possibility
+ // that the seek() above fails to move the offset to the desired
position. In which
+ // case poll() would return records that are already cnsumed.
+ if (rawRecord.offset() < startOffset) {
+ // If the start offset is not reached even after skipping the
records for 10 seconds
+ // then the processing is stopped with a backoff to give the Kakfa
server some time
+ // catch up.
+ if (sw.elapsed().getSeconds() > 10L) {
+ LOG.error(
+ "The expected offset was not reached even after skipping
consumed records for 10 seconds,"
+ + " the processing of this bundle will be attempted at a
later time.");
+ return ProcessContinuation.resume()
+
.withResumeDelay(org.joda.time.Duration.standardSeconds(10L));
+ } else {
+ skippedRecords++;
+ continue;
+ }
+ }
if (!tracker.tryClaim(rawRecord.offset())) {
Review Comment:
it is now going to log for every skipped record. I meant to move it to the
same scope as the tryclaim
```
++skippedRecords;
continue;
}
}
if (skippedRecords > 0) {
// log about skipped records since we're done skipping (didn't continue
above)
LOG()...
skippedRecords = 0; // so we don't log again
}
if (!tracker.tryClaim(rawRecord.offset())) {
```
##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java:
##########
@@ -461,6 +462,34 @@ public ProcessContinuation processElement(
return ProcessContinuation.resume();
}
for (ConsumerRecord<byte[], byte[]> rawRecord : rawRecords) {
+ // If the Kafka consumer returns a record with an offset that is
already processed
+ // the record can be safely skipped. This is needed because there is
a possibility
+ // that the seek() above fails to move the offset to the desired
position. In which
+ // case poll() would return records that are already cnsumed.
+ if (rawRecord.offset() < startOffset) {
+ // If the start offset is not reached even after skipping the
records for 10 seconds
+ // then the processing is stopped with a backoff to give the Kakfa
server some time
+ // catch up.
+ if (sw.elapsed().getSeconds() > 10L) {
+ LOG.error(
+ "The expected offset ({}) was not reached even after"
+ + " skipping consumed records for 10 seconds. The offset
we could"
+ + " reach was {}. The processing of this bundle will be
attempted"
+ + " at a later time.",
+ expectedOffset,
+ rawRecord.offset());
+ return ProcessContinuation.resume()
+
.withResumeDelay(org.joda.time.Duration.standardSeconds(10L));
+ } else {
Review Comment:
nit: can remove the else to reduce nesting since there is a return in the
other case
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]