fvaleri commented on code in PR #13516: URL: https://github.com/apache/kafka/pull/13516#discussion_r1188609885
########## examples/src/main/java/kafka/examples/ExactlyOnceMessageProcessor.java: ########## @@ -79,111 +86,114 @@ public ExactlyOnceMessageProcessor(final String inputTopic, @Override public void run() { - // Init transactions call should always happen first in order to clear zombie transactions from previous generation. - producer.initTransactions(); - - final AtomicLong messageRemaining = new AtomicLong(Long.MAX_VALUE); - - consumer.subscribe(Collections.singleton(inputTopic), new ConsumerRebalanceListener() { - @Override - public void onPartitionsRevoked(Collection<TopicPartition> partitions) { - printWithTxnId("Revoked partition assignment to kick-off rebalancing: " + partitions); - } - - @Override - public void onPartitionsAssigned(Collection<TopicPartition> partitions) { - printWithTxnId("Received partition assignment after rebalancing: " + partitions); - messageRemaining.set(messagesRemaining(consumer)); - } - }); - - int messageProcessed = 0; - while (messageRemaining.get() > 0) { - try { - ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofMillis(200)); - if (records.count() > 0) { - // Begin a new transaction session. - producer.beginTransaction(); - for (ConsumerRecord<Integer, String> record : records) { - // Process the record and send to downstream. 
- ProducerRecord<Integer, String> customizedRecord = transform(record); - producer.send(customizedRecord); + int processedRecords = 0; + long remainingRecords = Long.MAX_VALUE; + try { + // it is recommended to have a relatively short txn timeout in order to clear pending offsets faster + int transactionTimeoutMs = 10_000; + KafkaProducer<Integer, String> producer = + new Producer(outputTopic, true, transactionalId, true, -1, transactionTimeoutMs, null).get(); + + // consumer must be in read_committed mode, which means it won't be able to read uncommitted data + boolean readCommitted = true; + KafkaConsumer<Integer, String> consumer = new Consumer( + "processor-consumer", bootstrapServers, inputTopic, "processor-group", Optional.of(groupInstanceId), readCommitted, -1, null) + .createKafkaConsumer(); + + // called first and once to fence zombies and abort any pending transaction + producer.initTransactions(); + + consumer.subscribe(singleton(inputTopic), this); + + Utils.printOut("Processing new records"); + while (!closed && remainingRecords > 0) { + try { + ConsumerRecords<Integer, String> records = consumer.poll(ofMillis(200)); + if (!records.isEmpty()) { + // begin a new transaction session + producer.beginTransaction(); + + for (ConsumerRecord<Integer, String> record : records) { + // process the record and send downstream + ProducerRecord<Integer, String> newRecord = + new ProducerRecord<>(outputTopic, record.key(), record.value() + "-ok"); + producer.send(newRecord); + } + + // checkpoint the progress by sending offsets to group coordinator broker + // note that this API is only available for broker >= 2.5 + producer.sendOffsetsToTransaction(getOffsetsToCommit(consumer), consumer.groupMetadata()); + + // commit the transaction including offsets + producer.commitTransaction(); + processedRecords += records.count(); } - - Map<TopicPartition, OffsetAndMetadata> offsets = consumerOffsets(); - - // Checkpoint the progress by sending offsets to group coordinator 
broker. - // Note that this API is only available for broker >= 2.5. - producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata()); - - // Finish the transaction. All sent records should be visible for consumption now. - producer.commitTransaction(); - messageProcessed += records.count(); + } catch (AuthorizationException | UnsupportedVersionException | ProducerFencedException + | FencedInstanceIdException | OutOfOrderSequenceException | SerializationException e) { Review Comment: Well, this is consistent with the consumer. The exception list here is shorter and much more compact (no repeated instanceof). If you don't mind, I would keep it as it is. ########## examples/src/main/java/kafka/examples/ExactlyOnceMessageProcessor.java: ########## @@ -79,111 +86,114 @@ public ExactlyOnceMessageProcessor(final String inputTopic, @Override public void run() { - // Init transactions call should always happen first in order to clear zombie transactions from previous generation. - producer.initTransactions(); - - final AtomicLong messageRemaining = new AtomicLong(Long.MAX_VALUE); - - consumer.subscribe(Collections.singleton(inputTopic), new ConsumerRebalanceListener() { - @Override - public void onPartitionsRevoked(Collection<TopicPartition> partitions) { - printWithTxnId("Revoked partition assignment to kick-off rebalancing: " + partitions); - } - - @Override - public void onPartitionsAssigned(Collection<TopicPartition> partitions) { - printWithTxnId("Received partition assignment after rebalancing: " + partitions); - messageRemaining.set(messagesRemaining(consumer)); - } - }); - - int messageProcessed = 0; - while (messageRemaining.get() > 0) { - try { - ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofMillis(200)); - if (records.count() > 0) { - // Begin a new transaction session. - producer.beginTransaction(); - for (ConsumerRecord<Integer, String> record : records) { - // Process the record and send to downstream. 
- ProducerRecord<Integer, String> customizedRecord = transform(record); - producer.send(customizedRecord); + int processedRecords = 0; + long remainingRecords = Long.MAX_VALUE; + try { + // it is recommended to have a relatively short txn timeout in order to clear pending offsets faster + int transactionTimeoutMs = 10_000; + KafkaProducer<Integer, String> producer = + new Producer(outputTopic, true, transactionalId, true, -1, transactionTimeoutMs, null).get(); + + // consumer must be in read_committed mode, which means it won't be able to read uncommitted data + boolean readCommitted = true; + KafkaConsumer<Integer, String> consumer = new Consumer( Review Comment: Thanks for catching this; I think I lost that while refactoring. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org