[ https://issues.apache.org/jira/browse/STORM-3176?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Stig Rohde Døssing reopened STORM-3176:
---------------------------------------

> KafkaSpout commit offset occurs CommitFailedException which leads to worker dead
> --------------------------------------------------------------------------------
>
>                 Key: STORM-3176
>                 URL: https://issues.apache.org/jira/browse/STORM-3176
>             Project: Apache Storm
>          Issue Type: Bug
>          Components: storm-kafka-client
>    Affects Versions: 1.1.2
>            Reporter: Matt Wang
>            Priority: Critical
>              Labels: pull-request-available
>             Fix For: 1.2.0
>
>          Time Spent: 0.5h
>  Remaining Estimate: 0h
>
> KafkaSpout commits offsets through the consumer's synchronous commit API (commitSync). If the time between calls to consumer.poll() exceeds max.poll.interval.ms, or the consumer's heartbeat times out, the commit throws a CommitFailedException and the worker dies. The log looks like this:
> {code:java}
> 2018-07-31 19:19:03.341 o.a.s.util [ERROR] Async loop died!
> org.apache.mtkafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:698) ~[stormjar.jar:?]
>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:577) ~[stormjar.jar:?]
>     at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1126) ~[stormjar.jar:?]
>     at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:XXX) ~[stormjar.jar:?]
>     at org.apache.storm.kafka.spout.KafkaSpout.commitOffsetsForAckedTuples(KafkaSpout.java:430) ~[stormjar.jar:?]
>     at org.apache.storm.kafka.spout.KafkaSpout.nextTuple(KafkaSpout.java:264) ~[stormjar.jar:?]
>     at org.apache.storm.daemon.executor$fn__10936$fn__10951$fn__10982.invoke(executor.clj:647) ~[storm-core-1.1.2-mt001.jar:?]
>     at org.apache.storm.util$async_loop$fn__553.invoke(util.clj:484) [storm-core-1.1.2-mt001.jar:?]
>     at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
>     at java.lang.Thread.run(Thread.java:745) [?:1.8.0_112]
> 2018-07-31 19:19:03.342 o.a.s.d.executor [ERROR]
> {code}
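> For reference, the sketch below is a minimal, standalone consumer loop that runs into the same CommitFailedException when the gap between two poll() calls exceeds max.poll.interval.ms. It is an illustration only; the broker address, topic, group id, and timings are made-up values (not taken from the topology that hit this bug), and it assumes a kafka-clients version with the poll(Duration) API.
> {code:java}
> import java.time.Duration;
> import java.util.Collections;
> import java.util.Properties;
> import org.apache.kafka.clients.consumer.CommitFailedException;
> import org.apache.kafka.clients.consumer.ConsumerConfig;
> import org.apache.kafka.clients.consumer.KafkaConsumer;
>
> public class SlowPollLoopDemo {
>     public static void main(String[] args) throws Exception {
>         Properties props = new Properties();
>         props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker
>         props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-group");              // assumed group id
>         props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "30000");       // 30s budget between polls
>         props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
>                 "org.apache.kafka.common.serialization.StringDeserializer");
>         props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
>                 "org.apache.kafka.common.serialization.StringDeserializer");
>
>         try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
>             consumer.subscribe(Collections.singletonList("demo-topic"));      // assumed topic
>             consumer.poll(Duration.ofSeconds(1));
>             // Pretend that tuple processing takes longer than max.poll.interval.ms,
>             // so the coordinator evicts this member and rebalances the group.
>             Thread.sleep(60_000);
>             // The next synchronous commit is rejected; in the spout this exception
>             // escapes nextTuple() and kills the worker.
>             consumer.commitSync();
>         } catch (CommitFailedException e) {
>             System.err.println("Commit rejected after rebalance: " + e.getMessage());
>         }
>     }
> }
> {code}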
> I found that the consumer's auto-commit mode catches this exception itself; the relevant source is:
> {code:java}
> private void maybeAutoCommitOffsetsSync(long timeoutMs) {
>     if (autoCommitEnabled) {
>         Map<TopicPartition, OffsetAndMetadata> allConsumedOffsets = subscriptions.allConsumed();
>         try {
>             log.debug("Sending synchronous auto-commit of offsets {} for group {}", allConsumedOffsets, groupId);
>             if (!commitOffsetsSync(allConsumedOffsets, timeoutMs))
>                 log.debug("Auto-commit of offsets {} for group {} timed out before completion",
>                         allConsumedOffsets, groupId);
>         } catch (WakeupException | InterruptException e) {
>             log.debug("Auto-commit of offsets {} for group {} was interrupted before completion",
>                     allConsumedOffsets, groupId);
>             // rethrow wakeups since they are triggered by the user
>             throw e;
>         } catch (Exception e) {
>             // consistent with async auto-commit failures, we do not propagate the exception
>             log.warn("Auto-commit of offsets {} failed for group {}: {}", allConsumedOffsets, groupId,
>                     e.getMessage());
>         }
>     }
> }
> {code}
> I think KafkaSpout should do the same: catch the exception so that the worker does not die. In addition, when a message fails and is considered for retry, the spout should check whether the offset of that msgId is greater than the last committed offset (the spout can guarantee that every message with an offset below the last committed offset has already been acked); if it is not, the message should not be retried. A sketch of both changes follows below.
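> The following is a rough sketch of the two changes proposed above. It is not the actual patch attached to this issue; the class, field, and helper names (CommitGuardSketch, consumer, lastCommittedOffset, shouldRetry) are placeholders standing in for the real KafkaSpout internals.
> {code:java}
> import java.util.HashMap;
> import java.util.Map;
> import org.apache.kafka.clients.consumer.CommitFailedException;
> import org.apache.kafka.clients.consumer.KafkaConsumer;
> import org.apache.kafka.clients.consumer.OffsetAndMetadata;
> import org.apache.kafka.common.TopicPartition;
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
>
> class CommitGuardSketch {
>     private static final Logger LOG = LoggerFactory.getLogger(CommitGuardSketch.class);
>
>     private final KafkaConsumer<String, String> consumer;                         // placeholder for the spout's consumer
>     private final Map<TopicPartition, Long> lastCommittedOffset = new HashMap<>(); // placeholder bookkeeping
>
>     CommitGuardSketch(KafkaConsumer<String, String> consumer) {
>         this.consumer = consumer;
>     }
>
>     /** Change 1: swallow CommitFailedException instead of letting it kill the worker. */
>     void commitOffsetsForAckedTuples(Map<TopicPartition, OffsetAndMetadata> toCommit) {
>         try {
>             consumer.commitSync(toCommit);
>             toCommit.forEach((tp, om) -> lastCommittedOffset.put(tp, om.offset()));
>         } catch (CommitFailedException e) {
>             // Same policy as the consumer's own auto-commit: log and carry on.
>             // The offsets stay acked and are committed on the next attempt.
>             LOG.warn("Offset commit failed because the group rebalanced; keeping offsets for the next commit", e);
>         }
>     }
>
>     /** Change 2: do not retry a failed message whose offset is already committed. */
>     boolean shouldRetry(TopicPartition tp, long failedOffset) {
>         Long committed = lastCommittedOffset.get(tp);
>         // Everything below the last committed offset is known to be acked,
>         // so replaying it would only produce duplicates.
>         return committed == null || failedOffset >= committed;
>     }
> }
> {code}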