Repository: kafka
Updated Branches:
  refs/heads/trunk 93d451cee -> 5f88cf79f


MINOR: Increase max.poll time for streams consumers

Author: Eno Thereska <[email protected]>

Reviewers: Damian Guy, Matthias J. Sax, Guozhang Wang

Closes #2770 from enothereska/minor-increase-max-poll


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/5f88cf79
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/5f88cf79
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/5f88cf79

Branch: refs/heads/trunk
Commit: 5f88cf79fb51996e77614aafdfbb15e4989e159b
Parents: 93d451c
Author: Eno Thereska <[email protected]>
Authored: Thu Mar 30 15:58:24 2017 -0700
Committer: Guozhang Wang <[email protected]>
Committed: Thu Mar 30 15:58:24 2017 -0700

----------------------------------------------------------------------
 .../main/java/org/apache/kafka/streams/StreamsConfig.java    | 8 +++++++-
 1 file changed, 7 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/5f88cf79/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java 
b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index d2ba063..94719c6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -401,7 +401,13 @@ public class StreamsConfig extends AbstractConfig {
         
tempConsumerDefaultOverrides.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 
"1000");
         
tempConsumerDefaultOverrides.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, 
"earliest");
         
tempConsumerDefaultOverrides.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, 
"false");
-        
tempConsumerDefaultOverrides.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 
"300000");
+        // MAX_POLL_INTERVAL_MS_CONFIG needs to be large for streams to handle 
cases when
+        // streams is recovering data from state stores. We may set it to 
Integer.MAX_VALUE since
+        // the streams code itself catches most exceptions and acts 
accordingly without needing
+        // this timeout. Note however that deadlocks are not detected (by 
definition) so we
+        // are losing the ability to detect them by setting this value too 
large. Hopefully
+        // deadlocks happen very rarely or never.
+        
tempConsumerDefaultOverrides.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 
Integer.toString(Integer.MAX_VALUE));
 
         CONSUMER_DEFAULT_OVERRIDES = 
Collections.unmodifiableMap(tempConsumerDefaultOverrides);
     }

Reply via email to