fvaleri commented on code in PR #15561:
URL: https://github.com/apache/kafka/pull/15561#discussion_r1533512744
##########
examples/src/main/java/kafka/examples/ExactlyOnceMessageProcessor.java:
##########

@@ -215,6 +225,33 @@ private long getRemainingRecords(KafkaConsumer<Integer, String> consumer) {
         }).sum();
     }
 
+    private int retry(int retries, KafkaConsumer<Integer, String> consumer, ConsumerRecords<Integer, String> records) {
+        retries++;
+        if (retries > 0 && retries <= MAX_RETRIES) {
+            // retry: reset fetch offset
+            // the consumer fetch position needs to be restored to the committed offset before the transaction started
+            Map<TopicPartition, OffsetAndMetadata> committed = consumer.committed(consumer.assignment());
+            consumer.assignment().forEach(tp -> {
+                OffsetAndMetadata offsetAndMetadata = committed.get(tp);
+                if (offsetAndMetadata != null) {
+                    consumer.seek(tp, offsetAndMetadata.offset());
+                } else {
+                    consumer.seekToBeginning(Collections.singleton(tp));
+                }
+            });
+        } else if (retries > MAX_RETRIES) {
+            // continue: skip bad records
+            // in addition to logging, you may want to send these records to a DLQ for further processing
+            records.forEach(record -> {
+                Utils.printErr("Skipping record after %d retries: %s", MAX_RETRIES, record.value());
+                consumer.seek(new TopicPartition(record.topic(), record.partition()), record.offset() + 1);
+                consumer.commitSync();

Review Comment:
   Yes it is. Seek is not needed. Fixed.
##########
examples/src/main/java/kafka/examples/ExactlyOnceMessageProcessor.java:
##########

@@ -215,6 +225,33 @@ private long getRemainingRecords(KafkaConsumer<Integer, String> consumer) {
         }).sum();
     }
 
+    private int retry(int retries, KafkaConsumer<Integer, String> consumer, ConsumerRecords<Integer, String> records) {
+        retries++;
+        if (retries > 0 && retries <= MAX_RETRIES) {
+            // retry: reset fetch offset
+            // the consumer fetch position needs to be restored to the committed offset before the transaction started
+            Map<TopicPartition, OffsetAndMetadata> committed = consumer.committed(consumer.assignment());
+            consumer.assignment().forEach(tp -> {
+                OffsetAndMetadata offsetAndMetadata = committed.get(tp);
+                if (offsetAndMetadata != null) {
+                    consumer.seek(tp, offsetAndMetadata.offset());
+                } else {
+                    consumer.seekToBeginning(Collections.singleton(tp));
+                }
+            });
+        } else if (retries > MAX_RETRIES) {
+            // continue: skip bad records
+            // in addition to logging, you may want to send these records to a DLQ for further processing
+            records.forEach(record -> {
+                Utils.printErr("Skipping record after %d retries: %s", MAX_RETRIES, record.value());
+                consumer.seek(new TopicPartition(record.topic(), record.partition()), record.offset() + 1);
+                consumer.commitSync();
+            });
+            retries = 0;
+        }

Review Comment:
   I don't see how retries can be less than zero.

##########
examples/src/main/java/kafka/examples/ExactlyOnceMessageProcessor.java:
##########

@@ -215,6 +225,33 @@ private long getRemainingRecords(KafkaConsumer<Integer, String> consumer) {
         }).sum();
     }
 
+    private int retry(int retries, KafkaConsumer<Integer, String> consumer, ConsumerRecords<Integer, String> records) {

Review Comment:
   Agree.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use
the URL above to go to the specific comment.
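The review points out that `retries` is incremented before the `retries > 0` check, so that guard can never be false. A minimal, broker-free sketch of the bounds check and the committed-offset reset decision from the `retry()` helper, extracted as pure functions so the logic can be exercised without a Kafka cluster. The class name, `MAX_RETRIES = 3`, and the `String`-keyed partition map are hypothetical stand-ins, not taken from the PR:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

public class RetryOffsetReset {
    // Hypothetical retry limit; the PR defines its own MAX_RETRIES constant.
    static final int MAX_RETRIES = 3;

    // Mirrors the branch structure of retry(): the caller's count is
    // incremented first, so the value compared is always >= 1 and the
    // `retries > 0` guard in the original code is redundant.
    static boolean shouldRetry(int retriesSoFar) {
        int retries = retriesSoFar + 1;
        return retries <= MAX_RETRIES;
    }

    // Decides where to reset the fetch position for one partition:
    // seek to the committed offset if one exists, otherwise the caller
    // should seek to the beginning (empty result).
    static Optional<Long> resetTarget(Map<String, Long> committed, String topicPartition) {
        return Optional.ofNullable(committed.get(topicPartition));
    }

    public static void main(String[] args) {
        Map<String, Long> committed = new HashMap<>();
        committed.put("input-topic-0", 42L);

        // Partition with a committed offset: seek back to it.
        System.out.println(resetTarget(committed, "input-topic-0").orElse(-1L)); // 42
        // No committed offset yet: empty, meaning seek to beginning.
        System.out.println(resetTarget(committed, "input-topic-1").isPresent()); // false
        // Bounds check: first failure retries, exhausted count does not.
        System.out.println(shouldRetry(0)); // true
        System.out.println(shouldRetry(MAX_RETRIES)); // false
    }
}
```

Keeping the decision pure like this also makes the redundancy easy to see: only the upper bound (`<= MAX_RETRIES`) does any work.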