[jira] [Created] (STORM-2850) ManualPartitionSubscription assigns new partitions before calling onPartitionsRevoked

2017-12-08 Thread JIRA
Stig Rohde Døssing created STORM-2850:
-

 Summary: ManualPartitionSubscription assigns new partitions before 
calling onPartitionsRevoked
 Key: STORM-2850
 URL: https://issues.apache.org/jira/browse/STORM-2850
 Project: Apache Storm
  Issue Type: Bug
  Components: storm-kafka-client
Affects Versions: 2.0.0, 1.2.0
Reporter: Stig Rohde Døssing
Assignee: Stig Rohde Døssing


ManualPartitionSubscription applies partition assignment updates in the wrong 
order. It calls KafkaConsumer.assign, then onPartitionsRevoked, and last 
onPartitionsAssigned. The order should be onPartitionsRevoked, then assign, 
then onPartitionsAssigned.

onPartitionsRevoked has to be called before we reassign partitions, because it 
tries to commit offsets for the revoked partitions. If we try to commit offsets 
for a partition that is not assigned to the consumer, the consumer throws an 
exception. The onRevoke, assign, onAssign order is also more in line with the 
javadoc for ConsumerRebalanceListener, which specifies that onPartitionsRevoked 
should be called before the partition rebalance begins.
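To make the ordering concrete, here is a minimal sketch under stated assumptions. StubConsumer, OrderedSubscription and the string partition names such as "t-0" are hypothetical stand-ins, not the storm-kafka-client or Kafka API, and the stub's commit() throws IllegalStateException for unassigned partitions only to model the behaviour described above. The revokeBeforeAssign flag switches between the buggy order (assign, then revoke) and the proposed order (revoke, assign, then assigned).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for KafkaConsumer. It only tracks the assignment and
// refuses to commit offsets for partitions it is not assigned.
class StubConsumer {
    private Set<String> assignment = new HashSet<>();
    final List<String> events = new ArrayList<>();

    static Set<String> partitions(String... names) {
        return new HashSet<>(Arrays.asList(names));
    }

    static List<String> sorted(Set<String> partitions) {
        List<String> list = new ArrayList<>(partitions);
        Collections.sort(list);
        return list;
    }

    void assign(Set<String> partitions) {
        assignment = new HashSet<>(partitions);
        events.add("assign " + sorted(partitions));
    }

    void commit(Set<String> partitions) {
        for (String tp : partitions) {
            if (!assignment.contains(tp)) {
                throw new IllegalStateException("Not assigned: " + tp);
            }
        }
        events.add("commit " + sorted(partitions));
    }
}

// Hypothetical, simplified manual subscription. The flag only exists to
// contrast the buggy order with the order proposed in STORM-2850.
class OrderedSubscription {
    private final StubConsumer consumer;
    private Set<String> currentAssignment; // null before the first refresh

    OrderedSubscription(StubConsumer consumer) {
        this.consumer = consumer;
    }

    // Stand-in for the spout's onPartitionsRevoked: commit the revoked partitions.
    private void onPartitionsRevoked(Set<String> revoked) {
        consumer.commit(revoked);
    }

    // Stand-in for the spout's onPartitionsAssigned: just record the call.
    private void onPartitionsAssigned(Set<String> assigned) {
        consumer.events.add("assigned " + StubConsumer.sorted(assigned));
    }

    void refreshAssignment(Set<String> newAssignment, boolean revokeBeforeAssign) {
        if (newAssignment.equals(currentAssignment)) {
            return; // assignment unchanged, nothing to do
        }
        if (revokeBeforeAssign) {
            // onPartitionsRevoked -> assign -> onPartitionsAssigned
            if (currentAssignment != null) {
                onPartitionsRevoked(currentAssignment);
            }
            consumer.assign(newAssignment);
        } else {
            // Buggy order: the old partitions are unassigned before we commit them.
            consumer.assign(newAssignment);
            if (currentAssignment != null) {
                onPartitionsRevoked(currentAssignment);
            }
        }
        currentAssignment = new HashSet<>(newAssignment);
        onPartitionsAssigned(newAssignment);
    }
}
```

Moving from {t-0, t-1} to {t-1, t-2} with revokeBeforeAssign = true commits t-0 and t-1 while they are still assigned; with false, the commit fails because t-0 has already been unassigned.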




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (STORM-2850) ManualPartitionSubscription assigns new partitions before calling onPartitionsRevoked

2017-12-08 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/STORM-2850?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated STORM-2850:
--
Labels: pull-request-available  (was: )



[jira] [Commented] (STORM-2847) Exception thrown after rebalance IllegalArgumentException

2017-12-08 Thread Evan Rosebrook (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-2847?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16283832#comment-16283832
 ] 

Evan Rosebrook commented on STORM-2847:
---

[~Srdo] Here is the requested info:

kafka-clients: 0.10.2.0
kafka-broker: 0.10.1.0

val retryService = new KafkaSpoutRetryExponentialBackoff(
  KafkaSpoutRetryExponentialBackoff.TimeInterval.microSeconds(500),
  KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(2),
  10,
  KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(10))

val recordTranslator = new CustomRecordTranslator(topics,
  SerializableFactories.getDeserializer)

new KafkaSpoutConfig.Builder(hosts, classOf[CustomDeserializer],
    classOf[CustomDeserializer], topics)
  .setGroupId(consumerGroup)
  .setRetry(retryService)
  .setRecordTranslator(recordTranslator)
  .setProp(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 1)
  .setProp(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
  .setProp(ConsumerConfig.RECEIVE_BUFFER_CONFIG, 1048576)
  .setOffsetCommitPeriodMs(500)
  .build()

> Exception thrown after rebalance IllegalArgumentException
> -
>
> Key: STORM-2847
> URL: https://issues.apache.org/jira/browse/STORM-2847
> Project: Apache Storm
>  Issue Type: Bug
>  Components: storm-kafka-client
>Affects Versions: 1.2.0
>Reporter: Evan Rosebrook
>Assignee: Stig Rohde Døssing
>
> After a rebalance, the storm-kafka-client spout attempts to check the current 
> position of partitions that are no longer assigned to it. This occurs in a 
> topology with multiple spout instances.
> java.lang.IllegalArgumentException: You can only check the position for partitions assigned to this consumer.
>     at org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1262)
>     at org.apache.storm.kafka.spout.KafkaSpout.commitOffsetsForAckedTuples(KafkaSpout.java:473)





[jira] [Commented] (STORM-2847) Exception thrown after rebalance IllegalArgumentException

2017-12-08 Thread JIRA

[ 
https://issues.apache.org/jira/browse/STORM-2847?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16284070#comment-16284070
 ] 

Stig Rohde Døssing commented on STORM-2847:
---

Thanks [~erosebrook]. Your configuration and versions look fine to me.

I've spent some time going over the code, and I'm having a hard time spotting 
the problem.

The exception you're getting from 
https://github.com/apache/storm/blob/e2e3f5d19a8671e3759a04b94135fd6643b3aa61/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java#L475
 means that the TopicPartition passed to KafkaConsumer.position was not 
assigned to the consumer. I don't understand how that can happen, because that 
TopicPartition comes from the offsetManagers map, and the key set of that map 
is set to exactly the partitions assigned to the consumer, at 
https://github.com/apache/storm/blob/e2e3f5d19a8671e3759a04b94135fd6643b3aa61/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java#L174
 and 
https://github.com/apache/storm/blob/e2e3f5d19a8671e3759a04b94135fd6643b3aa61/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java#L197.
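The reasoning above rests on one invariant: offsetManagers.keySet() equals the consumer's assignment, so position() is only ever called for assigned partitions. Below is a hypothetical, heavily simplified sketch of that invariant; PositionConsumer, SpoutBookkeeping and positionsForCommit are illustrative names, not the real KafkaSpout code, and the stub only mimics the IllegalArgumentException message from the stack trace.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for KafkaConsumer.position: it rejects partitions
// that are not currently assigned to this consumer.
class PositionConsumer {
    final Set<String> assignment = new HashSet<>();
    final Map<String, Long> positions = new HashMap<>();

    long position(String tp) {
        if (!assignment.contains(tp)) {
            throw new IllegalArgumentException(
                "You can only check the position for partitions assigned to this consumer.");
        }
        return positions.getOrDefault(tp, 0L);
    }
}

// Hypothetical spout bookkeeping. offsetManagers maps each partition to a
// committed offset and is kept in sync with every assignment callback.
class SpoutBookkeeping {
    private final PositionConsumer consumer;
    final Map<String, Long> offsetManagers = new HashMap<>();

    SpoutBookkeeping(PositionConsumer consumer) {
        this.consumer = consumer;
    }

    // Make offsetManagers' key set equal to the newly assigned partitions.
    void onPartitionsAssigned(Set<String> assigned) {
        offsetManagers.keySet().retainAll(assigned);
        for (String tp : assigned) {
            offsetManagers.putIfAbsent(tp, 0L);
        }
    }

    // Only offsetManagers keys are passed to position(), so this is safe
    // exactly as long as the key set matches the consumer's assignment.
    Map<String, Long> positionsForCommit() {
        Map<String, Long> result = new HashMap<>();
        for (String tp : offsetManagers.keySet()) {
            result.put(tp, consumer.position(tp));
        }
        return result;
    }
}
```

If something changes the consumer's assignment without going through onPartitionsAssigned (simulated here by clearing the stub's assignment), positionsForCommit() throws the same kind of IllegalArgumentException as in the report.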

Could you try enabling debug logging for the org.apache.storm.kafka.spout logger 
and provoking the error again? In particular, I want to see whether the 
partitions logged at 
https://github.com/apache/storm/blob/e2e3f5d19a8671e3759a04b94135fd6643b3aa61/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java#L164
 match the ones logged at 
https://github.com/apache/storm/blob/e2e3f5d19a8671e3759a04b94135fd6643b3aa61/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java#L469.

I'm also curious whether the error happens at random or at particular 
moments.



[jira] [Commented] (STORM-2847) Exception thrown after rebalance IllegalArgumentException

2017-12-08 Thread Evan Rosebrook (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-2847?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16284249#comment-16284249
 ] 

Evan Rosebrook commented on STORM-2847:
---

[~Srdo]

It happens on every manually triggered rebalance. It seemed to me that it has 
to do with partitions being revoked by Kafka itself while the spout still holds 
a reference to them. I'll get you the debug data when I am able.



[jira] [Commented] (STORM-2847) Exception thrown after rebalance IllegalArgumentException

2017-12-08 Thread JIRA

[ 
https://issues.apache.org/jira/browse/STORM-2847?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16284263#comment-16284263
 ] 

Stig Rohde Døssing commented on STORM-2847:
---

When you refer to triggering a rebalance, what do you mean? Asking Storm to 
rebalance the topology, or something else?



[jira] [Commented] (STORM-2847) Exception thrown after rebalance IllegalArgumentException

2017-12-08 Thread Evan Rosebrook (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-2847?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16284266#comment-16284266
 ] 

Evan Rosebrook commented on STORM-2847:
---

Yes, asking Storm to rebalance.



[jira] [Comment Edited] (STORM-2847) Exception thrown after rebalance IllegalArgumentException

2017-12-08 Thread JIRA

[ 
https://issues.apache.org/jira/browse/STORM-2847?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16284263#comment-16284263
 ] 

Stig Rohde Døssing edited comment on STORM-2847 at 12/8/17 9:41 PM:


When you refer to triggering a rebalance, what do you mean? Asking Storm to 
rebalance the topology, or something else?

Also, as of 1.2.0, Kafka is no longer responsible for managing consumer group 
partition assignment; the spout distributes partitions itself without 
consulting Kafka. See https://issues.apache.org/jira/browse/STORM-2542 for why 
we made this change. Partition assignment is now handled by 
https://github.com/apache/storm/blob/master/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/ManualPartitionSubscription.java
 and associated classes.
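Since the spout now splits partitions among its own tasks, here is a minimal sketch of one way that can work, assuming a round-robin split by task index. storm-kafka-client ships a RoundRobinManualPartitioner, but this is not its source; RoundRobinSketch and partitionsForTask are hypothetical names, and partitions are plain strings sorted lexicographically, whereas real code would compare TopicPartitions by topic and partition number.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical sketch of spout-side, round-robin partition distribution.
final class RoundRobinSketch {
    private RoundRobinSketch() {}

    static List<String> partitionsForTask(List<String> allPartitions, int taskIndex, int totalTasks) {
        if (totalTasks <= 0 || taskIndex < 0 || taskIndex >= totalTasks) {
            throw new IllegalArgumentException("invalid task index " + taskIndex + " of " + totalTasks);
        }
        // Every task sorts the same list the same way, so all tasks agree on
        // the split without asking Kafka or each other.
        List<String> sorted = new ArrayList<>(allPartitions);
        Collections.sort(sorted);
        List<String> mine = new ArrayList<>();
        // Take every totalTasks-th partition, starting at this task's index.
        for (int i = taskIndex; i < sorted.size(); i += totalTasks) {
            mine.add(sorted.get(i));
        }
        return mine;
    }
}
```

With five partitions t-0 to t-4 and two tasks, task 0 gets t-0, t-2 and t-4 and task 1 gets t-1 and t-3; with three tasks, task 0 would get t-0 and t-3. Whenever a task's share changes, the revoke/assign ordering discussed in STORM-2850 comes into play.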


was (Author: srdo):
When you refer to triggering a rebalance, what do you mean? Asking Storm to 
rebalance the topology, or something else?



[jira] [Comment Edited] (STORM-2847) Exception thrown after rebalance IllegalArgumentException

2017-12-08 Thread JIRA

[ 
https://issues.apache.org/jira/browse/STORM-2847?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16284286#comment-16284286
 ] 

Stig Rohde Døssing edited comment on STORM-2847 at 12/8/17 9:50 PM:


I have a guess as to what could be wrong. If the spout is deactivated 
and then reactivated, we replace the KafkaConsumer with a new one. 
Unfortunately, the code at 
https://github.com/apache/storm/blob/master/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/ManualPartitionSubscription.java#L58
 ensures that we don't assign partitions to the new consumer unless the 
assignment changes, because we reuse the old ManualPartitionSubscription 
instance, which still remembers the previous assignment.

I'll look at fixing this in the next few days. Hopefully it's the cause of 
this issue.
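The suspected failure mode can be reproduced with a small model. In this sketch, TrackingConsumer and CachingSubscription are hypothetical stand-ins, not the actual ManualPartitionSubscription, and the resetOnNewConsumer flag shows one possible fix (clearing the cached assignment when a new consumer is subscribed). That fix is an assumption for illustration, not necessarily how STORM-2847 was resolved.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical stand-in consumer that only records what it was assigned.
class TrackingConsumer {
    Set<String> assignment = new HashSet<>();

    void assign(Set<String> partitions) {
        assignment = new HashSet<>(partitions);
    }
}

// Hypothetical subscription that caches its last assignment and skips
// assign() when the computed assignment has not changed.
class CachingSubscription {
    private final boolean resetOnNewConsumer;
    private TrackingConsumer consumer;
    private Set<String> currentAssignment;

    CachingSubscription(boolean resetOnNewConsumer) {
        this.resetOnNewConsumer = resetOnNewConsumer;
    }

    // Called on (re)activation with a freshly created consumer.
    void subscribe(TrackingConsumer newConsumer) {
        consumer = newConsumer;
        if (resetOnNewConsumer) {
            // Possible fix: forget the cached assignment so the next refresh
            // assigns partitions to the new consumer.
            currentAssignment = null;
        }
    }

    void refreshAssignment(Set<String> newAssignment) {
        if (newAssignment.equals(currentAssignment)) {
            return; // unchanged: skip assign(), even if the consumer was replaced
        }
        consumer.assign(newAssignment);
        currentAssignment = new HashSet<>(newAssignment);
    }
}
```

With resetOnNewConsumer = false, the replacement consumer ends up with no partitions while the spout would still track the old ones, which would match the IllegalArgumentException from position(); with true, the second refresh assigns the partitions to the new consumer.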


was (Author: srdo):
I have a guess as to what could be wrong. If the spout is deactivated 
and then reactivated, we replace the KafkaConsumer with a new one. 
Unfortunately, the code at 
https://github.com/apache/storm/blob/master/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/ManualPartitionSubscription.java#L58
 ensures that we don't assign partitions to the new consumer unless the 
assignment changes.

I'll look at fixing this in the next few days. Hopefully it's the cause of 
this issue.



[jira] [Commented] (STORM-2847) Exception thrown after rebalance IllegalArgumentException

2017-12-08 Thread JIRA

[ 
https://issues.apache.org/jira/browse/STORM-2847?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16284286#comment-16284286
 ] 

Stig Rohde Døssing commented on STORM-2847:
---

I have a guess as to what could be wrong. If the spout is deactivated 
and then reactivated, we replace the KafkaConsumer with a new one. 
Unfortunately, the code at 
https://github.com/apache/storm/blob/master/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/ManualPartitionSubscription.java#L58
 ensures that we don't assign partitions to the new consumer unless the 
assignment changes.

I'll look at fixing this in the next few days. Hopefully it's the cause of 
this issue.
