Pankraz76 commented on code in PR #21161:
URL: https://github.com/apache/kafka/pull/21161#discussion_r2630224707


##########
tools/src/main/java/org/apache/kafka/tools/TransactionalMessageCopier.java:
##########
@@ -387,7 +389,17 @@ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                         long messagesSentWithinCurrentTxn = records.count();
 
                         ConsumerGroupMetadata groupMetadata = useGroupMetadata ? consumer.groupMetadata() : new ConsumerGroupMetadata(consumerGroup);
-                        producer.sendOffsetsToTransaction(consumerPositions(consumer), groupMetadata);
+                        try {
+                            producer.sendOffsetsToTransaction(consumerPositions(consumer), groupMetadata);
+                        } catch (KafkaException e) {
+                            // in case the producer gets stuck here, create a new one and continue the loop
+                            try { producer.close(Duration.ofSeconds(0)); } catch (Exception ignore) {}
+                            parsedArgs.getAttrs().put("transactionalId", parsedArgs.getString("transactionalId") + producerNumber++);
+                            producer = createProducer(parsedArgs);
+                            producer.initTransactions();
+                            resetToLastCommittedPositions(consumer);

Review Comment:
   ```suggestion
                               circuitBreaker(); // in case the producer gets stuck here, create a new one and continue the loop
   ```
   
   This could be extracted into a dedicated method, applying the single responsibility principle so that each concern gets its own focus. Here it is just about breaking the circuit; how that is actually done seems to be an implementation detail that may change (somewhat arbitrarily).
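   
   A minimal sketch of what such an extraction might look like, not taken from the PR. `TxnProducer`, `sendOffsetsOrReplace`, the factory and the reset callback are all hypothetical names; the interface is only a stand-in for the handful of producer calls in the diff, not the Kafka API. It catches `RuntimeException` where the diff catches `KafkaException`.

   ```java
   import java.util.function.Supplier;

   class ProducerRecoverySketch {

       /** Hypothetical stand-in for the producer calls used in the diff; not the Kafka API. */
       interface TxnProducer {
           void initTransactions();
           void sendOffsets(); // stands in for sendOffsetsToTransaction(...)
           void close();
       }

       /**
        * Tries to send offsets with the current producer and returns it on success.
        * On failure, closes it (best effort), builds a replacement from the factory,
        * initializes it, runs the reset callback, and returns the replacement.
        */
       static TxnProducer sendOffsetsOrReplace(TxnProducer producer,
                                               Supplier<TxnProducer> factory,
                                               Runnable resetConsumerPositions) {
           try {
               producer.sendOffsets();
               return producer;
           } catch (RuntimeException e) {
               try {
                   producer.close();
               } catch (RuntimeException ignored) {
                   // best effort: the old producer is discarded either way
               }
               TxnProducer replacement = factory.get();
               replacement.initTransactions();
               resetConsumerPositions.run();
               return replacement;
           }
       }
   }
   ```

   With something like this, the loop body would shrink to a single call such as `producer = sendOffsetsOrReplace(producer, ...)`, and the way the replacement is built (for example the changing `transactionalId`) would stay behind the factory.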



##########
tests/kafkatest/tests/core/transactions_upgrade_test.py:
##########
@@ -179,7 +179,7 @@ def copy_messages_transactionally_during_upgrade(self, input_topic, output_topic
 
         self.perform_upgrade(from_kafka_version)
 
-        copier_timeout_sec = 180
+        copier_timeout_sec = 360 

Review Comment:
   ```suggestion
           copier_timeout_sec = 360
   ```
   Sorry, again this is something for SCA, which would take care of off-topic issues like this one up front.
   
   Spotless and Rewrite are both able to fix this on their own.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
