Hi Mitchell,

We configured the following (see explanation below):
# - request.timeout.ms = Maximum delay of a broker's response after the consumer sends a request.
#   A rather small value is useful; bigger values cause later detection of system problems.
# - default.api.timeout.ms = Default timeout for all consumer methods without an explicit timeout parameter.
#   Should cover the worst-case scenario (long processing time of a single message).
#   A value derived from "transformer.maxMessageProcessingTime.s" is useful.
# - session.timeout.ms = Maximum heartbeat delay from consumer to broker.
#   Should be a multiple of "transformer.maxMessageProcessingTime.s".
# - max.poll.interval.ms = Maximum delay between two poll() invocations.
#   Should be a multiple of "transformer.maxMessageProcessingTime.s" because multiple
#   messages are received by each poll ("max.poll.records").
kafka.consumer.additionalProperties = {
  "request.timeout.ms": 25000,
  "default.api.timeout.ms": 300000,
  "session.timeout.ms": 300000,
  "max.poll.interval.ms": 300000
}
kafka.producer.additionalProperties = {
  "request.timeout.ms": 25000,
  "default.api.timeout.ms": 300000
}
transformer.maxMessageProcessingTime.s = 120

Explanation:
We run different services that publish and receive data via Kafka. One of them is called "transformer", for which we defined our own timeout (transformer.maxMessageProcessingTime.s). The messages are usually processed within a few milliseconds, but there are some (very few) "monster" messages that can take up to 2 minutes to process. Therefore we defined Kafka timeouts of 5 minutes, assuming the worst-case scenario to be two monster messages among the messages of one poll request. The remaining minute should then still be sufficient for the other "fast" messages.

Our service reads the two parameters kafka.<...>.additionalProperties and passes the listed properties on to the underlying Kafka client. So you should look at the configured (Kafka) property list rather than at the configuration parameter name itself.
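The worst-case arithmetic behind the 5-minute choice can be sketched as follows (variable names are ours, not Kafka's; the "two monster messages per poll" figure is the assumption stated above):

```java
// Sketch of the worst-case timeout budget described above.
public class TimeoutBudget {
    public static void main(String[] args) {
        int maxMessageProcessingTimeS = 120; // transformer.maxMessageProcessingTime.s
        int monsterMessagesPerPoll = 2;      // assumed worst case per poll batch
        int kafkaTimeoutMs = 300_000;        // session.timeout.ms / max.poll.interval.ms

        int monsterBudgetMs = monsterMessagesPerPoll * maxMessageProcessingTimeS * 1000;
        int remainingForFastMessagesMs = kafkaTimeoutMs - monsterBudgetMs;

        // 240000 ms for the monster messages, 60000 ms left for fast ones.
        System.out.println("Budget for monster messages: " + monsterBudgetMs + " ms");
        System.out.println("Remaining for fast messages: " + remainingForFastMessagesMs + " ms");
    }
}
```

If the fast remainder of a batch could ever exceed that leftover minute, the timeouts (or "max.poll.records") would need another look.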
I added a hint text to our configuration to reduce the risk of a later misconfiguration. Because our software runs in different versions, the configuration tries to work for both Kafka 2.4 and 2.5. Therefore we define all relevant properties and do not rely on default values that may have been changed by the Kafka team in the meantime.

The documentation I found was https://kafka.apache.org/documentation/ (which covers 2.3 but no later version). The other documentation, https://docs.confluent.io/current/installation/configuration/index.html, provides explanations for the producer and consumer configuration parameters too, but gives no hint about the Kafka version. I found it quite hard to find reliable information on which parameter works in which Kafka version. I made no attempt (yet) to dig through the Kafka source code. The interactions and conflicts between the various timeout parameters don't make life easier for people who do not follow every change in detail but use Kafka and Storm (just) as tools to build their business system upon.

@Mitchell Is this the information you wanted, or did I miss something of importance to you?

Reinhard
________________________________
From: Mitchell Rathbun (BLOOMBERG/ 731 LEX) <mrathb...@bloomberg.net>
Sent: Monday, November 4, 2019 22:00
To: user@storm.apache.org
Subject: Re: AW: Re: Offset fetch failure causing topology crash

Stig, I will take a look at the Jira issue you posted and make the corresponding change.

Reinhard, thank you for the response. Could you post what you set those two values to in order to fix the timeout issue? How did you arrive at changing those specific properties?

From: user@storm.apache.org At: 11/04/19 15:28:00
To: user@storm.apache.org
Subject: AW: Re: Offset fetch failure causing topology crash

Hi,

We ran into similar problems after switching to a newer Kafka version in another project. They changed (as I understand it) quite a few things concerning timeouts.
I read in a thread that once Kafka fails, it may trigger a cascade of follow-up errors (as observed in our project). The conflicts remained in Kafka 2.4 but disappeared in Kafka 2.5. We configured the following two parameters to deal with an older project version (using Kafka 2.4):

* session.timeout.ms
* max.poll.interval.ms

Please take into account that I currently have no code at hand to verify the correct spelling of the parameters, so there might be a slight difference 😉

There is some good documentation of all parameters and their meaning – but unfortunately mostly from projects that use Kafka. The Kafka documentation seems to stop at 2.3 (for some reason I didn't find more recent documentation).

Hope that gives some hints.

Reinhard
---
From: Stig Rohde Døssing <stigdoess...@gmail.com>
Sent: Monday, November 4, 2019 21:01
To: user@storm.apache.org
Subject: Re: Re: Offset fetch failure causing topology crash

Whoops, I think the user mailing list fell off the reply list. Sorry, didn't mean to mail you directly.

I haven't heard of this before, but people may have encountered it without mentioning it. I am not aware of a workaround. You're right that it would be good to get this fixed. https://issues.apache.org/jira/browse/STORM-3529 is open if you want to work on it. I think it should be pretty easy to catch and log RetriableExceptions in the same way we do elsewhere in the spout.

On Mon, Nov 4, 2019 at 19:29, Mitchell Rathbun (BLOOMBERG/ 731 LEX) <mrathb...@bloomberg.net> wrote:

I increased "default.api.timeout.ms" as mentioned and am still getting the error. I need to dig into the Kafka code some more, but if there isn't a simple config fix I can make, then isn't this a critical issue? A Kafka broker being brought down should not be a fatal issue for the topology, especially when the exception comes from a non-critical metrics class.
Do you know of any workarounds to this, or has this been seen before? It seems like it should be a pretty common issue, given that broker turnaround in Kafka is not that uncommon.

From: Mitchell Rathbun (BLOOMBERG/ 731 LEX) At: 10/28/19 19:20:30
To: stigdoess...@gmail.com
Subject: Re: Offset fetch failure causing topology crash

I did some digging and I believe that we are now seeing this due to a recent upgrade from Kafka 0.10.1.0 to 2.3.0. The timeout used for the beginningOffsets/endOffsets calls was reduced from over 5 minutes to 60 seconds with this change. In 2.3.0, this timeout is set by "default.api.timeout.ms". There is also a property called "offset.commit.period.ms", which controls how often offsets are committed in the KafkaSpout; it has a default value of 30 seconds. So if a fetch fails once because a broker that was a leader for one of the consumer's partitions was brought down, then the next time commits are attempted would constitute a timeout. So I am going to try either reducing "offset.commit.period.ms" or increasing "default.api.timeout.ms" and see if that fixes the issue.

From: stigdoess...@gmail.com At: 10/25/19 18:57:31
To: Mitchell Rathbun (BLOOMBERG/ 731 LEX) <mrathb...@bloomberg.net>
Subject: Re: Offset fetch failure causing topology crash

See https://github.com/apache/storm/blob/7b1a98fc10fad516ef9ed0b3afc53a1d7be8a169/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java#L157 and https://github.com/apache/storm/blob/7b1a98fc10fad516ef9ed0b3afc53a1d7be8a169/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics/KafkaOffsetMetric.java. I don't think there's a flag to disable the metrics currently. Regarding where they can be viewed, please see the section on metrics consumers at https://storm.apache.org/releases/2.0.0/Metrics.html.
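The two knobs mentioned above can be set through the storm-kafka-client spout builder. A rough sketch follows; the broker address, topic name, and the chosen values are placeholders, and setProp may not be available in every 1.x release (older builders take such properties via the consumer-properties map instead):

```java
import org.apache.storm.kafka.spout.KafkaSpoutConfig;

// Sketch only: raise the consumer-side API timeout and commit offsets
// more often than the 30 s default, per the reasoning above.
KafkaSpoutConfig<String, String> spoutConfig =
    KafkaSpoutConfig.builder("broker:9092", "my-topic")
        .setProp("default.api.timeout.ms", 300000) // raise the 60 s default
        .setOffsetCommitPeriodMs(10_000)           // commit more often
        .build();
```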
We might want to change the metrics class to catch RetriableException and ignore it (with logging). I have raised https://issues.apache.org/jira/browse/STORM-3529.

On Fri, Oct 25, 2019 at 21:09, Mitchell Rathbun (BLOOMBERG/ 731 LEX) <mrathb...@bloomberg.net> wrote:

Our topology is running Storm 1.2.3 and kafka-clients 2.3.0. We recently noticed the following before a crash on a weekend:

2019-10-18 21:05:16,256 ERROR util [Thread-130-customKafkaSpout-executor[17 17]] Async loop died!
java.lang.RuntimeException: org.apache.kafka.common.errors.TimeoutException: Failed to get offsets by times in 60000ms
    at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:522) ~[storm-core-1.2.1.jar:1.2.1]
    at org.apache.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:487) ~[storm-core-1.2.1.jar:1.2.1]
    at org.apache.storm.utils.DisruptorQueue.consumeBatch(DisruptorQueue.java:477) ~[storm-core-1.2.1.jar:1.2.1]
    at org.apache.storm.disruptor$consume_batch.invoke(disruptor.clj:70) ~[storm-core-1.2.1.jar:1.2.1]
    at org.apache.storm.daemon.executor$fn__4975$fn__4990$fn__5021.invoke(executor.clj:634) ~[storm-core-1.2.1.jar:1.2.1]
    at org.apache.storm.util$async_loop$fn__557.invoke(util.clj:484) ~[storm-core-1.2.1.jar:1.2.1]
    at clojure.lang.AFn.run(AFn.java:22) ~[clojure-1.7.0.jar:?]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_172]

These crashes coincided with Kafka broker bounces. Our Kafka cluster has 6 brokers, and each partition has 6 replicas. Only one broker was ever down at a time, so the ISR of each partition in the topic seemed to never drop below 5. This exception seemed to come from outside the main KafkaSpout thread, since we catch exceptions in there.
Looking into the Kafka code a little further, this comes from the private method fetchOffsetsByTimes in the Fetcher.java class: https://github.com/apache/kafka/blob/2.3.0/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L538. This method is called by Consumer.offsetsByTimes, Consumer.beginningOffsets, and Consumer.endOffsets. I noticed that beginningOffsets and endOffsets are called here: https://github.com/apache/storm/blob/e21110d338fe8ca71b904682be35642a00de9e78/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics/KafkaOffsetMetric.java#L80, which would explain the error happening outside of the KafkaSpout nextTuple thread.

So a couple of questions:
- Is this a known error? It seemed to happen every time a broker came down.
- What are these metrics / where can they be viewed? Is there a way to disable them?
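The catch-and-log change proposed in this thread for STORM-3529 could look roughly like the sketch below. To keep it self-contained, a local RetriableException stand-in replaces org.apache.kafka.common.errors.RetriableException, and fetchBeginningOffset is a hypothetical placeholder for the consumer calls made inside KafkaOffsetMetric:

```java
public class OffsetMetricSketch {
    // Stand-in for org.apache.kafka.common.errors.RetriableException.
    static class RetriableException extends RuntimeException {
        RetriableException(String msg) { super(msg); }
    }

    // Hypothetical placeholder for the consumer.beginningOffsets(...) /
    // consumer.endOffsets(...) calls in KafkaOffsetMetric; here it always
    // fails, simulating a broker bounce.
    static long fetchBeginningOffset() {
        throw new RetriableException("Failed to get offsets by times in 60000ms");
    }

    public static void main(String[] args) {
        Long value;
        try {
            value = fetchBeginningOffset();
        } catch (RetriableException e) {
            // Log and skip this metrics round instead of letting the
            // exception escape and kill the executor's async loop.
            System.out.println("WARN: could not fetch offsets, skipping metrics round: "
                    + e.getMessage());
            value = null;
        }
        System.out.println("metric value: " + value); // prints "metric value: null"
    }
}
```

The point is only that a transient fetch failure degrades one metrics sample rather than crashing the topology; the real fix would live in KafkaOffsetMetric.getValueAndReset() and use a proper logger.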