Repository: kafka
Updated Branches:
  refs/heads/trunk 31fe1f98e -> 34188b4cc
MINOR: Bump the request timeout for the transactional message copier

Multiple inflights means that when there are rolling bounces or other cluster instability, there is an increased likelihood of having previously tried batch expire in the accumulator. This is a fatal error for a transactional producer, causing the `TransactionalMessageCopier` to exit. To work around this, we bump the request timeout. We can get rid of this when KIP-91 is merged.

Author: Apurva Mehta <apu...@confluent.io>

Reviewers: Jason Gustafson <ja...@confluent.io>

Closes #4039 from apurvam/MINOR-bump-request-timeout-in-transactional-message-copier

Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/34188b4c
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/34188b4c
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/34188b4c

Branch: refs/heads/trunk
Commit: 34188b4cc4703f6919526ae10b7185842d1047b4
Parents: 31fe1f9
Author: Apurva Mehta <apu...@confluent.io>
Authored: Thu Oct 12 16:53:27 2017 -0700
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Thu Oct 12 16:54:26 2017 -0700

----------------------------------------------------------------------
 .../org/apache/kafka/tools/TransactionalMessageCopier.java     | 6 ++++++
 1 file changed, 6 insertions(+)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kafka/blob/34188b4c/tools/src/main/java/org/apache/kafka/tools/TransactionalMessageCopier.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/tools/TransactionalMessageCopier.java b/tools/src/main/java/org/apache/kafka/tools/TransactionalMessageCopier.java
index 0d74645..87d27e8 100644
--- a/tools/src/main/java/org/apache/kafka/tools/TransactionalMessageCopier.java
+++ b/tools/src/main/java/org/apache/kafka/tools/TransactionalMessageCopier.java
@@ -155,6 +155,12 @@ public class TransactionalMessageCopier {
         props.put(ProducerConfig.BATCH_SIZE_CONFIG, "512");
         props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5");
 
+        // Multiple inflights means that when there are rolling bounces and other cluster instability, there is an
+        // increased likelihood of having previously tried batch expire in the accumulator. This is a fatal error
+        // for a transaction, causing the copier to exit. To work around this, we bump the request timeout.
+        // We can get rid of this when KIP-91 is merged.
+        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "60000");
+
         return new KafkaProducer<>(props);
     }
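For readers without the full file at hand, the settings touched by this hunk can be sketched as a standalone properties builder. This is only an illustration under stated assumptions, not the code in `TransactionalMessageCopier`: the class name `CopierProducerProps`, its `build` method, and the `bootstrap.servers` and `transactional.id` values are hypothetical, and the `ProducerConfig` constants are written out as their plain string keys so the sketch compiles without the Kafka client jar.

```java
import java.util.Properties;

// Hypothetical helper for illustration only; it is not part of this commit.
// It gathers the producer settings visible in the hunk into a plain Properties object.
final class CopierProducerProps {

    // bootstrapServers and transactionalId are assumed inputs; the commit does not show them.
    static Properties build(String bootstrapServers, String transactionalId) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("transactional.id", transactionalId);

        // Context lines from the hunk: small batches and up to five in-flight requests.
        props.put("batch.size", "512");                           // ProducerConfig.BATCH_SIZE_CONFIG
        props.put("max.in.flight.requests.per.connection", "5");  // ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION

        // The line added by this commit: a 60 second request timeout, so that
        // previously tried batches are less likely to expire in the accumulator.
        props.put("request.timeout.ms", "60000");                 // ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG
        return props;
    }

    public static void main(String[] args) {
        Properties props = build("localhost:9092", "copier-example");
        System.out.println("request.timeout.ms=" + props.getProperty("request.timeout.ms"));
    }
}
```

In the real tool these properties are passed to `new KafkaProducer<>(props)`, as the last context line of the hunk shows.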