chickenchickenlove commented on code in PR #21161:
URL: https://github.com/apache/kafka/pull/21161#discussion_r2658683685


##########
tools/src/main/java/org/apache/kafka/tools/TransactionalMessageCopier.java:
##########
@@ -387,7 +389,17 @@ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                         long messagesSentWithinCurrentTxn = records.count();
 
                         ConsumerGroupMetadata groupMetadata = useGroupMetadata ? consumer.groupMetadata() : new ConsumerGroupMetadata(consumerGroup);
-                        producer.sendOffsetsToTransaction(consumerPositions(consumer), groupMetadata);
+                        try {
+                            producer.sendOffsetsToTransaction(consumerPositions(consumer), groupMetadata);
+                        } catch (KafkaException e) {

Review Comment:
   IMHO, shouldn't we focus on handling `TimeoutException` specifically?
   
   AFAIK, `CONCURRENT_TRANSACTIONS` eventually manifests as a `TimeoutException` on the client side. Catching the broad `KafkaException` risks masking other underlying errors, where we would need a clearly identified root cause before taking any action. A rough sketch follows below.
   
   What do you think?
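   For example, something like this (a rough sketch of what I have in mind; it keeps this patch's recreate-and-retry behavior behind a hypothetical `resetProducer()` helper, so only the timeout path retries):
   
   ```java
   import org.apache.kafka.common.KafkaException;
   import org.apache.kafka.common.errors.TimeoutException;
   
   try {
       producer.sendOffsetsToTransaction(consumerPositions(consumer), groupMetadata);
   } catch (TimeoutException e) {
       // CONCURRENT_TRANSACTIONS eventually surfaces as a client-side
       // TimeoutException, so this is the one case worth retrying.
       resetProducer(); // hypothetical helper: close and recreate the producer
   } catch (KafkaException e) {
       // Anything else (e.g. ProducerFencedException) likely signals a real
       // problem; rethrow so the root cause stays visible.
       throw e;
   }
   ```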



##########
tools/src/main/java/org/apache/kafka/tools/TransactionalMessageCopier.java:
##########
@@ -387,7 +389,17 @@ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                         long messagesSentWithinCurrentTxn = records.count();
 
                         ConsumerGroupMetadata groupMetadata = useGroupMetadata ? consumer.groupMetadata() : new ConsumerGroupMetadata(consumerGroup);
-                        producer.sendOffsetsToTransaction(consumerPositions(consumer), groupMetadata);
+                        try {
+                            producer.sendOffsetsToTransaction(consumerPositions(consumer), groupMetadata);
+                        } catch (KafkaException e) {
+                            // in case the producer gets stuck here, create a new one and continue the loop
+                            try { producer.close(Duration.ofSeconds(0)); } catch (Exception ignore) {}
+                            parsedArgs.getAttrs().put("transactionalId", parsedArgs.getString("transactionalId") + producerNumber++);

Review Comment:
   Is there a safer way to generate a globally unique transactionalId and avoid collisions?
   
   Simply appending an incremental number (`producerNumber++`) to the user-provided ID seems risky in a shared cluster environment. If the generated ID happens to match the transactionalId of a running production application, it could trigger the fencing mechanism and unintentionally abort that application's active transactions.
   
   Perhaps appending a UUID or another random suffix would be a safer way to ensure uniqueness (rough sketch below)?
   What do you think?
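   For instance, a rough sketch reusing the names from this patch (untested):
   
   ```java
   import java.util.UUID;
   
   // A random suffix makes a collision with another client's transactional.id
   // practically impossible, unlike a monotonically increasing counter.
   String uniqueTransactionalId =
           parsedArgs.getString("transactionalId") + "-" + UUID.randomUUID();
   parsedArgs.getAttrs().put("transactionalId", uniqueTransactionalId);
   ```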


