Repository: kafka
Updated Branches:
  refs/heads/1.0 43dda72f6 -> 6031ee7ad
MINOR: Bump the request timeout for the transactional message copier

Multiple inflights means that when there are rolling bounces or other cluster instability, there is an increased likelihood of having previously tried batch expire in the accumulator. This is a fatal error for a transactional producer, causing the `TransactionalMessageCopier` to exit. To work around this, we bump the request timeout. We can get rid of this when KIP-91 is merged.

Author: Apurva Mehta <apu...@confluent.io>

Reviewers: Jason Gustafson <ja...@confluent.io>

Closes #4039 from apurvam/MINOR-bump-request-timeout-in-transactional-message-copier

(cherry picked from commit 34188b4cc4703f6919526ae10b7185842d1047b4)
Signed-off-by: Jason Gustafson <ja...@confluent.io>

Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/6031ee7a
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/6031ee7a
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/6031ee7a

Branch: refs/heads/1.0
Commit: 6031ee7ad37b2d54123b390c8217aa781a3a890f
Parents: 43dda72
Author: Apurva Mehta <apu...@confluent.io>
Authored: Thu Oct 12 16:53:27 2017 -0700
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Thu Oct 12 16:54:48 2017 -0700

----------------------------------------------------------------------
 .../org/apache/kafka/tools/TransactionalMessageCopier.java | 6 ++++++
 1 file changed, 6 insertions(+)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kafka/blob/6031ee7a/tools/src/main/java/org/apache/kafka/tools/TransactionalMessageCopier.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/tools/TransactionalMessageCopier.java b/tools/src/main/java/org/apache/kafka/tools/TransactionalMessageCopier.java
index 0d74645..87d27e8 100644
--- a/tools/src/main/java/org/apache/kafka/tools/TransactionalMessageCopier.java
+++ b/tools/src/main/java/org/apache/kafka/tools/TransactionalMessageCopier.java
@@ -155,6 +155,12 @@ public class TransactionalMessageCopier {
         props.put(ProducerConfig.BATCH_SIZE_CONFIG, "512");
         props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5");
+        // Multiple inflights means that when there are rolling bounces and other cluster instability, there is an
+        // increased likelihood of having previously tried batch expire in the accumulator. This is a fatal error
+        // for a transaction, causing the copier to exit. To work around this, we bump the request timeout.
+        // We can get rid of this when KIP-91 is merged.
+        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "60000");
+
         return new KafkaProducer<>(props);
     }
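
For readers who want to see the resulting settings outside the diff, here is a minimal sketch of the three producer properties visible in the hunk. It is not the copier's actual code: the class name `CopierProducerProps` and the method `producerProps` are hypothetical, and it uses plain `java.util.Properties` with the literal key strings behind the `ProducerConfig` constants so that it runs without the Kafka client library on the classpath.

```java
import java.util.Properties;

// Hypothetical illustration class; not part of the Kafka code base.
public class CopierProducerProps {
    // Literal keys behind the ProducerConfig constants used in the diff.
    static final String BATCH_SIZE = "batch.size";
    static final String MAX_IN_FLIGHT = "max.in.flight.requests.per.connection";
    static final String REQUEST_TIMEOUT_MS = "request.timeout.ms";

    // Builds only the settings shown in the hunk; the real method sets others too.
    static Properties producerProps() {
        Properties props = new Properties();
        props.put(BATCH_SIZE, "512");
        props.put(MAX_IN_FLIGHT, "5");
        // The value added by this commit: 60 seconds.
        props.put(REQUEST_TIMEOUT_MS, "60000");
        return props;
    }

    public static void main(String[] args) {
        Properties props = producerProps();
        // prints "request.timeout.ms=60000"
        System.out.println(REQUEST_TIMEOUT_MS + "=" + props.getProperty(REQUEST_TIMEOUT_MS));
    }
}
```

In the real tool these properties are passed to `new KafkaProducer<>(props)`, as the context lines of the diff show.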