Pankraz76 commented on code in PR #21161:
URL: https://github.com/apache/kafka/pull/21161#discussion_r2630224707
##########
tools/src/main/java/org/apache/kafka/tools/TransactionalMessageCopier.java:
##########
@@ -387,7 +389,17 @@ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
             long messagesSentWithinCurrentTxn = records.count();
             ConsumerGroupMetadata groupMetadata = useGroupMetadata
                 ? consumer.groupMetadata() : new ConsumerGroupMetadata(consumerGroup);
-            producer.sendOffsetsToTransaction(consumerPositions(consumer), groupMetadata);
+            try {
+                producer.sendOffsetsToTransaction(consumerPositions(consumer), groupMetadata);
+            } catch (KafkaException e) {
+                // in case the producer gets stuck here, create a new one and continue the loop
+                try { producer.close(Duration.ofSeconds(0)); } catch (Exception ignore) {}
+                parsedArgs.getAttrs().put("transactionalId", parsedArgs.getString("transactionalId") + producerNumber++);
+                producer = createProducer(parsedArgs);
+                producer.initTransactions();
+                resetToLastCommittedPositions(consumer);
Review Comment:
```suggestion
                circuitBreaker(); // in case the producer gets stuck here, create a new one and continue the loop
```
Extracting this recovery logic into a dedicated method would apply the single-responsibility principle, giving each concern its own focus. At this call site, all that matters is that the circuit gets broken; how that is actually done is an implementation detail that may well change.
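The extraction suggested above could look roughly like the following self-contained sketch. Note that `circuitBreaker`, the `Producer` interface, and the factory wiring are illustrative stand-ins for the PR's actual Kafka producer code, not the real `TransactionalMessageCopier` implementation:

```java
import java.util.function.Supplier;

public class CircuitBreakerSketch {
    // Hypothetical stand-in for the Kafka producer used in the PR.
    interface Producer {
        void sendOffsets();
        void close();
        void initTransactions();
    }

    private final Supplier<Producer> producerFactory;
    private Producer producer;
    private int producerNumber = 0; // e.g. suffix for a fresh transactional id

    CircuitBreakerSketch(Supplier<Producer> factory) {
        this.producerFactory = factory;
        // First producer is assumed to be initialized by the caller.
        this.producer = factory.get();
    }

    // Single-responsibility helper: discard the (possibly stuck) producer
    // and replace it with a freshly initialized one.
    void circuitBreaker() {
        try {
            producer.close();
        } catch (Exception ignore) {
            // best-effort close; the old producer may already be broken
        }
        producerNumber++;
        producer = producerFactory.get();
        producer.initTransactions();
    }

    // The call site stays focused on its own concern and only
    // delegates recovery to the helper.
    void sendOffsetsWithBreaker() {
        try {
            producer.sendOffsets();
        } catch (RuntimeException e) {
            circuitBreaker(); // producer got stuck: recreate and continue the loop
        }
    }
}
```

With this shape, the call site in `onPartitionsAssigned` reduces to a single `circuitBreaker()` call, and the recreate-and-reinitialize details can change without touching the commit path.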
##########
tests/kafkatest/tests/core/transactions_upgrade_test.py:
##########
@@ -179,7 +179,7 @@ def copy_messages_transactionally_during_upgrade(self, input_topic, output_topic
         self.perform_upgrade(from_kafka_version)
-        copier_timeout_sec = 180
+        copier_timeout_sec = 360
Review Comment:
```suggestion
copier_timeout_sec = 360
```
Sorry, again, this is something for SCA, to take the off-topic changes out of the way upfront. Spotless and Rewrite are both ready to fix this on their own.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]