Re: [PR] KAFKA-14752: Kafka examples improvements - processor changes [kafka]
fvaleri commented on code in PR #13516: URL: https://github.com/apache/kafka/pull/13516#discussion_r1531939832 ## examples/src/main/java/kafka/examples/ExactlyOnceMessageProcessor.java: ## @@ -81,111 +88,115 @@ public ExactlyOnceMessageProcessor(final String inputTopic, @Override public void run() { -// Init transactions call should always happen first in order to clear zombie transactions from previous generation. -producer.initTransactions(); - -final AtomicLong messageRemaining = new AtomicLong(Long.MAX_VALUE); - -consumer.subscribe(Collections.singleton(inputTopic), new ConsumerRebalanceListener() { -@Override -public void onPartitionsRevoked(Collection partitions) { -printWithTxnId("Revoked partition assignment to kick-off rebalancing: " + partitions); -} - -@Override -public void onPartitionsAssigned(Collection partitions) { -printWithTxnId("Received partition assignment after rebalancing: " + partitions); -messageRemaining.set(messagesRemaining(consumer)); -} -}); - -int messageProcessed = 0; -while (messageRemaining.get() > 0) { -try { -ConsumerRecords records = consumer.poll(Duration.ofMillis(200)); -if (records.count() > 0) { -// Begin a new transaction session. -producer.beginTransaction(); -for (ConsumerRecord record : records) { -// Process the record and send to downstream. 
-ProducerRecord customizedRecord = transform(record); -producer.send(customizedRecord); +int processedRecords = 0; +long remainingRecords = Long.MAX_VALUE; +// it is recommended to have a relatively short txn timeout in order to clear pending offsets faster +int transactionTimeoutMs = 10_000; +// consumer must be in read_committed mode, which means it won't be able to read uncommitted data +boolean readCommitted = true; +try (KafkaProducer producer = new Producer("processor-producer", bootstrapServers, outputTopic, +true, transactionalId, true, -1, transactionTimeoutMs, null).createKafkaProducer(); + KafkaConsumer consumer = new Consumer("processor-consumer", bootstrapServers, inputTopic, + "processor-group", Optional.of(groupInstanceId), readCommitted, -1, null).createKafkaConsumer()) { +// called first and once to fence zombies and abort any pending transaction +producer.initTransactions(); + +consumer.subscribe(singleton(inputTopic), this); + +Utils.printOut("Processing new records"); +while (!closed && remainingRecords > 0) { +try { +ConsumerRecords records = consumer.poll(ofMillis(200)); +if (!records.isEmpty()) { +// begin a new transaction session +producer.beginTransaction(); + +for (ConsumerRecord record : records) { +// process the record and send downstream +ProducerRecord newRecord = +new ProducerRecord<>(outputTopic, record.key(), record.value() + "-ok"); +producer.send(newRecord); +} + +// checkpoint the progress by sending offsets to group coordinator broker +// note that this API is only available for broker >= 2.5 + producer.sendOffsetsToTransaction(getOffsetsToCommit(consumer), consumer.groupMetadata()); + +// commit the transaction including offsets +producer.commitTransaction(); +processedRecords += records.count(); } +} catch (AuthorizationException | UnsupportedVersionException | ProducerFencedException + | FencedInstanceIdException | OutOfOrderSequenceException | SerializationException e) { +// we can't recover from these exceptions 
+Utils.printErr(e.getMessage()); +shutdown(); +} catch (OffsetOutOfRangeException | NoOffsetForPartitionException e) { +// invalid or no offset found without auto.reset.policy +Utils.printOut("Invalid or no offset found, using latest"); +consumer.seekToEnd(emptyList()); +consumer.commitSync(); +} catch (KafkaException e) { +// abort the transaction and try to continue +Utils.printOut("Aborting transaction: %s", e); +produc
Re: [PR] KAFKA-14752: Kafka examples improvements - processor changes [kafka]
gaoran10 commented on code in PR #13516: URL: https://github.com/apache/kafka/pull/13516#discussion_r1531455051 ## examples/src/main/java/kafka/examples/ExactlyOnceMessageProcessor.java: ## @@ -81,111 +88,115 @@ public ExactlyOnceMessageProcessor(final String inputTopic, @Override public void run() { -// Init transactions call should always happen first in order to clear zombie transactions from previous generation. -producer.initTransactions(); - -final AtomicLong messageRemaining = new AtomicLong(Long.MAX_VALUE); - -consumer.subscribe(Collections.singleton(inputTopic), new ConsumerRebalanceListener() { -@Override -public void onPartitionsRevoked(Collection partitions) { -printWithTxnId("Revoked partition assignment to kick-off rebalancing: " + partitions); -} - -@Override -public void onPartitionsAssigned(Collection partitions) { -printWithTxnId("Received partition assignment after rebalancing: " + partitions); -messageRemaining.set(messagesRemaining(consumer)); -} -}); - -int messageProcessed = 0; -while (messageRemaining.get() > 0) { -try { -ConsumerRecords records = consumer.poll(Duration.ofMillis(200)); -if (records.count() > 0) { -// Begin a new transaction session. -producer.beginTransaction(); -for (ConsumerRecord record : records) { -// Process the record and send to downstream. 
-ProducerRecord customizedRecord = transform(record); -producer.send(customizedRecord); +int processedRecords = 0; +long remainingRecords = Long.MAX_VALUE; +// it is recommended to have a relatively short txn timeout in order to clear pending offsets faster +int transactionTimeoutMs = 10_000; +// consumer must be in read_committed mode, which means it won't be able to read uncommitted data +boolean readCommitted = true; +try (KafkaProducer producer = new Producer("processor-producer", bootstrapServers, outputTopic, +true, transactionalId, true, -1, transactionTimeoutMs, null).createKafkaProducer(); + KafkaConsumer consumer = new Consumer("processor-consumer", bootstrapServers, inputTopic, + "processor-group", Optional.of(groupInstanceId), readCommitted, -1, null).createKafkaConsumer()) { +// called first and once to fence zombies and abort any pending transaction +producer.initTransactions(); + +consumer.subscribe(singleton(inputTopic), this); + +Utils.printOut("Processing new records"); +while (!closed && remainingRecords > 0) { +try { +ConsumerRecords records = consumer.poll(ofMillis(200)); +if (!records.isEmpty()) { +// begin a new transaction session +producer.beginTransaction(); + +for (ConsumerRecord record : records) { +// process the record and send downstream +ProducerRecord newRecord = +new ProducerRecord<>(outputTopic, record.key(), record.value() + "-ok"); +producer.send(newRecord); +} + +// checkpoint the progress by sending offsets to group coordinator broker +// note that this API is only available for broker >= 2.5 + producer.sendOffsetsToTransaction(getOffsetsToCommit(consumer), consumer.groupMetadata()); + +// commit the transaction including offsets +producer.commitTransaction(); +processedRecords += records.count(); } +} catch (AuthorizationException | UnsupportedVersionException | ProducerFencedException + | FencedInstanceIdException | OutOfOrderSequenceException | SerializationException e) { +// we can't recover from these exceptions 
+Utils.printErr(e.getMessage()); +shutdown(); +} catch (OffsetOutOfRangeException | NoOffsetForPartitionException e) { +// invalid or no offset found without auto.reset.policy +Utils.printOut("Invalid or no offset found, using latest"); +consumer.seekToEnd(emptyList()); +consumer.commitSync(); +} catch (KafkaException e) { +// abort the transaction and try to continue +Utils.printOut("Aborting transaction: %s", e); +produ
Re: [PR] KAFKA-14752: Kafka examples improvements - processor changes [kafka]
fvaleri commented on code in PR #13516: URL: https://github.com/apache/kafka/pull/13516#discussion_r1530075375 ## examples/src/main/java/kafka/examples/ExactlyOnceMessageProcessor.java: ## @@ -81,111 +88,115 @@ public ExactlyOnceMessageProcessor(final String inputTopic, @Override public void run() { -// Init transactions call should always happen first in order to clear zombie transactions from previous generation. -producer.initTransactions(); - -final AtomicLong messageRemaining = new AtomicLong(Long.MAX_VALUE); - -consumer.subscribe(Collections.singleton(inputTopic), new ConsumerRebalanceListener() { -@Override -public void onPartitionsRevoked(Collection partitions) { -printWithTxnId("Revoked partition assignment to kick-off rebalancing: " + partitions); -} - -@Override -public void onPartitionsAssigned(Collection partitions) { -printWithTxnId("Received partition assignment after rebalancing: " + partitions); -messageRemaining.set(messagesRemaining(consumer)); -} -}); - -int messageProcessed = 0; -while (messageRemaining.get() > 0) { -try { -ConsumerRecords records = consumer.poll(Duration.ofMillis(200)); -if (records.count() > 0) { -// Begin a new transaction session. -producer.beginTransaction(); -for (ConsumerRecord record : records) { -// Process the record and send to downstream. 
-ProducerRecord customizedRecord = transform(record); -producer.send(customizedRecord); +int processedRecords = 0; +long remainingRecords = Long.MAX_VALUE; +// it is recommended to have a relatively short txn timeout in order to clear pending offsets faster +int transactionTimeoutMs = 10_000; +// consumer must be in read_committed mode, which means it won't be able to read uncommitted data +boolean readCommitted = true; +try (KafkaProducer producer = new Producer("processor-producer", bootstrapServers, outputTopic, +true, transactionalId, true, -1, transactionTimeoutMs, null).createKafkaProducer(); + KafkaConsumer consumer = new Consumer("processor-consumer", bootstrapServers, inputTopic, + "processor-group", Optional.of(groupInstanceId), readCommitted, -1, null).createKafkaConsumer()) { +// called first and once to fence zombies and abort any pending transaction +producer.initTransactions(); + +consumer.subscribe(singleton(inputTopic), this); + +Utils.printOut("Processing new records"); +while (!closed && remainingRecords > 0) { +try { +ConsumerRecords records = consumer.poll(ofMillis(200)); +if (!records.isEmpty()) { +// begin a new transaction session +producer.beginTransaction(); + +for (ConsumerRecord record : records) { +// process the record and send downstream +ProducerRecord newRecord = +new ProducerRecord<>(outputTopic, record.key(), record.value() + "-ok"); +producer.send(newRecord); +} + +// checkpoint the progress by sending offsets to group coordinator broker +// note that this API is only available for broker >= 2.5 + producer.sendOffsetsToTransaction(getOffsetsToCommit(consumer), consumer.groupMetadata()); + +// commit the transaction including offsets +producer.commitTransaction(); +processedRecords += records.count(); } +} catch (AuthorizationException | UnsupportedVersionException | ProducerFencedException + | FencedInstanceIdException | OutOfOrderSequenceException | SerializationException e) { +// we can't recover from these exceptions 
+Utils.printErr(e.getMessage()); +shutdown(); +} catch (OffsetOutOfRangeException | NoOffsetForPartitionException e) { +// invalid or no offset found without auto.reset.policy +Utils.printOut("Invalid or no offset found, using latest"); +consumer.seekToEnd(emptyList()); +consumer.commitSync(); +} catch (KafkaException e) { +// abort the transaction and try to continue +Utils.printOut("Aborting transaction: %s", e); +produc
Re: [PR] KAFKA-14752: Kafka examples improvements - processor changes [kafka]
fvaleri commented on code in PR #13516: URL: https://github.com/apache/kafka/pull/13516#discussion_r1530075375 ## examples/src/main/java/kafka/examples/ExactlyOnceMessageProcessor.java: ## @@ -81,111 +88,115 @@ public ExactlyOnceMessageProcessor(final String inputTopic, @Override public void run() { -// Init transactions call should always happen first in order to clear zombie transactions from previous generation. -producer.initTransactions(); - -final AtomicLong messageRemaining = new AtomicLong(Long.MAX_VALUE); - -consumer.subscribe(Collections.singleton(inputTopic), new ConsumerRebalanceListener() { -@Override -public void onPartitionsRevoked(Collection partitions) { -printWithTxnId("Revoked partition assignment to kick-off rebalancing: " + partitions); -} - -@Override -public void onPartitionsAssigned(Collection partitions) { -printWithTxnId("Received partition assignment after rebalancing: " + partitions); -messageRemaining.set(messagesRemaining(consumer)); -} -}); - -int messageProcessed = 0; -while (messageRemaining.get() > 0) { -try { -ConsumerRecords records = consumer.poll(Duration.ofMillis(200)); -if (records.count() > 0) { -// Begin a new transaction session. -producer.beginTransaction(); -for (ConsumerRecord record : records) { -// Process the record and send to downstream. 
-ProducerRecord customizedRecord = transform(record); -producer.send(customizedRecord); +int processedRecords = 0; +long remainingRecords = Long.MAX_VALUE; +// it is recommended to have a relatively short txn timeout in order to clear pending offsets faster +int transactionTimeoutMs = 10_000; +// consumer must be in read_committed mode, which means it won't be able to read uncommitted data +boolean readCommitted = true; +try (KafkaProducer producer = new Producer("processor-producer", bootstrapServers, outputTopic, +true, transactionalId, true, -1, transactionTimeoutMs, null).createKafkaProducer(); + KafkaConsumer consumer = new Consumer("processor-consumer", bootstrapServers, inputTopic, + "processor-group", Optional.of(groupInstanceId), readCommitted, -1, null).createKafkaConsumer()) { +// called first and once to fence zombies and abort any pending transaction +producer.initTransactions(); + +consumer.subscribe(singleton(inputTopic), this); + +Utils.printOut("Processing new records"); +while (!closed && remainingRecords > 0) { +try { +ConsumerRecords records = consumer.poll(ofMillis(200)); +if (!records.isEmpty()) { +// begin a new transaction session +producer.beginTransaction(); + +for (ConsumerRecord record : records) { +// process the record and send downstream +ProducerRecord newRecord = +new ProducerRecord<>(outputTopic, record.key(), record.value() + "-ok"); +producer.send(newRecord); +} + +// checkpoint the progress by sending offsets to group coordinator broker +// note that this API is only available for broker >= 2.5 + producer.sendOffsetsToTransaction(getOffsetsToCommit(consumer), consumer.groupMetadata()); + +// commit the transaction including offsets +producer.commitTransaction(); +processedRecords += records.count(); } +} catch (AuthorizationException | UnsupportedVersionException | ProducerFencedException + | FencedInstanceIdException | OutOfOrderSequenceException | SerializationException e) { +// we can't recover from these exceptions 
+Utils.printErr(e.getMessage()); +shutdown(); +} catch (OffsetOutOfRangeException | NoOffsetForPartitionException e) { +// invalid or no offset found without auto.reset.policy +Utils.printOut("Invalid or no offset found, using latest"); +consumer.seekToEnd(emptyList()); +consumer.commitSync(); +} catch (KafkaException e) { +// abort the transaction and try to continue +Utils.printOut("Aborting transaction: %s", e); +produc
Re: [PR] KAFKA-14752: Kafka examples improvements - processor changes [kafka]
gaoran10 commented on code in PR #13516: URL: https://github.com/apache/kafka/pull/13516#discussion_r1529639335 ## examples/src/main/java/kafka/examples/ExactlyOnceMessageProcessor.java: ## @@ -81,111 +88,115 @@ public ExactlyOnceMessageProcessor(final String inputTopic, @Override public void run() { -// Init transactions call should always happen first in order to clear zombie transactions from previous generation. -producer.initTransactions(); - -final AtomicLong messageRemaining = new AtomicLong(Long.MAX_VALUE); - -consumer.subscribe(Collections.singleton(inputTopic), new ConsumerRebalanceListener() { -@Override -public void onPartitionsRevoked(Collection partitions) { -printWithTxnId("Revoked partition assignment to kick-off rebalancing: " + partitions); -} - -@Override -public void onPartitionsAssigned(Collection partitions) { -printWithTxnId("Received partition assignment after rebalancing: " + partitions); -messageRemaining.set(messagesRemaining(consumer)); -} -}); - -int messageProcessed = 0; -while (messageRemaining.get() > 0) { -try { -ConsumerRecords records = consumer.poll(Duration.ofMillis(200)); -if (records.count() > 0) { -// Begin a new transaction session. -producer.beginTransaction(); -for (ConsumerRecord record : records) { -// Process the record and send to downstream. 
-ProducerRecord customizedRecord = transform(record); -producer.send(customizedRecord); +int processedRecords = 0; +long remainingRecords = Long.MAX_VALUE; +// it is recommended to have a relatively short txn timeout in order to clear pending offsets faster +int transactionTimeoutMs = 10_000; +// consumer must be in read_committed mode, which means it won't be able to read uncommitted data +boolean readCommitted = true; +try (KafkaProducer producer = new Producer("processor-producer", bootstrapServers, outputTopic, +true, transactionalId, true, -1, transactionTimeoutMs, null).createKafkaProducer(); + KafkaConsumer consumer = new Consumer("processor-consumer", bootstrapServers, inputTopic, + "processor-group", Optional.of(groupInstanceId), readCommitted, -1, null).createKafkaConsumer()) { +// called first and once to fence zombies and abort any pending transaction +producer.initTransactions(); + +consumer.subscribe(singleton(inputTopic), this); + +Utils.printOut("Processing new records"); +while (!closed && remainingRecords > 0) { +try { +ConsumerRecords records = consumer.poll(ofMillis(200)); +if (!records.isEmpty()) { +// begin a new transaction session +producer.beginTransaction(); + +for (ConsumerRecord record : records) { +// process the record and send downstream +ProducerRecord newRecord = +new ProducerRecord<>(outputTopic, record.key(), record.value() + "-ok"); +producer.send(newRecord); +} + +// checkpoint the progress by sending offsets to group coordinator broker +// note that this API is only available for broker >= 2.5 + producer.sendOffsetsToTransaction(getOffsetsToCommit(consumer), consumer.groupMetadata()); + +// commit the transaction including offsets +producer.commitTransaction(); +processedRecords += records.count(); } +} catch (AuthorizationException | UnsupportedVersionException | ProducerFencedException + | FencedInstanceIdException | OutOfOrderSequenceException | SerializationException e) { +// we can't recover from these exceptions 
+Utils.printErr(e.getMessage()); +shutdown(); +} catch (OffsetOutOfRangeException | NoOffsetForPartitionException e) { +// invalid or no offset found without auto.reset.policy +Utils.printOut("Invalid or no offset found, using latest"); +consumer.seekToEnd(emptyList()); +consumer.commitSync(); +} catch (KafkaException e) { +// abort the transaction and try to continue +Utils.printOut("Aborting transaction: %s", e); +produ
Re: [PR] KAFKA-14752: Kafka examples improvements - processor changes [kafka]
gaoran10 commented on code in PR #13516: URL: https://github.com/apache/kafka/pull/13516#discussion_r1529639335 ## examples/src/main/java/kafka/examples/ExactlyOnceMessageProcessor.java: ## @@ -81,111 +88,115 @@ public ExactlyOnceMessageProcessor(final String inputTopic, @Override public void run() { -// Init transactions call should always happen first in order to clear zombie transactions from previous generation. -producer.initTransactions(); - -final AtomicLong messageRemaining = new AtomicLong(Long.MAX_VALUE); - -consumer.subscribe(Collections.singleton(inputTopic), new ConsumerRebalanceListener() { -@Override -public void onPartitionsRevoked(Collection partitions) { -printWithTxnId("Revoked partition assignment to kick-off rebalancing: " + partitions); -} - -@Override -public void onPartitionsAssigned(Collection partitions) { -printWithTxnId("Received partition assignment after rebalancing: " + partitions); -messageRemaining.set(messagesRemaining(consumer)); -} -}); - -int messageProcessed = 0; -while (messageRemaining.get() > 0) { -try { -ConsumerRecords records = consumer.poll(Duration.ofMillis(200)); -if (records.count() > 0) { -// Begin a new transaction session. -producer.beginTransaction(); -for (ConsumerRecord record : records) { -// Process the record and send to downstream. 
-ProducerRecord customizedRecord = transform(record); -producer.send(customizedRecord); +int processedRecords = 0; +long remainingRecords = Long.MAX_VALUE; +// it is recommended to have a relatively short txn timeout in order to clear pending offsets faster +int transactionTimeoutMs = 10_000; +// consumer must be in read_committed mode, which means it won't be able to read uncommitted data +boolean readCommitted = true; +try (KafkaProducer producer = new Producer("processor-producer", bootstrapServers, outputTopic, +true, transactionalId, true, -1, transactionTimeoutMs, null).createKafkaProducer(); + KafkaConsumer consumer = new Consumer("processor-consumer", bootstrapServers, inputTopic, + "processor-group", Optional.of(groupInstanceId), readCommitted, -1, null).createKafkaConsumer()) { +// called first and once to fence zombies and abort any pending transaction +producer.initTransactions(); + +consumer.subscribe(singleton(inputTopic), this); + +Utils.printOut("Processing new records"); +while (!closed && remainingRecords > 0) { +try { +ConsumerRecords records = consumer.poll(ofMillis(200)); +if (!records.isEmpty()) { +// begin a new transaction session +producer.beginTransaction(); + +for (ConsumerRecord record : records) { +// process the record and send downstream +ProducerRecord newRecord = +new ProducerRecord<>(outputTopic, record.key(), record.value() + "-ok"); +producer.send(newRecord); +} + +// checkpoint the progress by sending offsets to group coordinator broker +// note that this API is only available for broker >= 2.5 + producer.sendOffsetsToTransaction(getOffsetsToCommit(consumer), consumer.groupMetadata()); + +// commit the transaction including offsets +producer.commitTransaction(); +processedRecords += records.count(); } +} catch (AuthorizationException | UnsupportedVersionException | ProducerFencedException + | FencedInstanceIdException | OutOfOrderSequenceException | SerializationException e) { +// we can't recover from these exceptions 
+Utils.printErr(e.getMessage()); +shutdown(); +} catch (OffsetOutOfRangeException | NoOffsetForPartitionException e) { +// invalid or no offset found without auto.reset.policy +Utils.printOut("Invalid or no offset found, using latest"); +consumer.seekToEnd(emptyList()); +consumer.commitSync(); +} catch (KafkaException e) { +// abort the transaction and try to continue +Utils.printOut("Aborting transaction: %s", e); +produ
Re: [PR] KAFKA-14752: Kafka examples improvements - processor changes [kafka]
fvaleri commented on code in PR #13516: URL: https://github.com/apache/kafka/pull/13516#discussion_r1528843407 ## examples/src/main/java/kafka/examples/ExactlyOnceMessageProcessor.java: ## @@ -81,111 +88,115 @@ public ExactlyOnceMessageProcessor(final String inputTopic, @Override public void run() { -// Init transactions call should always happen first in order to clear zombie transactions from previous generation. -producer.initTransactions(); - -final AtomicLong messageRemaining = new AtomicLong(Long.MAX_VALUE); - -consumer.subscribe(Collections.singleton(inputTopic), new ConsumerRebalanceListener() { -@Override -public void onPartitionsRevoked(Collection partitions) { -printWithTxnId("Revoked partition assignment to kick-off rebalancing: " + partitions); -} - -@Override -public void onPartitionsAssigned(Collection partitions) { -printWithTxnId("Received partition assignment after rebalancing: " + partitions); -messageRemaining.set(messagesRemaining(consumer)); -} -}); - -int messageProcessed = 0; -while (messageRemaining.get() > 0) { -try { -ConsumerRecords records = consumer.poll(Duration.ofMillis(200)); -if (records.count() > 0) { -// Begin a new transaction session. -producer.beginTransaction(); -for (ConsumerRecord record : records) { -// Process the record and send to downstream. 
-ProducerRecord customizedRecord = transform(record); -producer.send(customizedRecord); +int processedRecords = 0; +long remainingRecords = Long.MAX_VALUE; +// it is recommended to have a relatively short txn timeout in order to clear pending offsets faster +int transactionTimeoutMs = 10_000; +// consumer must be in read_committed mode, which means it won't be able to read uncommitted data +boolean readCommitted = true; +try (KafkaProducer producer = new Producer("processor-producer", bootstrapServers, outputTopic, +true, transactionalId, true, -1, transactionTimeoutMs, null).createKafkaProducer(); + KafkaConsumer consumer = new Consumer("processor-consumer", bootstrapServers, inputTopic, + "processor-group", Optional.of(groupInstanceId), readCommitted, -1, null).createKafkaConsumer()) { +// called first and once to fence zombies and abort any pending transaction +producer.initTransactions(); + +consumer.subscribe(singleton(inputTopic), this); + +Utils.printOut("Processing new records"); +while (!closed && remainingRecords > 0) { +try { +ConsumerRecords records = consumer.poll(ofMillis(200)); +if (!records.isEmpty()) { +// begin a new transaction session +producer.beginTransaction(); + +for (ConsumerRecord record : records) { +// process the record and send downstream +ProducerRecord newRecord = +new ProducerRecord<>(outputTopic, record.key(), record.value() + "-ok"); +producer.send(newRecord); +} + +// checkpoint the progress by sending offsets to group coordinator broker +// note that this API is only available for broker >= 2.5 + producer.sendOffsetsToTransaction(getOffsetsToCommit(consumer), consumer.groupMetadata()); + +// commit the transaction including offsets +producer.commitTransaction(); +processedRecords += records.count(); } +} catch (AuthorizationException | UnsupportedVersionException | ProducerFencedException + | FencedInstanceIdException | OutOfOrderSequenceException | SerializationException e) { +// we can't recover from these exceptions 
+Utils.printErr(e.getMessage()); +shutdown(); +} catch (OffsetOutOfRangeException | NoOffsetForPartitionException e) { +// invalid or no offset found without auto.reset.policy +Utils.printOut("Invalid or no offset found, using latest"); +consumer.seekToEnd(emptyList()); +consumer.commitSync(); +} catch (KafkaException e) { +// abort the transaction and try to continue +Utils.printOut("Aborting transaction: %s", e); +produc
Re: [PR] KAFKA-14752: Kafka examples improvements - processor changes [kafka]
gaoran10 commented on code in PR #13516: URL: https://github.com/apache/kafka/pull/13516#discussion_r1528650747 ## examples/src/main/java/kafka/examples/ExactlyOnceMessageProcessor.java: ## @@ -81,111 +88,115 @@ public ExactlyOnceMessageProcessor(final String inputTopic, @Override public void run() { -// Init transactions call should always happen first in order to clear zombie transactions from previous generation. -producer.initTransactions(); - -final AtomicLong messageRemaining = new AtomicLong(Long.MAX_VALUE); - -consumer.subscribe(Collections.singleton(inputTopic), new ConsumerRebalanceListener() { -@Override -public void onPartitionsRevoked(Collection partitions) { -printWithTxnId("Revoked partition assignment to kick-off rebalancing: " + partitions); -} - -@Override -public void onPartitionsAssigned(Collection partitions) { -printWithTxnId("Received partition assignment after rebalancing: " + partitions); -messageRemaining.set(messagesRemaining(consumer)); -} -}); - -int messageProcessed = 0; -while (messageRemaining.get() > 0) { -try { -ConsumerRecords records = consumer.poll(Duration.ofMillis(200)); -if (records.count() > 0) { -// Begin a new transaction session. -producer.beginTransaction(); -for (ConsumerRecord record : records) { -// Process the record and send to downstream. 
-ProducerRecord customizedRecord = transform(record); -producer.send(customizedRecord); +int processedRecords = 0; +long remainingRecords = Long.MAX_VALUE; +// it is recommended to have a relatively short txn timeout in order to clear pending offsets faster +int transactionTimeoutMs = 10_000; +// consumer must be in read_committed mode, which means it won't be able to read uncommitted data +boolean readCommitted = true; +try (KafkaProducer producer = new Producer("processor-producer", bootstrapServers, outputTopic, +true, transactionalId, true, -1, transactionTimeoutMs, null).createKafkaProducer(); + KafkaConsumer consumer = new Consumer("processor-consumer", bootstrapServers, inputTopic, + "processor-group", Optional.of(groupInstanceId), readCommitted, -1, null).createKafkaConsumer()) { +// called first and once to fence zombies and abort any pending transaction +producer.initTransactions(); + +consumer.subscribe(singleton(inputTopic), this); + +Utils.printOut("Processing new records"); +while (!closed && remainingRecords > 0) { +try { +ConsumerRecords records = consumer.poll(ofMillis(200)); +if (!records.isEmpty()) { +// begin a new transaction session +producer.beginTransaction(); + +for (ConsumerRecord record : records) { +// process the record and send downstream +ProducerRecord newRecord = +new ProducerRecord<>(outputTopic, record.key(), record.value() + "-ok"); +producer.send(newRecord); +} + +// checkpoint the progress by sending offsets to group coordinator broker +// note that this API is only available for broker >= 2.5 + producer.sendOffsetsToTransaction(getOffsetsToCommit(consumer), consumer.groupMetadata()); + +// commit the transaction including offsets +producer.commitTransaction(); +processedRecords += records.count(); } +} catch (AuthorizationException | UnsupportedVersionException | ProducerFencedException + | FencedInstanceIdException | OutOfOrderSequenceException | SerializationException e) { +// we can't recover from these exceptions 
+Utils.printErr(e.getMessage()); +shutdown(); +} catch (OffsetOutOfRangeException | NoOffsetForPartitionException e) { +// invalid or no offset found without auto.reset.policy +Utils.printOut("Invalid or no offset found, using latest"); +consumer.seekToEnd(emptyList()); +consumer.commitSync(); +} catch (KafkaException e) { +// abort the transaction and try to continue +Utils.printOut("Aborting transaction: %s", e); +produ
Re: [PR] KAFKA-14752: Kafka examples improvements - processor changes [kafka]
gaoran10 commented on code in PR #13516: URL: https://github.com/apache/kafka/pull/13516#discussion_r1528650747 ## examples/src/main/java/kafka/examples/ExactlyOnceMessageProcessor.java: ## @@ -81,111 +88,115 @@ public ExactlyOnceMessageProcessor(final String inputTopic, @Override public void run() { -// Init transactions call should always happen first in order to clear zombie transactions from previous generation. -producer.initTransactions(); - -final AtomicLong messageRemaining = new AtomicLong(Long.MAX_VALUE); - -consumer.subscribe(Collections.singleton(inputTopic), new ConsumerRebalanceListener() { -@Override -public void onPartitionsRevoked(Collection partitions) { -printWithTxnId("Revoked partition assignment to kick-off rebalancing: " + partitions); -} - -@Override -public void onPartitionsAssigned(Collection partitions) { -printWithTxnId("Received partition assignment after rebalancing: " + partitions); -messageRemaining.set(messagesRemaining(consumer)); -} -}); - -int messageProcessed = 0; -while (messageRemaining.get() > 0) { -try { -ConsumerRecords records = consumer.poll(Duration.ofMillis(200)); -if (records.count() > 0) { -// Begin a new transaction session. -producer.beginTransaction(); -for (ConsumerRecord record : records) { -// Process the record and send to downstream. 
-ProducerRecord customizedRecord = transform(record); -producer.send(customizedRecord); +int processedRecords = 0; +long remainingRecords = Long.MAX_VALUE; +// it is recommended to have a relatively short txn timeout in order to clear pending offsets faster +int transactionTimeoutMs = 10_000; +// consumer must be in read_committed mode, which means it won't be able to read uncommitted data +boolean readCommitted = true; +try (KafkaProducer producer = new Producer("processor-producer", bootstrapServers, outputTopic, +true, transactionalId, true, -1, transactionTimeoutMs, null).createKafkaProducer(); + KafkaConsumer consumer = new Consumer("processor-consumer", bootstrapServers, inputTopic, + "processor-group", Optional.of(groupInstanceId), readCommitted, -1, null).createKafkaConsumer()) { +// called first and once to fence zombies and abort any pending transaction +producer.initTransactions(); + +consumer.subscribe(singleton(inputTopic), this); + +Utils.printOut("Processing new records"); +while (!closed && remainingRecords > 0) { +try { +ConsumerRecords records = consumer.poll(ofMillis(200)); +if (!records.isEmpty()) { +// begin a new transaction session +producer.beginTransaction(); + +for (ConsumerRecord record : records) { +// process the record and send downstream +ProducerRecord newRecord = +new ProducerRecord<>(outputTopic, record.key(), record.value() + "-ok"); +producer.send(newRecord); +} + +// checkpoint the progress by sending offsets to group coordinator broker +// note that this API is only available for broker >= 2.5 + producer.sendOffsetsToTransaction(getOffsetsToCommit(consumer), consumer.groupMetadata()); + +// commit the transaction including offsets +producer.commitTransaction(); +processedRecords += records.count(); } +} catch (AuthorizationException | UnsupportedVersionException | ProducerFencedException + | FencedInstanceIdException | OutOfOrderSequenceException | SerializationException e) { +// we can't recover from these exceptions 
+Utils.printErr(e.getMessage()); +shutdown(); +} catch (OffsetOutOfRangeException | NoOffsetForPartitionException e) { +// invalid or no offset found and no auto.offset.reset policy is configured +Utils.printOut("Invalid or no offset found, using latest"); +consumer.seekToEnd(emptyList()); +consumer.commitSync(); +} catch (KafkaException e) { +// abort the transaction and try to continue +Utils.printOut("Aborting transaction: %s", e); +produ
Re: [PR] KAFKA-14752: Kafka examples improvements - processor changes [kafka]
gaoran10 commented on code in PR #13516: URL: https://github.com/apache/kafka/pull/13516#discussion_r1528650747 ## examples/src/main/java/kafka/examples/ExactlyOnceMessageProcessor.java: ## @@ -81,111 +88,115 @@ public ExactlyOnceMessageProcessor(final String inputTopic, @Override public void run() { -// Init transactions call should always happen first in order to clear zombie transactions from previous generation. -producer.initTransactions(); - -final AtomicLong messageRemaining = new AtomicLong(Long.MAX_VALUE); - -consumer.subscribe(Collections.singleton(inputTopic), new ConsumerRebalanceListener() { -@Override -public void onPartitionsRevoked(Collection partitions) { -printWithTxnId("Revoked partition assignment to kick-off rebalancing: " + partitions); -} - -@Override -public void onPartitionsAssigned(Collection partitions) { -printWithTxnId("Received partition assignment after rebalancing: " + partitions); -messageRemaining.set(messagesRemaining(consumer)); -} -}); - -int messageProcessed = 0; -while (messageRemaining.get() > 0) { -try { -ConsumerRecords records = consumer.poll(Duration.ofMillis(200)); -if (records.count() > 0) { -// Begin a new transaction session. -producer.beginTransaction(); -for (ConsumerRecord record : records) { -// Process the record and send to downstream. 
-ProducerRecord customizedRecord = transform(record); -producer.send(customizedRecord); +int processedRecords = 0; +long remainingRecords = Long.MAX_VALUE; +// it is recommended to have a relatively short txn timeout in order to clear pending offsets faster +int transactionTimeoutMs = 10_000; +// consumer must be in read_committed mode, which means it won't be able to read uncommitted data +boolean readCommitted = true; +try (KafkaProducer producer = new Producer("processor-producer", bootstrapServers, outputTopic, +true, transactionalId, true, -1, transactionTimeoutMs, null).createKafkaProducer(); + KafkaConsumer consumer = new Consumer("processor-consumer", bootstrapServers, inputTopic, + "processor-group", Optional.of(groupInstanceId), readCommitted, -1, null).createKafkaConsumer()) { +// called first and once to fence zombies and abort any pending transaction +producer.initTransactions(); + +consumer.subscribe(singleton(inputTopic), this); + +Utils.printOut("Processing new records"); +while (!closed && remainingRecords > 0) { +try { +ConsumerRecords records = consumer.poll(ofMillis(200)); +if (!records.isEmpty()) { +// begin a new transaction session +producer.beginTransaction(); + +for (ConsumerRecord record : records) { +// process the record and send downstream +ProducerRecord newRecord = +new ProducerRecord<>(outputTopic, record.key(), record.value() + "-ok"); +producer.send(newRecord); +} + +// checkpoint the progress by sending offsets to group coordinator broker +// note that this API is only available for broker >= 2.5 + producer.sendOffsetsToTransaction(getOffsetsToCommit(consumer), consumer.groupMetadata()); + +// commit the transaction including offsets +producer.commitTransaction(); +processedRecords += records.count(); } +} catch (AuthorizationException | UnsupportedVersionException | ProducerFencedException + | FencedInstanceIdException | OutOfOrderSequenceException | SerializationException e) { +// we can't recover from these exceptions 
+Utils.printErr(e.getMessage()); +shutdown(); +} catch (OffsetOutOfRangeException | NoOffsetForPartitionException e) { +// invalid or no offset found and no auto.offset.reset policy is configured +Utils.printOut("Invalid or no offset found, using latest"); +consumer.seekToEnd(emptyList()); +consumer.commitSync(); +} catch (KafkaException e) { +// abort the transaction and try to continue +Utils.printOut("Aborting transaction: %s", e); +produ