[ https://issues.apache.org/jira/browse/KAFKA-2284?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15634855#comment-15634855 ]
Bernard Leach commented on KAFKA-2284:
--------------------------------------
I just came across this as a failing test in
https://github.com/apache/kafka/pull/2101, where I had fixed the underlying bug.
I've split the fix out into its own PR:
https://github.com/apache/kafka/pull/2102.
> ConsumerRebalanceListener receives wrong type in partitionOwnership values
> --------------------------------------------------------------------------
>
> Key: KAFKA-2284
> URL: https://issues.apache.org/jira/browse/KAFKA-2284
> Project: Kafka
> Issue Type: Bug
> Components: consumer
> Affects Versions: 0.8.2.0
> Reporter: E. Sammer
> Assignee: Neha Narkhede
>
> The ConsumerRebalanceListener's beforeReleasingPartitions() method is
> supposed to receive an argument of type Map<String, Set<Integer>> (topic ->
> set of partitions). Although the map's value type is declared as
> java.util.Set, a scala.collection.convert.Wrappers$JSetWrapper is passed
> instead, which does not implement Set<T>. This causes a ClassCastException
> as soon as one attempts to access any value of the map. It looks as if this
> method was never tested against the actual types specified by the interface.
> Here's what happens if you call {{Set<T> foo =
> partitionOwnership.get(topic)}}:
> {code}
> 2015-06-18 07:28:43,776 (search-consumer_esammer-mbp.local-1434637723383-12126c1b_watcher_executor) [WARN - com.rocana.search.consumer.ConsumerService$1.beforeReleasingPartitions(ConsumerService.java:246)] Exception while rebalancing!
> java.lang.ClassCastException: scala.collection.convert.Wrappers$JSetWrapper cannot be cast to java.util.Set
>     at com.rocana.search.consumer.IndexConsumerWorker.onRebalance(IndexConsumerWorker.java:80)
>     at com.rocana.search.consumer.ConsumerService$1.beforeReleasingPartitions(ConsumerService.java:244)
>     at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$rebalance(ZookeeperConsumerConnector.scala:675)
>     at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1$$anonfun$apply$mcV$sp$1.apply$mcVI$sp(ZookeeperConsumerConnector.scala:625)
>     at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:142)
>     at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply$mcV$sp(ZookeeperConsumerConnector.scala:619)
>     at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply(ZookeeperConsumerConnector.scala:616)
>     at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply(ZookeeperConsumerConnector.scala:616)
>     at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>     at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:615)
>     at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anon$1.run(ZookeeperConsumerConnector.scala:568)
> {code}
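
For readers unfamiliar with why the compiler accepts this and the failure only shows up at runtime, here is a minimal sketch in plain Java. It does not use Kafka or Scala: `NotAJavaSet`, `mistypedOwnership`, `partitionCount` and `isJavaSet` are hypothetical names, and `NotAJavaSet` is only a stand-in for a wrapper class that does not implement java.util.Set. Because generics are erased, the mistyped value is accepted when the map is built, and the cast to Set only happens when a caller reads the value through the declared type.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

class PartitionOwnershipDemo {

    /** Hypothetical stand-in for a wrapper class that does not implement java.util.Set. */
    static final class NotAJavaSet {
        final Set<Integer> underlying;

        NotAJavaSet(Set<Integer> underlying) {
            this.underlying = underlying;
        }
    }

    /** Builds a map declared as Map<String, Set<Integer>> whose value is not actually a Set. */
    @SuppressWarnings({"unchecked", "rawtypes"})
    static Map<String, Set<Integer>> mistypedOwnership() {
        Set<Integer> partitions = new HashSet<>();
        partitions.add(0);
        partitions.add(1);
        Map raw = new HashMap();
        // Generics are erased at runtime, so the raw map accepts any value type.
        raw.put("topic", new NotAJavaSet(partitions));
        return (Map<String, Set<Integer>>) raw;
    }

    /** Reads the value through its declared type; the compiler-inserted cast to Set runs here. */
    static int partitionCount(Map<String, Set<Integer>> ownership, String topic) {
        Set<Integer> partitions = ownership.get(topic);
        return partitions.size();
    }

    /** Inspects the value as a plain Object, so no cast to Set is attempted. */
    static boolean isJavaSet(Map<String, ?> ownership, String topic) {
        Object value = ownership.get(topic);
        return value instanceof Set;
    }

    public static void main(String[] args) {
        Map<String, Set<Integer>> ownership = mistypedOwnership();
        System.out.println("value is a java.util.Set: " + isJavaSet(ownership, "topic"));
        try {
            partitionCount(ownership, "topic");
        } catch (ClassCastException e) {
            System.out.println("direct access failed: " + e.getMessage());
        }
    }
}
```

Reading the value as an Object, as `isJavaSet` does, is one way a listener could detect the mismatch without triggering the exception. It is only a diagnostic and does not replace fixing the type that is passed in.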