Re: [PR] KAFKA-14752: Kafka examples improvements - processor changes [kafka]

2024-03-20 Thread via GitHub


fvaleri commented on code in PR #13516:
URL: https://github.com/apache/kafka/pull/13516#discussion_r1531939832


##
examples/src/main/java/kafka/examples/ExactlyOnceMessageProcessor.java:
##
@@ -81,111 +88,115 @@ public ExactlyOnceMessageProcessor(final String inputTopic,
 
     @Override
     public void run() {
-        // Init transactions call should always happen first in order to clear zombie transactions from previous generation.
-        producer.initTransactions();
-
-        final AtomicLong messageRemaining = new AtomicLong(Long.MAX_VALUE);
-
-        consumer.subscribe(Collections.singleton(inputTopic), new ConsumerRebalanceListener() {
-            @Override
-            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
-                printWithTxnId("Revoked partition assignment to kick-off rebalancing: " + partitions);
-            }
-
-            @Override
-            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
-                printWithTxnId("Received partition assignment after rebalancing: " + partitions);
-                messageRemaining.set(messagesRemaining(consumer));
-            }
-        });
-
-        int messageProcessed = 0;
-        while (messageRemaining.get() > 0) {
-            try {
-                ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofMillis(200));
-                if (records.count() > 0) {
-                    // Begin a new transaction session.
-                    producer.beginTransaction();
-                    for (ConsumerRecord<Integer, String> record : records) {
-                        // Process the record and send to downstream.
-                        ProducerRecord<Integer, String> customizedRecord = transform(record);
-                        producer.send(customizedRecord);
+        int processedRecords = 0;
+        long remainingRecords = Long.MAX_VALUE;
+        // it is recommended to have a relatively short txn timeout in order to clear pending offsets faster
+        int transactionTimeoutMs = 10_000;
+        // consumer must be in read_committed mode, which means it won't be able to read uncommitted data
+        boolean readCommitted = true;
+        try (KafkaProducer<Integer, String> producer = new Producer("processor-producer", bootstrapServers, outputTopic,
+                true, transactionalId, true, -1, transactionTimeoutMs, null).createKafkaProducer();
+             KafkaConsumer<Integer, String> consumer = new Consumer("processor-consumer", bootstrapServers, inputTopic,
+                 "processor-group", Optional.of(groupInstanceId), readCommitted, -1, null).createKafkaConsumer()) {
+            // called first and once to fence zombies and abort any pending transaction
+            producer.initTransactions();
+
+            consumer.subscribe(singleton(inputTopic), this);
+
+            Utils.printOut("Processing new records");
+            while (!closed && remainingRecords > 0) {
+                try {
+                    ConsumerRecords<Integer, String> records = consumer.poll(ofMillis(200));
+                    if (!records.isEmpty()) {
+                        // begin a new transaction session
+                        producer.beginTransaction();
+
+                        for (ConsumerRecord<Integer, String> record : records) {
+                            // process the record and send downstream
+                            ProducerRecord<Integer, String> newRecord =
+                                new ProducerRecord<>(outputTopic, record.key(), record.value() + "-ok");
+                            producer.send(newRecord);
+                        }
+
+                        // checkpoint the progress by sending offsets to group coordinator broker
+                        // note that this API is only available for broker >= 2.5
+                        producer.sendOffsetsToTransaction(getOffsetsToCommit(consumer), consumer.groupMetadata());
+
+                        // commit the transaction including offsets
+                        producer.commitTransaction();
+                        processedRecords += records.count();
                     }
+                } catch (AuthorizationException | UnsupportedVersionException | ProducerFencedException
+                         | FencedInstanceIdException | OutOfOrderSequenceException | SerializationException e) {
+                    // we can't recover from these exceptions
+                    Utils.printErr(e.getMessage());
+                    shutdown();
+                } catch (OffsetOutOfRangeException | NoOffsetForPartitionException e) {
+                    // invalid or no offset found without auto.reset.policy
+                    Utils.printOut("Invalid or no offset found, using latest");
+                    consumer.seekToEnd(emptyList());
+                    consumer.commitSync();
+                } catch (KafkaException e) {
+                    // abort the transaction and try to continue
+                    Utils.printOut("Aborting transaction: %s", e);
+                    producer.abortTransaction();
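
For context, the Consumer helper used in the diff wraps standard client configuration. A minimal stand-alone sketch of the settings that read_committed mode implies; the class name and property values here are illustrative assumptions, not code from the PR:

    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.IntegerDeserializer;
    import org.apache.kafka.common.serialization.StringDeserializer;

    public class ReadCommittedConsumerSketch {
        public static KafkaConsumer<Integer, String> create(String bootstrapServers) {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "processor-group");
            // read_committed: poll() never returns records from open or aborted transactions
            props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
            // offsets are committed through the producer transaction, so auto-commit must be off
            props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class.getName());
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            return new KafkaConsumer<>(props);
        }
    }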

Re: [PR] KAFKA-14752: Kafka examples improvements - processor changes [kafka]

2024-03-19 Thread via GitHub


gaoran10 commented on code in PR #13516:
URL: https://github.com/apache/kafka/pull/13516#discussion_r1531455051


##
examples/src/main/java/kafka/examples/ExactlyOnceMessageProcessor.java:
##
@@ -81,111 +88,115 @@ public ExactlyOnceMessageProcessor(final String inputTopic,
 
     @Override
     public void run() {
-        // Init transactions call should always happen first in order to clear zombie transactions from previous generation.
-        producer.initTransactions();
-
-        final AtomicLong messageRemaining = new AtomicLong(Long.MAX_VALUE);
-
-        consumer.subscribe(Collections.singleton(inputTopic), new ConsumerRebalanceListener() {
-            @Override
-            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
-                printWithTxnId("Revoked partition assignment to kick-off rebalancing: " + partitions);
-            }
-
-            @Override
-            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
-                printWithTxnId("Received partition assignment after rebalancing: " + partitions);
-                messageRemaining.set(messagesRemaining(consumer));
-            }
-        });
-
-        int messageProcessed = 0;
-        while (messageRemaining.get() > 0) {
-            try {
-                ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofMillis(200));
-                if (records.count() > 0) {
-                    // Begin a new transaction session.
-                    producer.beginTransaction();
-                    for (ConsumerRecord<Integer, String> record : records) {
-                        // Process the record and send to downstream.
-                        ProducerRecord<Integer, String> customizedRecord = transform(record);
-                        producer.send(customizedRecord);
+        int processedRecords = 0;
+        long remainingRecords = Long.MAX_VALUE;
+        // it is recommended to have a relatively short txn timeout in order to clear pending offsets faster
+        int transactionTimeoutMs = 10_000;
+        // consumer must be in read_committed mode, which means it won't be able to read uncommitted data
+        boolean readCommitted = true;
+        try (KafkaProducer<Integer, String> producer = new Producer("processor-producer", bootstrapServers, outputTopic,
+                true, transactionalId, true, -1, transactionTimeoutMs, null).createKafkaProducer();
+             KafkaConsumer<Integer, String> consumer = new Consumer("processor-consumer", bootstrapServers, inputTopic,
+                 "processor-group", Optional.of(groupInstanceId), readCommitted, -1, null).createKafkaConsumer()) {
+            // called first and once to fence zombies and abort any pending transaction
+            producer.initTransactions();
+
+            consumer.subscribe(singleton(inputTopic), this);
+
+            Utils.printOut("Processing new records");
+            while (!closed && remainingRecords > 0) {
+                try {
+                    ConsumerRecords<Integer, String> records = consumer.poll(ofMillis(200));
+                    if (!records.isEmpty()) {
+                        // begin a new transaction session
+                        producer.beginTransaction();
+
+                        for (ConsumerRecord<Integer, String> record : records) {
+                            // process the record and send downstream
+                            ProducerRecord<Integer, String> newRecord =
+                                new ProducerRecord<>(outputTopic, record.key(), record.value() + "-ok");
+                            producer.send(newRecord);
+                        }
+
+                        // checkpoint the progress by sending offsets to group coordinator broker
+                        // note that this API is only available for broker >= 2.5
+                        producer.sendOffsetsToTransaction(getOffsetsToCommit(consumer), consumer.groupMetadata());
+
+                        // commit the transaction including offsets
+                        producer.commitTransaction();
+                        processedRecords += records.count();
                    }
+                } catch (AuthorizationException | UnsupportedVersionException | ProducerFencedException
+                         | FencedInstanceIdException | OutOfOrderSequenceException | SerializationException e) {
+                    // we can't recover from these exceptions
+                    Utils.printErr(e.getMessage());
+                    shutdown();
+                } catch (OffsetOutOfRangeException | NoOffsetForPartitionException e) {
+                    // invalid or no offset found without auto.reset.policy
+                    Utils.printOut("Invalid or no offset found, using latest");
+                    consumer.seekToEnd(emptyList());
+                    consumer.commitSync();
+                } catch (KafkaException e) {
+                    // abort the transaction and try to continue
+                    Utils.printOut("Aborting transaction: %s", e);
+                    producer.abortTransaction();
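
The getOffsetsToCommit(consumer) call above refers to a helper in the example; a plausible sketch of its shape, assumed from the standard consume-transform-produce pattern rather than quoted from the PR: for each assigned partition, commit the position after the last polled record.

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;

    static Map<TopicPartition, OffsetAndMetadata> getOffsetsToCommit(KafkaConsumer<Integer, String> consumer) {
        Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
        for (TopicPartition partition : consumer.assignment()) {
            // position() is the offset of the next record to fetch, i.e. last consumed + 1,
            // which is exactly the value that must be committed for the group
            offsets.put(partition, new OffsetAndMetadata(consumer.position(partition), null));
        }
        return offsets;
    }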

Re: [PR] KAFKA-14752: Kafka examples improvements - processor changes [kafka]

2024-03-19 Thread via GitHub


fvaleri commented on code in PR #13516:
URL: https://github.com/apache/kafka/pull/13516#discussion_r1530075375


##
examples/src/main/java/kafka/examples/ExactlyOnceMessageProcessor.java:
##
@@ -81,111 +88,115 @@ public ExactlyOnceMessageProcessor(final String inputTopic,
 
     @Override
     public void run() {
-        // Init transactions call should always happen first in order to clear zombie transactions from previous generation.
-        producer.initTransactions();
-
-        final AtomicLong messageRemaining = new AtomicLong(Long.MAX_VALUE);
-
-        consumer.subscribe(Collections.singleton(inputTopic), new ConsumerRebalanceListener() {
-            @Override
-            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
-                printWithTxnId("Revoked partition assignment to kick-off rebalancing: " + partitions);
-            }
-
-            @Override
-            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
-                printWithTxnId("Received partition assignment after rebalancing: " + partitions);
-                messageRemaining.set(messagesRemaining(consumer));
-            }
-        });
-
-        int messageProcessed = 0;
-        while (messageRemaining.get() > 0) {
-            try {
-                ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofMillis(200));
-                if (records.count() > 0) {
-                    // Begin a new transaction session.
-                    producer.beginTransaction();
-                    for (ConsumerRecord<Integer, String> record : records) {
-                        // Process the record and send to downstream.
-                        ProducerRecord<Integer, String> customizedRecord = transform(record);
-                        producer.send(customizedRecord);
+        int processedRecords = 0;
+        long remainingRecords = Long.MAX_VALUE;
+        // it is recommended to have a relatively short txn timeout in order to clear pending offsets faster
+        int transactionTimeoutMs = 10_000;
+        // consumer must be in read_committed mode, which means it won't be able to read uncommitted data
+        boolean readCommitted = true;
+        try (KafkaProducer<Integer, String> producer = new Producer("processor-producer", bootstrapServers, outputTopic,
+                true, transactionalId, true, -1, transactionTimeoutMs, null).createKafkaProducer();
+             KafkaConsumer<Integer, String> consumer = new Consumer("processor-consumer", bootstrapServers, inputTopic,
+                 "processor-group", Optional.of(groupInstanceId), readCommitted, -1, null).createKafkaConsumer()) {
+            // called first and once to fence zombies and abort any pending transaction
+            producer.initTransactions();
+
+            consumer.subscribe(singleton(inputTopic), this);
+
+            Utils.printOut("Processing new records");
+            while (!closed && remainingRecords > 0) {
+                try {
+                    ConsumerRecords<Integer, String> records = consumer.poll(ofMillis(200));
+                    if (!records.isEmpty()) {
+                        // begin a new transaction session
+                        producer.beginTransaction();
+
+                        for (ConsumerRecord<Integer, String> record : records) {
+                            // process the record and send downstream
+                            ProducerRecord<Integer, String> newRecord =
+                                new ProducerRecord<>(outputTopic, record.key(), record.value() + "-ok");
+                            producer.send(newRecord);
+                        }
+
+                        // checkpoint the progress by sending offsets to group coordinator broker
+                        // note that this API is only available for broker >= 2.5
+                        producer.sendOffsetsToTransaction(getOffsetsToCommit(consumer), consumer.groupMetadata());
+
+                        // commit the transaction including offsets
+                        producer.commitTransaction();
+                        processedRecords += records.count();
                    }
+                } catch (AuthorizationException | UnsupportedVersionException | ProducerFencedException
+                         | FencedInstanceIdException | OutOfOrderSequenceException | SerializationException e) {
+                    // we can't recover from these exceptions
+                    Utils.printErr(e.getMessage());
+                    shutdown();
+                } catch (OffsetOutOfRangeException | NoOffsetForPartitionException e) {
+                    // invalid or no offset found without auto.reset.policy
+                    Utils.printOut("Invalid or no offset found, using latest");
+                    consumer.seekToEnd(emptyList());
+                    consumer.commitSync();
+                } catch (KafkaException e) {
+                    // abort the transaction and try to continue
+                    Utils.printOut("Aborting transaction: %s", e);
+                    producer.abortTransaction();
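
Likewise, the Producer helper's boolean and int arguments map onto standard producer configs. A minimal sketch of the transactional settings under discussion; the method name and values are illustrative assumptions:

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.common.serialization.IntegerSerializer;
    import org.apache.kafka.common.serialization.StringSerializer;

    static KafkaProducer<Integer, String> createTransactionalProducer(String bootstrapServers, String transactionalId) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        // a stable transactional.id is what lets initTransactions() fence zombie producers
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId);
        // idempotence is required for (and implied by) transactions
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
        // the relatively short timeout discussed above: a hung transaction is aborted
        // by the broker sooner, releasing its pending offsets
        props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 10_000);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        return new KafkaProducer<>(props);
    }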

Re: [PR] KAFKA-14752: Kafka examples improvements - processor changes [kafka]

2024-03-18 Thread via GitHub


gaoran10 commented on code in PR #13516:
URL: https://github.com/apache/kafka/pull/13516#discussion_r1529639335


##
examples/src/main/java/kafka/examples/ExactlyOnceMessageProcessor.java:
##
@@ -81,111 +88,115 @@ public ExactlyOnceMessageProcessor(final String inputTopic,
 
     @Override
     public void run() {
-        // Init transactions call should always happen first in order to clear zombie transactions from previous generation.
-        producer.initTransactions();
-
-        final AtomicLong messageRemaining = new AtomicLong(Long.MAX_VALUE);
-
-        consumer.subscribe(Collections.singleton(inputTopic), new ConsumerRebalanceListener() {
-            @Override
-            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
-                printWithTxnId("Revoked partition assignment to kick-off rebalancing: " + partitions);
-            }
-
-            @Override
-            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
-                printWithTxnId("Received partition assignment after rebalancing: " + partitions);
-                messageRemaining.set(messagesRemaining(consumer));
-            }
-        });
-
-        int messageProcessed = 0;
-        while (messageRemaining.get() > 0) {
-            try {
-                ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofMillis(200));
-                if (records.count() > 0) {
-                    // Begin a new transaction session.
-                    producer.beginTransaction();
-                    for (ConsumerRecord<Integer, String> record : records) {
-                        // Process the record and send to downstream.
-                        ProducerRecord<Integer, String> customizedRecord = transform(record);
-                        producer.send(customizedRecord);
+        int processedRecords = 0;
+        long remainingRecords = Long.MAX_VALUE;
+        // it is recommended to have a relatively short txn timeout in order to clear pending offsets faster
+        int transactionTimeoutMs = 10_000;
+        // consumer must be in read_committed mode, which means it won't be able to read uncommitted data
+        boolean readCommitted = true;
+        try (KafkaProducer<Integer, String> producer = new Producer("processor-producer", bootstrapServers, outputTopic,
+                true, transactionalId, true, -1, transactionTimeoutMs, null).createKafkaProducer();
+             KafkaConsumer<Integer, String> consumer = new Consumer("processor-consumer", bootstrapServers, inputTopic,
+                 "processor-group", Optional.of(groupInstanceId), readCommitted, -1, null).createKafkaConsumer()) {
+            // called first and once to fence zombies and abort any pending transaction
+            producer.initTransactions();
+
+            consumer.subscribe(singleton(inputTopic), this);
+
+            Utils.printOut("Processing new records");
+            while (!closed && remainingRecords > 0) {
+                try {
+                    ConsumerRecords<Integer, String> records = consumer.poll(ofMillis(200));
+                    if (!records.isEmpty()) {
+                        // begin a new transaction session
+                        producer.beginTransaction();
+
+                        for (ConsumerRecord<Integer, String> record : records) {
+                            // process the record and send downstream
+                            ProducerRecord<Integer, String> newRecord =
+                                new ProducerRecord<>(outputTopic, record.key(), record.value() + "-ok");
+                            producer.send(newRecord);
+                        }
+
+                        // checkpoint the progress by sending offsets to group coordinator broker
+                        // note that this API is only available for broker >= 2.5
+                        producer.sendOffsetsToTransaction(getOffsetsToCommit(consumer), consumer.groupMetadata());
+
+                        // commit the transaction including offsets
+                        producer.commitTransaction();
+                        processedRecords += records.count();
                    }
+                } catch (AuthorizationException | UnsupportedVersionException | ProducerFencedException
+                         | FencedInstanceIdException | OutOfOrderSequenceException | SerializationException e) {
+                    // we can't recover from these exceptions
+                    Utils.printErr(e.getMessage());
+                    shutdown();
+                } catch (OffsetOutOfRangeException | NoOffsetForPartitionException e) {
+                    // invalid or no offset found without auto.reset.policy
+                    Utils.printOut("Invalid or no offset found, using latest");
+                    consumer.seekToEnd(emptyList());
+                    consumer.commitSync();
+                } catch (KafkaException e) {
+                    // abort the transaction and try to continue
+                    Utils.printOut("Aborting transaction: %s", e);
+                    producer.abortTransaction();
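
Note that consumer.subscribe(singleton(inputTopic), this) implies the processor class itself now implements ConsumerRebalanceListener, replacing the old anonymous listener. A sketch of what the callbacks might look like; the bodies are assumed, not quoted from the PR:

    import java.util.Collection;
    import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
    import org.apache.kafka.common.TopicPartition;

    // the processor class would declare: implements ConsumerRebalanceListener
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        Utils.printOut("Revoked partitions: %s", partitions);
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        Utils.printOut("Assigned partitions: %s", partitions);
    }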

Re: [PR] KAFKA-14752: Kafka examples improvements - processor changes [kafka]

2024-03-18 Thread via GitHub


fvaleri commented on code in PR #13516:
URL: https://github.com/apache/kafka/pull/13516#discussion_r1528843407


##
examples/src/main/java/kafka/examples/ExactlyOnceMessageProcessor.java:
##
@@ -81,111 +88,115 @@ public ExactlyOnceMessageProcessor(final String inputTopic,
 
     @Override
     public void run() {
-        // Init transactions call should always happen first in order to clear zombie transactions from previous generation.
-        producer.initTransactions();
-
-        final AtomicLong messageRemaining = new AtomicLong(Long.MAX_VALUE);
-
-        consumer.subscribe(Collections.singleton(inputTopic), new ConsumerRebalanceListener() {
-            @Override
-            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
-                printWithTxnId("Revoked partition assignment to kick-off rebalancing: " + partitions);
-            }
-
-            @Override
-            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
-                printWithTxnId("Received partition assignment after rebalancing: " + partitions);
-                messageRemaining.set(messagesRemaining(consumer));
-            }
-        });
-
-        int messageProcessed = 0;
-        while (messageRemaining.get() > 0) {
-            try {
-                ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofMillis(200));
-                if (records.count() > 0) {
-                    // Begin a new transaction session.
-                    producer.beginTransaction();
-                    for (ConsumerRecord<Integer, String> record : records) {
-                        // Process the record and send to downstream.
-                        ProducerRecord<Integer, String> customizedRecord = transform(record);
-                        producer.send(customizedRecord);
+        int processedRecords = 0;
+        long remainingRecords = Long.MAX_VALUE;
+        // it is recommended to have a relatively short txn timeout in order to clear pending offsets faster
+        int transactionTimeoutMs = 10_000;
+        // consumer must be in read_committed mode, which means it won't be able to read uncommitted data
+        boolean readCommitted = true;
+        try (KafkaProducer<Integer, String> producer = new Producer("processor-producer", bootstrapServers, outputTopic,
+                true, transactionalId, true, -1, transactionTimeoutMs, null).createKafkaProducer();
+             KafkaConsumer<Integer, String> consumer = new Consumer("processor-consumer", bootstrapServers, inputTopic,
+                 "processor-group", Optional.of(groupInstanceId), readCommitted, -1, null).createKafkaConsumer()) {
+            // called first and once to fence zombies and abort any pending transaction
+            producer.initTransactions();
+
+            consumer.subscribe(singleton(inputTopic), this);
+
+            Utils.printOut("Processing new records");
+            while (!closed && remainingRecords > 0) {
+                try {
+                    ConsumerRecords<Integer, String> records = consumer.poll(ofMillis(200));
+                    if (!records.isEmpty()) {
+                        // begin a new transaction session
+                        producer.beginTransaction();
+
+                        for (ConsumerRecord<Integer, String> record : records) {
+                            // process the record and send downstream
+                            ProducerRecord<Integer, String> newRecord =
+                                new ProducerRecord<>(outputTopic, record.key(), record.value() + "-ok");
+                            producer.send(newRecord);
+                        }
+
+                        // checkpoint the progress by sending offsets to group coordinator broker
+                        // note that this API is only available for broker >= 2.5
+                        producer.sendOffsetsToTransaction(getOffsetsToCommit(consumer), consumer.groupMetadata());
+
+                        // commit the transaction including offsets
+                        producer.commitTransaction();
+                        processedRecords += records.count();
                    }
+                } catch (AuthorizationException | UnsupportedVersionException | ProducerFencedException
+                         | FencedInstanceIdException | OutOfOrderSequenceException | SerializationException e) {
+                    // we can't recover from these exceptions
+                    Utils.printErr(e.getMessage());
+                    shutdown();
+                } catch (OffsetOutOfRangeException | NoOffsetForPartitionException e) {
+                    // invalid or no offset found without auto.reset.policy
+                    Utils.printOut("Invalid or no offset found, using latest");
+                    consumer.seekToEnd(emptyList());
+                    consumer.commitSync();
+                } catch (KafkaException e) {
+                    // abort the transaction and try to continue
+                    Utils.printOut("Aborting transaction: %s", e);
+                    producer.abortTransaction();
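
The loop condition !closed and the shutdown() call in the fatal-exception branch cooperate through a flag; a minimal sketch of that wiring, with field and method names assumed from the diff:

    // volatile so the polling thread observes a shutdown() issued from another thread
    private volatile boolean closed;

    public void shutdown() {
        closed = true;
    }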

Re: [PR] KAFKA-14752: Kafka examples improvements - processor changes [kafka]

2024-03-18 Thread via GitHub


gaoran10 commented on code in PR #13516:
URL: https://github.com/apache/kafka/pull/13516#discussion_r1528650747


##
examples/src/main/java/kafka/examples/ExactlyOnceMessageProcessor.java:
##
@@ -81,111 +88,115 @@ public ExactlyOnceMessageProcessor(final String inputTopic,
 
     @Override
     public void run() {
-        // Init transactions call should always happen first in order to clear zombie transactions from previous generation.
-        producer.initTransactions();
-
-        final AtomicLong messageRemaining = new AtomicLong(Long.MAX_VALUE);
-
-        consumer.subscribe(Collections.singleton(inputTopic), new ConsumerRebalanceListener() {
-            @Override
-            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
-                printWithTxnId("Revoked partition assignment to kick-off rebalancing: " + partitions);
-            }
-
-            @Override
-            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
-                printWithTxnId("Received partition assignment after rebalancing: " + partitions);
-                messageRemaining.set(messagesRemaining(consumer));
-            }
-        });
-
-        int messageProcessed = 0;
-        while (messageRemaining.get() > 0) {
-            try {
-                ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofMillis(200));
-                if (records.count() > 0) {
-                    // Begin a new transaction session.
-                    producer.beginTransaction();
-                    for (ConsumerRecord<Integer, String> record : records) {
-                        // Process the record and send to downstream.
-                        ProducerRecord<Integer, String> customizedRecord = transform(record);
-                        producer.send(customizedRecord);
+        int processedRecords = 0;
+        long remainingRecords = Long.MAX_VALUE;
+        // it is recommended to have a relatively short txn timeout in order to clear pending offsets faster
+        int transactionTimeoutMs = 10_000;
+        // consumer must be in read_committed mode, which means it won't be able to read uncommitted data
+        boolean readCommitted = true;
+        try (KafkaProducer<Integer, String> producer = new Producer("processor-producer", bootstrapServers, outputTopic,
+                true, transactionalId, true, -1, transactionTimeoutMs, null).createKafkaProducer();
+             KafkaConsumer<Integer, String> consumer = new Consumer("processor-consumer", bootstrapServers, inputTopic,
+                 "processor-group", Optional.of(groupInstanceId), readCommitted, -1, null).createKafkaConsumer()) {
+            // called first and once to fence zombies and abort any pending transaction
+            producer.initTransactions();
+
+            consumer.subscribe(singleton(inputTopic), this);
+
+            Utils.printOut("Processing new records");
+            while (!closed && remainingRecords > 0) {
+                try {
+                    ConsumerRecords<Integer, String> records = consumer.poll(ofMillis(200));
+                    if (!records.isEmpty()) {
+                        // begin a new transaction session
+                        producer.beginTransaction();
+
+                        for (ConsumerRecord<Integer, String> record : records) {
+                            // process the record and send downstream
+                            ProducerRecord<Integer, String> newRecord =
+                                new ProducerRecord<>(outputTopic, record.key(), record.value() + "-ok");
+                            producer.send(newRecord);
+                        }
+
+                        // checkpoint the progress by sending offsets to group coordinator broker
+                        // note that this API is only available for broker >= 2.5
+                        producer.sendOffsetsToTransaction(getOffsetsToCommit(consumer), consumer.groupMetadata());
+
+                        // commit the transaction including offsets
+                        producer.commitTransaction();
+                        processedRecords += records.count();
                    }
+                } catch (AuthorizationException | UnsupportedVersionException | ProducerFencedException
+                         | FencedInstanceIdException | OutOfOrderSequenceException | SerializationException e) {
+                    // we can't recover from these exceptions
+                    Utils.printErr(e.getMessage());
+                    shutdown();
+                } catch (OffsetOutOfRangeException | NoOffsetForPartitionException e) {
+                    // invalid or no offset found without auto.reset.policy
+                    Utils.printOut("Invalid or no offset found, using latest");
+                    consumer.seekToEnd(emptyList());
+                    consumer.commitSync();
+                } catch (KafkaException e) {
+                    // abort the transaction and try to continue
+                    Utils.printOut("Aborting transaction: %s", e);
+                    producer.abortTransaction();
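
remainingRecords drives loop termination; one way it could be refreshed after each batch, assuming a hypothetical helper (getRemainingRecords is an illustrative name, not confirmed from the PR): lag is the end offset minus the current position, summed over the assignment.

    import org.apache.kafka.clients.consumer.KafkaConsumer;

    static long getRemainingRecords(KafkaConsumer<Integer, String> consumer) {
        // endOffsets() returns the next offset to be written per partition;
        // position() is the next offset this consumer would read
        return consumer.endOffsets(consumer.assignment()).entrySet().stream()
            .mapToLong(entry -> entry.getValue() - consumer.position(entry.getKey()))
            .sum();
    }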
