fvaleri commented on code in PR #13516:
URL: https://github.com/apache/kafka/pull/13516#discussion_r1188609481


##########
examples/src/main/java/kafka/examples/ExactlyOnceMessageProcessor.java:
##########
@@ -79,111 +86,114 @@ public ExactlyOnceMessageProcessor(final String inputTopic,
 
     @Override
     public void run() {
-        // Init transactions call should always happen first in order to clear 
zombie transactions from previous generation.
-        producer.initTransactions();
-
-        final AtomicLong messageRemaining = new AtomicLong(Long.MAX_VALUE);
-
-        consumer.subscribe(Collections.singleton(inputTopic), new 
ConsumerRebalanceListener() {
-            @Override
-            public void onPartitionsRevoked(Collection<TopicPartition> 
partitions) {
-                printWithTxnId("Revoked partition assignment to kick-off 
rebalancing: " + partitions);
-            }
-
-            @Override
-            public void onPartitionsAssigned(Collection<TopicPartition> 
partitions) {
-                printWithTxnId("Received partition assignment after 
rebalancing: " + partitions);
-                messageRemaining.set(messagesRemaining(consumer));
-            }
-        });
-
-        int messageProcessed = 0;
-        while (messageRemaining.get() > 0) {
-            try {
-                ConsumerRecords<Integer, String> records = 
consumer.poll(Duration.ofMillis(200));
-                if (records.count() > 0) {
-                    // Begin a new transaction session.
-                    producer.beginTransaction();
-                    for (ConsumerRecord<Integer, String> record : records) {
-                        // Process the record and send to downstream.
-                        ProducerRecord<Integer, String> customizedRecord = 
transform(record);
-                        producer.send(customizedRecord);
+        int processedRecords = 0;
+        long remainingRecords = Long.MAX_VALUE;
+        try {
+            // it is recommended to have a relatively short txn timeout in 
order to clear pending offsets faster
+            int transactionTimeoutMs = 10_000;
+            KafkaProducer<Integer, String> producer =
+                new Producer(outputTopic, true, transactionalId, true, -1, 
transactionTimeoutMs, null).get();
+
+            // consumer must be in read_committed mode, which means it won't 
be able to read uncommitted data
+            boolean readCommitted = true;
+            KafkaConsumer<Integer, String> consumer = new Consumer(
+                "processor-consumer", bootstrapServers, inputTopic, 
"processor-group", Optional.of(groupInstanceId), readCommitted, -1, null)
+                    .createKafkaConsumer();
+
+            // called first and once to fence zombies and abort any pending 
transaction
+            producer.initTransactions();
+
+            consumer.subscribe(singleton(inputTopic), this);
+
+            Utils.printOut("Processing new records");
+            while (!closed && remainingRecords > 0) {
+                try {
+                    ConsumerRecords<Integer, String> records = 
consumer.poll(ofMillis(200));
+                    if (!records.isEmpty()) {
+                        // begin a new transaction session
+                        producer.beginTransaction();
+
+                        for (ConsumerRecord<Integer, String> record : records) 
{
+                            // process the record and send downstream
+                            ProducerRecord<Integer, String> newRecord =
+                                new ProducerRecord<>(outputTopic, 
record.key(), record.value() + "-ok");
+                            producer.send(newRecord);
+                        }
+
+                        // checkpoint the progress by sending offsets to group 
coordinator broker
+                        // note that this API is only available for broker >= 
2.5
+                        producer.sendOffsetsToTransaction(getOffsetsToCommit(consumer), consumer.groupMetadata());
+
+                        // commit the transaction including offsets
+                        producer.commitTransaction();
+                        processedRecords += records.count();
                     }
-
-                    Map<TopicPartition, OffsetAndMetadata> offsets = 
consumerOffsets();
-
-                    // Checkpoint the progress by sending offsets to group 
coordinator broker.
-                    // Note that this API is only available for broker >= 2.5.
-                    producer.sendOffsetsToTransaction(offsets, 
consumer.groupMetadata());
-
-                    // Finish the transaction. All sent records should be 
visible for consumption now.
-                    producer.commitTransaction();
-                    messageProcessed += records.count();
+                } catch (AuthorizationException | UnsupportedVersionException 
| ProducerFencedException
+                         | FencedInstanceIdException | 
OutOfOrderSequenceException | SerializationException e) {
+                    Utils.printErr(e.getMessage());
+                    // we can't recover from these exceptions
+                    shutdown();
+                } catch (OffsetOutOfRangeException | 
NoOffsetForPartitionException e) {
+                    // invalid or no offset found without auto.reset.policy
+                    Utils.printOut("Invalid or no offset found, using latest");
+                    consumer.seekToEnd(emptyList());
+                    consumer.commitSync();
+                } catch (KafkaException e) {
+                    // abort the transaction and try to continue
+                    Utils.printOut("Aborting transaction: %s", e);
+                    producer.abortTransaction();
+                }
+                remainingRecords = getRemainingRecords(consumer);
+                if (remainingRecords != Long.MAX_VALUE) {
+                    Utils.printOut("Remaining records: %d", remainingRecords);
                 }
-            } catch (ProducerFencedException e) {
-                throw new KafkaException(String.format("The transactional.id 
%s has been claimed by another process", transactionalId));
-            } catch (FencedInstanceIdException e) {
-                throw new KafkaException(String.format("The group.instance.id 
%s has been claimed by another process", groupInstanceId));
-            } catch (KafkaException e) {
-                // If we have not been fenced, try to abort the transaction 
and continue. This will raise immediately
-                // if the producer has hit a fatal error.
-                producer.abortTransaction();
-
-                // The consumer fetch position needs to be restored to the 
committed offset
-                // before the transaction started.
-                resetToLastCommittedPositions(consumer);
             }
-
-            messageRemaining.set(messagesRemaining(consumer));
-            printWithTxnId("Message remaining: " + messageRemaining);
+        } catch (Throwable e) {
+            Utils.printOut("Unhandled exception");
+            e.printStackTrace();
         }
+        Utils.printOut("Processed %d records", processedRecords);
+        shutdown();
+    }
+
+    @Override
+    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
+        Utils.printOut("Revoked partitions: %s", partitions);
+    }
+
+    @Override
+    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
+        Utils.printOut("Assigned partitions: %s", partitions);
+    }

Review Comment:
   Yep.



##########
examples/src/main/java/kafka/examples/ExactlyOnceMessageProcessor.java:
##########
@@ -20,56 +20,63 @@
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.NoOffsetForPartitionException;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.consumer.OffsetOutOfRangeException;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.AuthorizationException;
 import org.apache.kafka.common.errors.FencedInstanceIdException;
+import org.apache.kafka.common.errors.OutOfOrderSequenceException;
 import org.apache.kafka.common.errors.ProducerFencedException;
+import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
 
-import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.atomic.AtomicLong;
+
+import static java.time.Duration.ofMillis;
+import static java.util.Collections.emptyList;
+import static java.util.Collections.singleton;
 
 /**
- * A demo class for how to write a customized EOS app. It takes a 
consume-process-produce loop.
- * Important configurations and APIs are commented.
+ * This class implements a read-process-write application.
  */
-public class ExactlyOnceMessageProcessor extends Thread {
-
-    private static final boolean READ_COMMITTED = true;
-
+public class ExactlyOnceMessageProcessor extends Thread implements ConsumerRebalanceListener {
+    private final String bootstrapServers;
     private final String inputTopic;
     private final String outputTopic;
-    private final String transactionalId;
     private final String groupInstanceId;
+    private final CountDownLatch latch;
+    private final String transactionalId;
+    private volatile boolean closed;
 
     private final KafkaProducer<Integer, String> producer;
     private final KafkaConsumer<Integer, String> consumer;
 
-    private final CountDownLatch latch;
-
-    public ExactlyOnceMessageProcessor(final String inputTopic,
-                                       final String outputTopic,
-                                       final int instanceIdx,
-                                       final CountDownLatch latch) {
+    public ExactlyOnceMessageProcessor(String threadName,
+                                       String bootstrapServers,
+                                       String inputTopic,
+                                       String outputTopic,
+                                       CountDownLatch latch) {
+        super(threadName);
+        this.bootstrapServers = bootstrapServers;
         this.inputTopic = inputTopic;
         this.outputTopic = outputTopic;
-        this.transactionalId = "Processor-" + instanceIdx;
+        this.transactionalId = threadName;
         // It is recommended to have a relatively short txn timeout in order 
to clear pending offsets faster.
         final int transactionTimeoutMs = 10000;
         // A unique transactional.id must be provided in order to properly use 
EOS.
         producer = new Producer(outputTopic, true, transactionalId, true, -1, 
transactionTimeoutMs, null).get();
         // Consumer must be in read_committed mode, which means it won't be 
able to read uncommitted data.
         // Consumer could optionally configure groupInstanceId to avoid 
unnecessary rebalances.
-        this.groupInstanceId = "Txn-consumer-" + instanceIdx;
+        this.groupInstanceId = threadName;

Review Comment:
   Sure, this may help while debugging.



##########
examples/src/main/java/kafka/examples/ExactlyOnceMessageProcessor.java:
##########
@@ -79,111 +86,114 @@ public ExactlyOnceMessageProcessor(final String inputTopic,
 
     @Override
     public void run() {
-        // Init transactions call should always happen first in order to clear 
zombie transactions from previous generation.
-        producer.initTransactions();
-
-        final AtomicLong messageRemaining = new AtomicLong(Long.MAX_VALUE);
-
-        consumer.subscribe(Collections.singleton(inputTopic), new 
ConsumerRebalanceListener() {
-            @Override
-            public void onPartitionsRevoked(Collection<TopicPartition> 
partitions) {
-                printWithTxnId("Revoked partition assignment to kick-off 
rebalancing: " + partitions);
-            }
-
-            @Override
-            public void onPartitionsAssigned(Collection<TopicPartition> 
partitions) {
-                printWithTxnId("Received partition assignment after 
rebalancing: " + partitions);
-                messageRemaining.set(messagesRemaining(consumer));
-            }
-        });
-
-        int messageProcessed = 0;
-        while (messageRemaining.get() > 0) {
-            try {
-                ConsumerRecords<Integer, String> records = 
consumer.poll(Duration.ofMillis(200));
-                if (records.count() > 0) {
-                    // Begin a new transaction session.
-                    producer.beginTransaction();
-                    for (ConsumerRecord<Integer, String> record : records) {
-                        // Process the record and send to downstream.
-                        ProducerRecord<Integer, String> customizedRecord = 
transform(record);
-                        producer.send(customizedRecord);
+        int processedRecords = 0;
+        long remainingRecords = Long.MAX_VALUE;
+        try {
+            // it is recommended to have a relatively short txn timeout in order to clear pending offsets faster
+            int transactionTimeoutMs = 10_000;
+            KafkaProducer<Integer, String> producer =
+                new Producer(outputTopic, true, transactionalId, true, -1, transactionTimeoutMs, null).get();

Review Comment:
   Yep.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to