[ 
https://issues.apache.org/jira/browse/STORM-2342?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Felipe Cavalcanti updated STORM-2342:
-------------------------------------
    Description: 
I've created a topology that will read from kafka using storm-kafka-client but 
when it reaches the last message on kafka log it stops consuming and get stuck, 
new messages are never consumed, here are the kafka logs:

[2017-02-03 19:15:26,865] INFO [GroupCoordinator 1002]: Preparing to 
restabilize group kafka-spout with old generation 29 
(kafka.coordinator.GroupCoordinator)
[2017-02-03 19:15:26,865] INFO [GroupCoordinator 1002]: Stabilized group 
kafka-spout generation 30 (kafka.coordinator.GroupCoordinator)
[2017-02-03 19:15:26,868] INFO [GroupCoordinator 1002]: Assignment received 
from leader for group kafka-spout for generation 30 
(kafka.coordinator.GroupCoordinator)

========= here storm starts consuming messages, then, when it hits the last 
message, I can see this log in kafka == >

[2017-02-03 19:16:01,266] INFO [GroupCoordinator 1002]: Preparing to 
restabilize group kafka-spout with old generation 30 
(kafka.coordinator.GroupCoordinator)
[2017-02-03 19:16:01,266] INFO [GroupCoordinator 1002]: Group kafka-spout with 
generation 31 is now empty (kafka.coordinator.GroupCoordinator)

=====
and then storm consumer group is stuck, no new messages are read from kafka. my 
topology/ spout are configured that way:

Topology:

      c.put(SConfig.TOPOLOGY_MAX_SPOUT_PENDING, 1000)
      c.put(SConfig.NIMBUS_SEEDS, "my nimbus seeds")
      c.put(SConfig.NIMBUS_THRIFT_PORT, 6627)
      c.put(SConfig.TOPOLOGY_WORKERS, 2)      
c.put(SConfig.TOPOLOGY_SLEEP_SPOUT_WAIT_STRATEGY_TIME_MS, 100)

Spout:

    props.put(KafkaSpoutConfig.Consumer.ENABLE_AUTO_COMMIT, true)
    props.put(KafkaSpoutConfig.Consumer.BOOTSTRAP_SERVERS, ...)
    props.put(KafkaSpoutConfig.Consumer.GROUP_ID, "kafka-spout")
    props.put(KafkaSpoutConfig.Consumer.KEY_DESERIALIZER, keyDeserializer)
    props.put(KafkaSpoutConfig.Consumer.VALUE_DESERIALIZER, valueDeserializer)
 
 any hints?

  was:
I've created a topology that will read from kafka using storm-kafka-client but 
when it reaches the last message on kafka log it stops consuming and get stuck, 
new messages are never consumed, here are the kafka logs:

```
[2017-02-03 19:15:26,865] INFO [GroupCoordinator 1002]: Preparing to 
restabilize group kafka-spout with old generation 29 
(kafka.coordinator.GroupCoordinator)
[2017-02-03 19:15:26,865] INFO [GroupCoordinator 1002]: Stabilized group 
kafka-spout generation 30 (kafka.coordinator.GroupCoordinator)
[2017-02-03 19:15:26,868] INFO [GroupCoordinator 1002]: Assignment received 
from leader for group kafka-spout for generation 30 
(kafka.coordinator.GroupCoordinator)
```

========= here storm starts consuming messages, then, when it hits the last 
message, I can see this log in kafka == >

[2017-02-03 19:16:01,266] INFO [GroupCoordinator 1002]: Preparing to 
restabilize group kafka-spout with old generation 30 
(kafka.coordinator.GroupCoordinator)
[2017-02-03 19:16:01,266] INFO [GroupCoordinator 1002]: Group kafka-spout with 
generation 31 is now empty (kafka.coordinator.GroupCoordinator)

=====
and then storm consumer group is stuck, no new messages are read from kafka. my 
topology/ spout are configured that way:

Topology:

      c.put(SConfig.TOPOLOGY_MAX_SPOUT_PENDING, 1000)
      c.put(SConfig.NIMBUS_SEEDS, "my nimbus seeds")
      c.put(SConfig.NIMBUS_THRIFT_PORT, 6627)
      c.put(SConfig.TOPOLOGY_WORKERS, 2)      
c.put(SConfig.TOPOLOGY_SLEEP_SPOUT_WAIT_STRATEGY_TIME_MS, 100)

Spout:

    props.put(KafkaSpoutConfig.Consumer.ENABLE_AUTO_COMMIT, true)
    props.put(KafkaSpoutConfig.Consumer.BOOTSTRAP_SERVERS, ...)
    props.put(KafkaSpoutConfig.Consumer.GROUP_ID, "kafka-spout")
    props.put(KafkaSpoutConfig.Consumer.KEY_DESERIALIZER, keyDeserializer)
    props.put(KafkaSpoutConfig.Consumer.VALUE_DESERIALIZER, valueDeserializer)
 
 any hints?


> storm-kafka-client consumer group getting stuck consuming from kafka 0.10.1.1
> -----------------------------------------------------------------------------
>
>                 Key: STORM-2342
>                 URL: https://issues.apache.org/jira/browse/STORM-2342
>             Project: Apache Storm
>          Issue Type: Bug
>          Components: storm-kafka-client
>    Affects Versions: 1.0.2
>            Reporter: Felipe Cavalcanti
>
> I've created a topology that will read from kafka using storm-kafka-client 
> but when it reaches the last message on kafka log it stops consuming and get 
> stuck, new messages are never consumed, here are the kafka logs:
> [2017-02-03 19:15:26,865] INFO [GroupCoordinator 1002]: Preparing to 
> restabilize group kafka-spout with old generation 29 
> (kafka.coordinator.GroupCoordinator)
> [2017-02-03 19:15:26,865] INFO [GroupCoordinator 1002]: Stabilized group 
> kafka-spout generation 30 (kafka.coordinator.GroupCoordinator)
> [2017-02-03 19:15:26,868] INFO [GroupCoordinator 1002]: Assignment received 
> from leader for group kafka-spout for generation 30 
> (kafka.coordinator.GroupCoordinator)
> ========= here storm starts consuming messages, then, when it hits the last 
> message, I can see this log in kafka == >
> [2017-02-03 19:16:01,266] INFO [GroupCoordinator 1002]: Preparing to 
> restabilize group kafka-spout with old generation 30 
> (kafka.coordinator.GroupCoordinator)
> [2017-02-03 19:16:01,266] INFO [GroupCoordinator 1002]: Group kafka-spout 
> with generation 31 is now empty (kafka.coordinator.GroupCoordinator)
> =====
> and then storm consumer group is stuck, no new messages are read from kafka. 
> my topology/ spout are configured that way:
> Topology:
>       c.put(SConfig.TOPOLOGY_MAX_SPOUT_PENDING, 1000)
>       c.put(SConfig.NIMBUS_SEEDS, "my nimbus seeds")
>       c.put(SConfig.NIMBUS_THRIFT_PORT, 6627)
>       c.put(SConfig.TOPOLOGY_WORKERS, 2)      
> c.put(SConfig.TOPOLOGY_SLEEP_SPOUT_WAIT_STRATEGY_TIME_MS, 100)
> Spout:
>     props.put(KafkaSpoutConfig.Consumer.ENABLE_AUTO_COMMIT, true)
>     props.put(KafkaSpoutConfig.Consumer.BOOTSTRAP_SERVERS, ...)
>     props.put(KafkaSpoutConfig.Consumer.GROUP_ID, "kafka-spout")
>     props.put(KafkaSpoutConfig.Consumer.KEY_DESERIALIZER, keyDeserializer)
>     props.put(KafkaSpoutConfig.Consumer.VALUE_DESERIALIZER, valueDeserializer)
>  
>  any hints?



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Reply via email to