gaoran10 commented on code in PR #13516: URL: https://github.com/apache/kafka/pull/13516#discussion_r1531455051
########## examples/src/main/java/kafka/examples/ExactlyOnceMessageProcessor.java: ########## @@ -81,111 +88,115 @@ public ExactlyOnceMessageProcessor(final String inputTopic, @Override public void run() { - // Init transactions call should always happen first in order to clear zombie transactions from previous generation. - producer.initTransactions(); - - final AtomicLong messageRemaining = new AtomicLong(Long.MAX_VALUE); - - consumer.subscribe(Collections.singleton(inputTopic), new ConsumerRebalanceListener() { - @Override - public void onPartitionsRevoked(Collection<TopicPartition> partitions) { - printWithTxnId("Revoked partition assignment to kick-off rebalancing: " + partitions); - } - - @Override - public void onPartitionsAssigned(Collection<TopicPartition> partitions) { - printWithTxnId("Received partition assignment after rebalancing: " + partitions); - messageRemaining.set(messagesRemaining(consumer)); - } - }); - - int messageProcessed = 0; - while (messageRemaining.get() > 0) { - try { - ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofMillis(200)); - if (records.count() > 0) { - // Begin a new transaction session. - producer.beginTransaction(); - for (ConsumerRecord<Integer, String> record : records) { - // Process the record and send to downstream. 
- ProducerRecord<Integer, String> customizedRecord = transform(record); - producer.send(customizedRecord); + int processedRecords = 0; + long remainingRecords = Long.MAX_VALUE; + // it is recommended to have a relatively short txn timeout in order to clear pending offsets faster + int transactionTimeoutMs = 10_000; + // consumer must be in read_committed mode, which means it won't be able to read uncommitted data + boolean readCommitted = true; + try (KafkaProducer<Integer, String> producer = new Producer("processor-producer", bootstrapServers, outputTopic, + true, transactionalId, true, -1, transactionTimeoutMs, null).createKafkaProducer(); + KafkaConsumer<Integer, String> consumer = new Consumer("processor-consumer", bootstrapServers, inputTopic, + "processor-group", Optional.of(groupInstanceId), readCommitted, -1, null).createKafkaConsumer()) { + // called first and once to fence zombies and abort any pending transaction + producer.initTransactions(); + + consumer.subscribe(singleton(inputTopic), this); + + Utils.printOut("Processing new records"); + while (!closed && remainingRecords > 0) { + try { + ConsumerRecords<Integer, String> records = consumer.poll(ofMillis(200)); + if (!records.isEmpty()) { + // begin a new transaction session + producer.beginTransaction(); + + for (ConsumerRecord<Integer, String> record : records) { + // process the record and send downstream + ProducerRecord<Integer, String> newRecord = + new ProducerRecord<>(outputTopic, record.key(), record.value() + "-ok"); + producer.send(newRecord); + } + + // checkpoint the progress by sending offsets to group coordinator broker + // note that this API is only available for broker >= 2.5 + producer.sendOffsetsToTransaction(getOffsetsToCommit(consumer), consumer.groupMetadata()); + + // commit the transaction including offsets + producer.commitTransaction(); + processedRecords += records.count(); } + } catch (AuthorizationException | UnsupportedVersionException | ProducerFencedException + | 
FencedInstanceIdException | OutOfOrderSequenceException | SerializationException e) { + // we can't recover from these exceptions + Utils.printErr(e.getMessage()); + shutdown(); + } catch (OffsetOutOfRangeException | NoOffsetForPartitionException e) { + // invalid or no offset found without auto.reset.policy + Utils.printOut("Invalid or no offset found, using latest"); + consumer.seekToEnd(emptyList()); + consumer.commitSync(); + } catch (KafkaException e) { + // abort the transaction and try to continue + Utils.printOut("Aborting transaction: %s", e); + producer.abortTransaction(); + } + remainingRecords = getRemainingRecords(consumer); + if (remainingRecords != Long.MAX_VALUE) { + Utils.printOut("Remaining records: %d", remainingRecords); + } + } + } catch (Throwable e) { + Utils.printOut("Unhandled exception"); + e.printStackTrace(); + } + Utils.printOut("Processed %d records", processedRecords); + shutdown(); + } - Map<TopicPartition, OffsetAndMetadata> offsets = consumerOffsets(); + @Override + public void onPartitionsRevoked(Collection<TopicPartition> partitions) { + Utils.printOut("Revoked partitions: %s", partitions); + } - // Checkpoint the progress by sending offsets to group coordinator broker. - // Note that this API is only available for broker >= 2.5. - producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata()); + @Override + public void onPartitionsAssigned(Collection<TopicPartition> partitions) { + Utils.printOut("Assigned partitions: %s", partitions); + } - // Finish the transaction. All sent records should be visible for consumption now. 
- producer.commitTransaction(); - messageProcessed += records.count(); - } - } catch (ProducerFencedException e) { - throw new KafkaException(String.format("The transactional.id %s has been claimed by another process", transactionalId)); - } catch (FencedInstanceIdException e) { - throw new KafkaException(String.format("The group.instance.id %s has been claimed by another process", groupInstanceId)); - } catch (KafkaException e) { - // If we have not been fenced, try to abort the transaction and continue. This will raise immediately - // if the producer has hit a fatal error. - producer.abortTransaction(); - - // The consumer fetch position needs to be restored to the committed offset - // before the transaction started. - resetToLastCommittedPositions(consumer); - } + @Override + public void onPartitionsLost(Collection<TopicPartition> partitions) { + Utils.printOut("Lost partitions: %s", partitions); + } - messageRemaining.set(messagesRemaining(consumer)); - printWithTxnId("Message remaining: " + messageRemaining); + public void shutdown() { + if (!closed) { + closed = true; + latch.countDown(); } - - printWithTxnId("Finished processing " + messageProcessed + " records"); - latch.countDown(); } - private Map<TopicPartition, OffsetAndMetadata> consumerOffsets() { + private Map<TopicPartition, OffsetAndMetadata> getOffsetsToCommit(KafkaConsumer<Integer, String> consumer) { Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>(); for (TopicPartition topicPartition : consumer.assignment()) { offsets.put(topicPartition, new OffsetAndMetadata(consumer.position(topicPartition), null)); } return offsets; } - private void printWithTxnId(final String message) { - System.out.println(transactionalId + ": " + message); - } - - private ProducerRecord<Integer, String> transform(final ConsumerRecord<Integer, String> record) { - printWithTxnId("Transformed record (" + record.key() + "," + record.value() + ")"); - return new ProducerRecord<>(outputTopic, record.key() / 2, 
"Transformed_" + record.value()); - } - - private long messagesRemaining(final KafkaConsumer<Integer, String> consumer) { + private long getRemainingRecords(KafkaConsumer<Integer, String> consumer) { final Map<TopicPartition, Long> fullEndOffsets = consumer.endOffsets(new ArrayList<>(consumer.assignment())); - // If we couldn't detect any end offset, that means we are still not able to fetch offsets. + // if we can't detect any end offset, that means we are still not able to fetch offsets if (fullEndOffsets.isEmpty()) { return Long.MAX_VALUE; } - return consumer.assignment().stream().mapToLong(partition -> { long currentPosition = consumer.position(partition); - printWithTxnId("Processing partition " + partition + " with full offsets " + fullEndOffsets); if (fullEndOffsets.containsKey(partition)) { return fullEndOffsets.get(partition) - currentPosition; + } else { + return 0; } - return 0; }).sum(); } - - private static void resetToLastCommittedPositions(KafkaConsumer<Integer, String> consumer) { Review Comment: > In a real world application you could send failed records to a DLT (dead letter topic) for further processing. Yes, commonly we can retry a few times (for example, five times); if the records still fail to process, we can send them to a DLT. > Here we can simply log them and/or add a comment. Adding a comment would be better; otherwise, users may be confused. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org