[GitHub] storm issue #2791: STORM-3176: KafkaSpout commit offset occurs CommitFailedE...

2018-08-03 Thread wangzzu
Github user wangzzu commented on the issue:

https://github.com/apache/storm/pull/2791
  
@srdo I found that we are actually using the deprecated Subscription 
subtypes, thx.


---


[GitHub] storm pull request #2791: STORM-3176: KafkaSpout commit offset occurs Commit...

2018-08-03 Thread wangzzu
Github user wangzzu closed the pull request at:

https://github.com/apache/storm/pull/2791


---


[GitHub] storm issue #2791: STORM-3176: KafkaSpout commit offset occurs CommitFailedE...

2018-08-03 Thread wangzzu
Github user wangzzu commented on the issue:

https://github.com/apache/storm/pull/2791
  
@srdo Sorry, the version I used is 1.1.2. This problem has already been 
fixed in 1.2.0.


---


[GitHub] storm pull request #2791: STORM-3176: KafkaSpout commit offset occurs Commit...

2018-08-03 Thread wangzzu
GitHub user wangzzu opened a pull request:

https://github.com/apache/storm/pull/2791

STORM-3176: KafkaSpout commit offset occurs CommitFailedException which 
leads to worker dead

KafkaSpout uses the Consumer's commitSync API. If the interval between 
calls to consumer.poll() exceeds max.poll.interval.ms, or the consumer's 
heartbeat times out, the commit throws a CommitFailedException and the worker 
then dies. The log looks like this:

```
2018-07-31 19:19:03.341 o.a.s.util [ERROR] Async loop died!
org.apache.mtkafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:698) ~[stormjar.jar:?]
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:577) ~[stormjar.jar:?]
	at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1126) ~[stormjar.jar:?]
	at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:XXX) ~[stormjar.jar:?]
	at org.apache.storm.kafka.spout.KafkaSpout.commitOffsetsForAckedTuples(KafkaSpout.java:430) ~[stormjar.jar:?]
	at org.apache.storm.kafka.spout.KafkaSpout.nextTuple(KafkaSpout.java:264) ~[stormjar.jar:?]
	at org.apache.storm.daemon.executor$fn__10936$fn__10951$fn__10982.invoke(executor.clj:647) ~[storm-core-1.1.2-mt001.jar:?]
	at org.apache.storm.util$async_loop$fn__553.invoke(util.clj:484) [storm-core-1.1.2-mt001.jar:?]
	at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
	at java.lang.Thread.run(Thread.java:745) [?:1.8.0_112]
2018-07-31 19:19:03.342 o.a.s.d.executor [ERROR]
```
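
The exception message itself points at two consumer-side mitigations. As a 
rough sketch (the values below are illustrative, not part of this patch), 
they map to these consumer properties:

```java
import java.util.Properties;

public class ConsumerTuning {
    // Illustrative values only; tune them for the topology's actual processing rate.
    public static Properties mitigationProps() {
        Properties props = new Properties();
        // Give the poll loop more headroom before the coordinator evicts the consumer.
        props.put("max.poll.interval.ms", "600000");
        // Return smaller batches from poll() so each batch is processed faster.
        props.put("max.poll.records", "200");
        return props;
    }
}
```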

I found that the consumer itself catches this exception in auto-commit 
mode; the source code is:

```java
private void maybeAutoCommitOffsetsSync(long timeoutMs) {
    if (autoCommitEnabled) {
        Map<TopicPartition, OffsetAndMetadata> allConsumedOffsets = subscriptions.allConsumed();
        try {
            log.debug("Sending synchronous auto-commit of offsets {} for group {}",
                    allConsumedOffsets, groupId);
            if (!commitOffsetsSync(allConsumedOffsets, timeoutMs))
                log.debug("Auto-commit of offsets {} for group {} timed out before completion",
                        allConsumedOffsets, groupId);
        } catch (WakeupException | InterruptException e) {
            log.debug("Auto-commit of offsets {} for group {} was interrupted before completion",
                    allConsumedOffsets, groupId);
            // rethrow wakeups since they are triggered by the user
            throw e;
        } catch (Exception e) {
            // consistent with async auto-commit failures, we do not propagate the exception
            log.warn("Auto-commit of offsets {} failed for group {}: {}",
                    allConsumedOffsets, groupId, e.getMessage());
        }
    }
}
```

I think KafkaSpout should do the same: catch the exception to avoid killing 
the worker. And when a message's ack fails, the spout should check whether the 
offset of the msgId is larger than the last committed offset (the spout can 
guarantee that all messages with offsets below the last committed offset have 
been acked); if not, the message should not be retried. A sketch follows below.
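
A minimal sketch of that idea (the class name CommitSketch and the method 
names are mine, and the real KafkaSpout tracks committed offsets per 
partition rather than in a single field):

```java
import java.util.Map;
import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

class CommitSketch {
    private final KafkaConsumer<String, String> consumer;
    // Highest offset known to be committed; the real spout keeps this per partition.
    private long lastCommittedOffset = -1L;

    CommitSketch(KafkaConsumer<String, String> consumer) {
        this.consumer = consumer;
    }

    /** Commit acked offsets, swallowing CommitFailedException the same way the
     *  consumer's own auto-commit path does, so the worker stays alive. */
    void commitAckedOffsets(Map<TopicPartition, OffsetAndMetadata> toCommit) {
        try {
            consumer.commitSync(toCommit);
            for (OffsetAndMetadata meta : toCommit.values()) {
                lastCommittedOffset = Math.max(lastCommittedOffset, meta.offset());
            }
        } catch (CommitFailedException e) {
            // The group has already rebalanced and the partitions were reassigned.
            // Log and carry on instead of letting the exception kill the worker.
        }
    }

    /** Only retry messages beyond the last committed offset; anything at or
     *  below it is already covered by a successful commit. */
    boolean shouldRetry(long msgOffset) {
        return msgOffset > lastCommittedOffset;
    }
}
```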


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/wangzzu/storm storm-kafka-client

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/storm/pull/2791.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2791


commit 54005a2bd28be7928cee05d6500136d3f1fb926d
Author: wangmeng36 
Date:   2018-08-03T08:44:40Z

storm-kafka-client fix the CommitFailedException bug




---