chickenchickenlove commented on code in PR #21161:
URL: https://github.com/apache/kafka/pull/21161#discussion_r2658683685
##########
tools/src/main/java/org/apache/kafka/tools/TransactionalMessageCopier.java:
##########
@@ -387,7 +389,17 @@ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                 long messagesSentWithinCurrentTxn = records.count();
                 ConsumerGroupMetadata groupMetadata = useGroupMetadata
                     ? consumer.groupMetadata() : new ConsumerGroupMetadata(consumerGroup);
-                producer.sendOffsetsToTransaction(consumerPositions(consumer), groupMetadata);
+                try {
+                    producer.sendOffsetsToTransaction(consumerPositions(consumer), groupMetadata);
+                } catch (KafkaException e) {
Review Comment:
IMHO, shouldn't we focus on handling `TimeoutException`?
AFAIK, `CONCURRENT_TRANSACTIONS` eventually manifests as a `TimeoutException` on the client side. I'm concerned that retrying on such a broad exception scope might mask other underlying errors, and in those cases we need the root cause to stay visible so we can take appropriate action.
What do you think?
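For illustration, a minimal sketch of the narrower handling I have in mind (`resetProducer()` here is a hypothetical helper standing in for the close-and-recreate logic this PR adds; it is not an existing method):

```java
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.TimeoutException;

try {
    producer.sendOffsetsToTransaction(consumerPositions(consumer), groupMetadata);
} catch (TimeoutException e) {
    // CONCURRENT_TRANSACTIONS eventually surfaces as a client-side timeout,
    // so this is the case where rebuilding the producer and retrying makes sense.
    resetProducer(); // hypothetical helper: close the stuck producer, create a new one
} catch (KafkaException e) {
    // Any other KafkaException is unexpected here; rethrow so the root cause
    // stays visible instead of being swallowed by a retry.
    throw e;
}
```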
##########
tools/src/main/java/org/apache/kafka/tools/TransactionalMessageCopier.java:
##########
@@ -387,7 +389,17 @@ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                 long messagesSentWithinCurrentTxn = records.count();
                 ConsumerGroupMetadata groupMetadata = useGroupMetadata
                     ? consumer.groupMetadata() : new ConsumerGroupMetadata(consumerGroup);
-                producer.sendOffsetsToTransaction(consumerPositions(consumer), groupMetadata);
+                try {
+                    producer.sendOffsetsToTransaction(consumerPositions(consumer), groupMetadata);
+                } catch (KafkaException e) {
+                    // in case the producer gets stuck here, create a new one and continue the loop
+                    try { producer.close(Duration.ofSeconds(0)); } catch (Exception ignore) {}
+                    parsedArgs.getAttrs().put("transactionalId", parsedArgs.getString("transactionalId") + producerNumber++);
Review Comment:
Is there a safer way to generate a globally unique transactionalId and avoid collisions?
Simply appending an incremental number (`producerNumber++`) to the user-provided ID seems risky in a shared cluster environment. If the generated ID happens to match the transactionalId of a running production application, it could trigger the fencing mechanism and unintentionally abort that application's active transactions.
Perhaps appending a UUID or a random suffix would be a safer way to ensure uniqueness?
What do you think?
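Something along these lines, for example (a sketch only, reusing the PR's `parsedArgs` plumbing):

```java
import java.util.UUID;

// A random suffix cannot collide with another application's transactional.id
// on a shared cluster, unlike a small incrementing counter.
String uniqueTransactionalId =
    parsedArgs.getString("transactionalId") + "-" + UUID.randomUUID();
parsedArgs.getAttrs().put("transactionalId", uniqueTransactionalId);
```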