[ https://issues.apache.org/jira/browse/STORM-3176?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Stig Rohde Døssing reopened STORM-3176:
---------------------------------------

> KafkaSpout offset commit can throw CommitFailedException, which leads to worker death
> --------------------------------------------------------------------------------------
>
>                 Key: STORM-3176
>                 URL: https://issues.apache.org/jira/browse/STORM-3176
>             Project: Apache Storm
>          Issue Type: Bug
>          Components: storm-kafka-client
>    Affects Versions: 1.1.2
>            Reporter: Matt Wang
>            Priority: Critical
>              Labels: pull-request-available
>             Fix For: 1.2.0
>
>          Time Spent: 0.5h
>  Remaining Estimate: 0h
>
> KafkaSpout uses the Consumer's commitSync API (see the stack trace below). If 
> the time between calls to consumer.poll() exceeds _max.poll.interval.ms_, or 
> the consumer's heartbeat times out, the commit throws a CommitFailedException 
> and the worker dies. The log looks like this (a minimal repro sketch follows 
> the log): 
> {code:java}
> 2018-07-31 19:19:03.341 o.a.s.util [ERROR] Async loop died!
> org.apache.mtkafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
> at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:698) ~[stormjar.jar:?]
> at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:577) ~[stormjar.jar:?]
> at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1126) ~[stormjar.jar:?]
> at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:XXX) ~[stormjar.jar:?]
> at org.apache.storm.kafka.spout.KafkaSpout.commitOffsetsForAckedTuples(KafkaSpout.java:430) ~[stormjar.jar:?]
> at org.apache.storm.kafka.spout.KafkaSpout.nextTuple(KafkaSpout.java:264) ~[stormjar.jar:?]
> at org.apache.storm.daemon.executor$fn__10936$fn__10951$fn__10982.invoke(executor.clj:647) ~[storm-core-1.1.2-mt001.jar:?]
> at org.apache.storm.util$async_loop$fn__553.invoke(util.clj:484) [storm-core-1.1.2-mt001.jar:?]
> at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
> at java.lang.Thread.run(Thread.java:745) [?:1.8.0_112]
> 2018-07-31 19:19:03.342 o.a.s.d.executor [ERROR]
> {code}
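> For illustration, a minimal repro sketch, not from the issue: the broker 
> address, group, and topic names are hypothetical, and it assumes kafka-clients 
> 2.x for the Duration-based poll. Letting the gap between poll() calls exceed 
> _max.poll.interval.ms_ forces a rebalance, and the next commitSync() throws 
> CommitFailedException:
> {code:java}
> import java.time.Duration;
> import java.util.Collections;
> import java.util.Properties;
> import org.apache.kafka.clients.consumer.CommitFailedException;
> import org.apache.kafka.clients.consumer.ConsumerConfig;
> import org.apache.kafka.clients.consumer.KafkaConsumer;
> 
> public class CommitFailedRepro {
>     public static void main(String[] args) throws InterruptedException {
>         Properties props = new Properties();
>         props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // hypothetical broker
>         props.put(ConsumerConfig.GROUP_ID_CONFIG, "repro-group");             // hypothetical group
>         props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
>                 "org.apache.kafka.common.serialization.StringDeserializer");
>         props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
>                 "org.apache.kafka.common.serialization.StringDeserializer");
>         props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
>         props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 5000);          // short on purpose
> 
>         try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
>             consumer.subscribe(Collections.singletonList("repro-topic"));     // hypothetical topic
>             consumer.poll(Duration.ofSeconds(1)); // join the group, get an assignment
>             Thread.sleep(10_000);                 // exceed max.poll.interval.ms -> rebalance
>             consumer.commitSync();                // throws CommitFailedException
>         } catch (CommitFailedException e) {
>             System.err.println("Commit failed after rebalance: " + e.getMessage());
>         }
>     }
> }
> {code}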
> I find that in the consumer's auto-commit mode this exception is caught 
> internally; the source code is:
> {code:java}
> private void maybeAutoCommitOffsetsSync(long timeoutMs) {
>     if (autoCommitEnabled) {
>         Map<TopicPartition, OffsetAndMetadata> allConsumedOffsets = subscriptions.allConsumed();
>         try {
>             log.debug("Sending synchronous auto-commit of offsets {} for group {}", allConsumedOffsets, groupId);
>             if (!commitOffsetsSync(allConsumedOffsets, timeoutMs))
>                 log.debug("Auto-commit of offsets {} for group {} timed out before completion",
>                         allConsumedOffsets, groupId);
>         } catch (WakeupException | InterruptException e) {
>             log.debug("Auto-commit of offsets {} for group {} was interrupted before completion",
>                     allConsumedOffsets, groupId);
>             // rethrow wakeups since they are triggered by the user
>             throw e;
>         } catch (Exception e) {
>             // consistent with async auto-commit failures, we do not propagate the exception
>             log.warn("Auto-commit of offsets {} failed for group {}: {}", allConsumedOffsets, groupId,
>                     e.getMessage());
>         }
>     }
> }
> {code}
> I think KafkaSpout should do the same: catch the exception so that the worker 
> does not die. And when a message's ack fails, the spout should check whether 
> the offset of the msgId is larger than the last committed offset (the spout 
> can guarantee that all messages with offsets below the last committed offset 
> have been acked); if it is not, the message should not be retried. A sketch of 
> this handling follows.
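> A minimal sketch of the handling proposed above, illustrative only: 
> commitOffsetsForAckedTuples is the real method name from the stack trace, but 
> the class, fields, and the shouldRetry helper are hypothetical, not the actual 
> KafkaSpout code:
> {code:java}
> import java.util.HashMap;
> import java.util.Map;
> import org.apache.kafka.clients.consumer.CommitFailedException;
> import org.apache.kafka.clients.consumer.KafkaConsumer;
> import org.apache.kafka.clients.consumer.OffsetAndMetadata;
> import org.apache.kafka.common.TopicPartition;
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
> 
> // Sketch of the proposed spout-side handling; not the actual KafkaSpout code.
> class SafeCommitSketch {
>     private static final Logger LOG = LoggerFactory.getLogger(SafeCommitSketch.class);
>     private final KafkaConsumer<String, String> kafkaConsumer;
>     private final Map<TopicPartition, Long> lastCommittedOffsets = new HashMap<>();
> 
>     SafeCommitSketch(KafkaConsumer<String, String> kafkaConsumer) {
>         this.kafkaConsumer = kafkaConsumer;
>     }
> 
>     void commitOffsetsForAckedTuples(Map<TopicPartition, OffsetAndMetadata> toCommit) {
>         try {
>             kafkaConsumer.commitSync(toCommit);
>             toCommit.forEach((tp, om) -> lastCommittedOffsets.put(tp, om.offset()));
>         } catch (CommitFailedException e) {
>             // Mirror the consumer's own auto-commit behavior: log and carry on
>             // instead of letting the exception propagate and kill the worker.
>             LOG.warn("Offset commit failed because the group rebalanced: {}", e.getMessage());
>         }
>     }
> 
>     // Retry a failed tuple only if its offset is not already covered by the last
>     // successful commit (the committed offset is the next offset to consume, so
>     // everything below it is already durable).
>     boolean shouldRetry(TopicPartition tp, long offset) {
>         Long committed = lastCommittedOffsets.get(tp);
>         return committed == null || offset >= committed;
>     }
> }
> {code}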
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
