Repository: kafka Updated Branches: refs/heads/trunk 93d451cee -> 5f88cf79f
MINOR: Increase max.poll time for streams consumers Author: Eno Thereska <[email protected]> Reviewers: Damian Guy, Matthias J. Sax, Guozhang Wang Closes #2770 from enothereska/minor-increase-max-poll Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/5f88cf79 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/5f88cf79 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/5f88cf79 Branch: refs/heads/trunk Commit: 5f88cf79fb51996e77614aafdfbb15e4989e159b Parents: 93d451c Author: Eno Thereska <[email protected]> Authored: Thu Mar 30 15:58:24 2017 -0700 Committer: Guozhang Wang <[email protected]> Committed: Thu Mar 30 15:58:24 2017 -0700 ---------------------------------------------------------------------- .../main/java/org/apache/kafka/streams/StreamsConfig.java | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/5f88cf79/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java index d2ba063..94719c6 100644 --- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java +++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java @@ -401,7 +401,13 @@ public class StreamsConfig extends AbstractConfig { tempConsumerDefaultOverrides.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1000"); tempConsumerDefaultOverrides.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); tempConsumerDefaultOverrides.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); - tempConsumerDefaultOverrides.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "300000"); + // MAX_POLL_INTERVAL_MS_CONFIG needs to be large for streams to handle cases when + // streams is recovering data from state stores. We may set it to Integer.MAX_VALUE since + // the streams code itself catches most exceptions and acts accordingly without needing + // this timeout. Note however that deadlocks are not detected (by definition) so we + // are losing the ability to detect them by setting this value to large. Hopefully + // deadlocks happen very rarely or never. + tempConsumerDefaultOverrides.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, Integer.toString(Integer.MAX_VALUE)); CONSUMER_DEFAULT_OVERRIDES = Collections.unmodifiableMap(tempConsumerDefaultOverrides); }
