Hi Mitchell

We configured the following (see explanation below):

# - request.timeout.ms     = Maximum delay of a broker response after the consumer sends a request.
#                            A rather small value is useful; bigger values delay the detection of
#                            system problems.
# - default.api.timeout.ms = Default timeout for all consumer methods without an explicit timeout
#                            parameter. Should cover the worst-case scenario (long processing time
#                            of one message). A value derived from
#                            "transformer.maxMessageProcessingTime.s" is useful.
# - session.timeout.ms     = Maximum heartbeat delay from consumer to broker.
#                            Should be a multiple of "transformer.maxMessageProcessingTime.s".
# - max.poll.interval.ms   = Maximum delay between two poll() invocations.
#                            Should be a multiple of "transformer.maxMessageProcessingTime.s"
#                            because multiple messages are received by each poll ("max.poll.records").
kafka.consumer.additionalProperties = { "request.timeout.ms": 25000, "default.api.timeout.ms": 300000, "session.timeout.ms": 300000, "max.poll.interval.ms": 300000 }
kafka.producer.additionalProperties = { "request.timeout.ms": 25000, "default.api.timeout.ms": 300000 }
transformer.maxMessageProcessingTime.s=120
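In case it helps to see the mechanics: a minimal sketch (not our real code; class and method names are made up for illustration) of how such an "additionalProperties" map ends up as the java.util.Properties object that the Kafka client constructors accept:

```java
import java.util.Map;
import java.util.Properties;

// Illustrative sketch only: converts a configured key/value map into the
// Properties object that a KafkaConsumer/KafkaProducer constructor takes.
public class KafkaTimeoutConfig {

    // Values copied from the consumer configuration above (milliseconds).
    static final Map<String, String> CONSUMER_ADDITIONAL_PROPERTIES = Map.of(
            "request.timeout.ms", "25000",
            "default.api.timeout.ms", "300000",
            "session.timeout.ms", "300000",
            "max.poll.interval.ms", "300000");

    static Properties toProperties(Map<String, String> additional) {
        Properties props = new Properties();
        additional.forEach(props::setProperty);
        return props;
    }

    public static void main(String[] args) {
        Properties props = toProperties(CONSUMER_ADDITIONAL_PROPERTIES);
        // In real code these props would be passed on to the Kafka client,
        // e.g. new KafkaConsumer<>(props, keyDeserializer, valueDeserializer);
        System.out.println(props.getProperty("session.timeout.ms"));
    }
}
```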


Explanation:
We run different services that publish and receive data via Kafka. One of them is called "transformer", for which we defined our own timeout (transformer.maxMessageProcessingTime.s).
The messages are usually processed within a few milliseconds, but there are some (very few) "monster" messages that can take up to 2 minutes to process.
Therefore we defined Kafka timeouts of 5 minutes, assuming the worst case to be two monster messages among the messages of one poll request. The remaining minute should then still be sufficient for the other "fast" messages.
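The worst-case arithmetic behind the 5-minute value can be written down as a tiny (hypothetical) budget check:

```java
// Sanity check of the worst-case budget described above: two "monster"
// messages of up to 120 s each within one poll batch, against 5-minute
// Kafka timeouts (session.timeout.ms / max.poll.interval.ms / default.api.timeout.ms).
public class TimeoutBudget {
    static final int MAX_MESSAGE_PROCESSING_TIME_S = 120; // transformer.maxMessageProcessingTime.s
    static final int KAFKA_TIMEOUT_S = 300;               // the configured 5-minute timeouts

    // Seconds left for the remaining "fast" messages of one poll batch
    // after the assumed number of monster messages has been processed.
    static int remainingBudgetS(int monsterMessages) {
        return KAFKA_TIMEOUT_S - monsterMessages * MAX_MESSAGE_PROCESSING_TIME_S;
    }

    public static void main(String[] args) {
        System.out.println(remainingBudgetS(2)); // prints 60: the remaining minute
    }
}
```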
Our service reads the two parameters kafka.<...>.additionalProperties and passes the listed properties over to the underlying Kafka client. So you should rather look at the configured (Kafka) property list than at the configuration parameter name itself.
I added a hint text to our configuration to reduce the risk of later misconfiguration.
Because our software runs in different versions, the configuration tries to work for both Kafka 2.4 and 2.5. Therefore we define all relevant properties explicitly and do not rely on default values that may have been changed meanwhile by the Kafka team.

The documentation I found was https://kafka.apache.org/documentation/ (which covers 2.3 but no later version).
The other documentation, https://docs.confluent.io/current/installation/configuration/index.html, provides explanations for producer and consumer configuration parameters too, but gives no hint about the Kafka version.
I found it quite hard to find reliable information on which parameter works with which Kafka version. I made no attempt (yet) to dig through the Kafka source code.
The interactions and conflicts between the various timeout parameters don't make life easier for people who do not follow every change in detail but use Kafka and Storm (just) as a tool to build their business system upon.

@Mitchell
Is this the information you wanted, or did I miss something of importance to you?


Reinhard

________________________________
Von: Mitchell Rathbun (BLOOMBERG/ 731 LEX) <mrathb...@bloomberg.net>
Gesendet: Montag, 4. November 2019 22:00
An: user@storm.apache.org <user@storm.apache.org>
Betreff: Re:AW: Re: Offset fetch failure causing topology crash

Stig, I will take a look at the Jira Issue you posted and make the 
corresponding change.

Reinhard, thank you for the response. Could you post what you set those two 
values to in order to fix the timeout issue? How did you arrive at changing 
those specific properties?

From: user@storm.apache.org At: 11/04/19 15:28:00
To: user@storm.apache.org<mailto:user@storm.apache.org>
Subject: AW: Re: Offset fetch failure causing topology crash


Hi



We ran into similar problems after switching to a newer Kafka version in some other project.

They changed (as I understood it) quite a few things concerning timeouts. I read in a thread that once Kafka fails, it might raise a cascade of follow-up errors (as observed in our project). The conflicts remained in Kafka 2.4 but disappeared in Kafka 2.5.

We configured the following two parameters to deal with an older project 
version (using Kafka 2.4):

  *   session.timeout.ms
  *   max.poll.interval.ms

Please take into account that I currently have no code at hand to verify the 
correct spelling of the parameters, so there might be a slight difference 😉



There is some good documentation of all parameters and their meaning – but unfortunately mostly from projects that use Kafka. The Kafka documentation seems to stop with 2.3 (for some reason I didn't find more recent documentation).



Hope that gives some hints.



Reinhard



---



Von: Stig Rohde Døssing<mailto:stigdoess...@gmail.com>
Gesendet: Montag, 4. November 2019 21:01
An: user@storm.apache.org<mailto:user@storm.apache.org>
Betreff: Re: Re: Offset fetch failure causing topology crash



Whoops, I think the user mailing list fell off the reply list. Sorry, didn't 
mean to mail you directly.



I haven't heard of this before, but people may have encountered it without 
mentioning it. I am not aware of a workaround. You're right that it would be 
good to get this fixed. https://issues.apache.org/jira/browse/STORM-3529 is 
open if you want to work on it. I think it should be pretty easy to catch and 
log RetriableExceptions in the same way we do elsewhere in the spout.
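The catch-and-log idea could look roughly like the following. This is only a sketch: RetriableException is defined locally here as a stand-in for org.apache.kafka.common.errors.RetriableException, and OffsetReader is a placeholder for the consumer calls KafkaOffsetMetric actually makes.

```java
// Sketch of swallowing transient Kafka errors during metric collection
// instead of letting them kill the executor thread.
public class OffsetMetricSketch {

    // Local stand-in for org.apache.kafka.common.errors.RetriableException.
    static class RetriableException extends RuntimeException {
        RetriableException(String msg) { super(msg); }
    }

    // Placeholder for the consumer offset lookups the metric performs.
    interface OffsetReader { long endOffset(); }

    static Long getValueAndReset(OffsetReader reader) {
        try {
            return reader.endOffset();
        } catch (RetriableException e) {
            // Transient broker problem (e.g. a leader bounce): log and skip
            // this metrics round rather than crash the topology.
            System.err.println("Failed to update offset metrics: " + e.getMessage());
            return null;
        }
    }

    public static void main(String[] args) {
        System.out.println(getValueAndReset(
                () -> { throw new RetriableException("offsets timed out"); }));
        // prints: null
    }
}
```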



Den man. 4. nov. 2019 kl. 19.29 skrev Mitchell Rathbun (BLOOMBERG/ 731 LEX) 
<mrathb...@bloomberg.net<mailto:mrathb...@bloomberg.net>>:

I increased the "default.api.timeout.ms" as mentioned, and am still getting the 
error. I need to dig into the Kafka code some more, but if there isn't a simple 
config fix I can make, then isn't this a critical issue? A Kafka broker being 
brought down should not be a fatal issue for the topology, especially when the 
exception is coming from a non-critical metrics class. Do you know of any 
workarounds to this/has this been seen before? It seems like it should be a 
pretty common issue, given that broker turnaround in Kafka is not that uncommon.



From: Mitchell Rathbun (BLOOMBERG/ 731 LEX) At: 10/28/19 19:20:30

To: stigdoess...@gmail.com<mailto:stigdoess...@gmail.com>
Subject: Re: Offset fetch failure causing topology crash



I did some digging and I believe that we are now seeing this due to a recent upgrade from Kafka 0.10.1.0 to 2.3.0. The timeout used for the beginningOffsets/endOffsets calls was reduced from over 5 minutes to 60 seconds with this change. In 2.3.0, this timeout is set by "default.api.timeout.ms". There is also a property called "offset.commit.period.ms", which controls how often offsets are committed in the KafkaSpout. This property has a default value of 30 seconds. So if one of these calls fails because a broker that was the leader for one of the consumer's partitions was brought down, the next commit attempt can already run into the timeout. So I am going to try either reducing "offset.commit.period.ms" or increasing "default.api.timeout.ms" and see if that fixes the issue.
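For reference, the kafka-clients side of that change could be sketched as below. The values are examples only, and note that "offset.commit.period.ms" is a storm-kafka-client (KafkaSpoutConfig) setting rather than a kafka-clients property; with the spout builder it would look roughly like KafkaSpoutConfig.builder(...).setOffsetCommitPeriodMs(...) (hypothetical values).

```java
import java.util.Properties;

// Sketch of raising the consumer-side timeout well above the spout's
// offset commit period, so a single broker bounce does not exhaust it.
public class OffsetTimeoutKnobs {
    public static Properties tunedConsumerProps() {
        Properties props = new Properties();
        // Raised from the 60 s default that kafka-clients 2.3.0 applies
        // to beginningOffsets/endOffsets (example value, not a recommendation).
        props.setProperty("default.api.timeout.ms", "300000");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(tunedConsumerProps().getProperty("default.api.timeout.ms"));
    }
}
```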

From: stigdoess...@gmail.com<mailto:stigdoess...@gmail.com> At: 10/25/19 
18:57:31

To: Mitchell Rathbun (BLOOMBERG/ 731 LEX ) <mailto:mrathb...@bloomberg.net>
Subject: Re: Offset fetch failure causing topology crash



See 
https://github.com/apache/storm/blob/7b1a98fc10fad516ef9ed0b3afc53a1d7be8a169/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java#L157
 and 
https://github.com/apache/storm/blob/7b1a98fc10fad516ef9ed0b3afc53a1d7be8a169/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics/KafkaOffsetMetric.java.



I don't think there's a flag to disable the metrics currently. Regarding where 
they can be viewed, please see the section on metrics consumers at 
https://storm.apache.org/releases/2.0.0/Metrics.html.



We might want to change the metrics class to catch RetriableException and 
ignore them (with logging). Have raised 
https://issues.apache.org/jira/browse/STORM-3529.



Den fre. 25. okt. 2019 kl. 21.09 skrev Mitchell Rathbun (BLOOMBERG/ 731 LEX) 
<mrathb...@bloomberg.net<mailto:mrathb...@bloomberg.net>>:

Our topology is running version 1.2.3, with 2.3.0 for kafka-clients. We recently noticed the following before it crashed over a weekend:



2019-10-18 21:05:16,256 ERROR util [Thread-130-customKafkaSpout-executor[17 17]] Async loop died!
java.lang.RuntimeException: org.apache.kafka.common.errors.TimeoutException: Failed to get offsets by times in 60000ms
    at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:522) ~[storm-core-1.2.1.jar:1.2.1]
    at org.apache.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:487) ~[storm-core-1.2.1.jar:1.2.1]
    at org.apache.storm.utils.DisruptorQueue.consumeBatch(DisruptorQueue.java:477) ~[storm-core-1.2.1.jar:1.2.1]
    at org.apache.storm.disruptor$consume_batch.invoke(disruptor.clj:70) ~[storm-core-1.2.1.jar:1.2.1]
    at org.apache.storm.daemon.executor$fn__4975$fn__4990$fn__5021.invoke(executor.clj:634) ~[storm-core-1.2.1.jar:1.2.1]
    at org.apache.storm.util$async_loop$fn__557.invoke(util.clj:484) ~[storm-core-1.2.1.jar:1.2.1]
    at clojure.lang.AFn.run(AFn.java:22) ~[clojure-1.7.0.jar:?]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_172]



These crashes coincided with Kafka broker bounces. Our Kafka cluster has 6 brokers, and each partition has 6 replicas. Only one broker was ever down at a time, so the ISR of each partition in the topic seemed to never drop below 5. This exception seemed to come from outside of the main kafka spout thread, since we are catching exceptions in there. Looking into the Kafka code a little further, this comes from the private method fetchOffsetsByTimes in the Fetcher.java class: https://github.com/apache/kafka/blob/2.3.0/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L538. This method is called by Consumer.offsetsForTimes, Consumer.beginningOffsets, and Consumer.endOffsets. I noticed that beginningOffsets and endOffsets are called here: https://github.com/apache/storm/blob/e21110d338fe8ca71b904682be35642a00de9e78/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics/KafkaOffsetMetric.java#L80, which would explain the error happening outside of the KafkaSpout nextTuple thread. So a couple of questions:



-Is this a known error? It seemed to happen every time a broker came down

-What are these metrics/where can they be viewed? Is there a way to disable 
them?


