[ 
https://issues.apache.org/jira/browse/KAFKA-2359?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16185102#comment-16185102
 ] 

Nick West commented on KAFKA-2359:
----------------------------------

Workaround: unsafe and brittle, but it works.

{code}
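import java.lang.reflect.Field;
import org.apache.kafka.clients.consumer.internals.ConsumerCoordinator;
import org.apache.kafka.clients.consumer.internals.SubscriptionState;

// Use reflection to get at the inner workings to ensure partitions have been created.
// This relies on private KafkaConsumer fields, so it can break on any client upgrade.
Field subscriptionsField = consumer.getClass().getDeclaredField("subscriptions");
subscriptionsField.setAccessible(true);
Field coordinatorField = consumer.getClass().getDeclaredField("coordinator");
coordinatorField.setAccessible(true);

SubscriptionState subscriptions = (SubscriptionState) subscriptionsField.get(consumer);
ConsumerCoordinator coordinator = (ConsumerCoordinator) coordinatorField.get(consumer);

// Same check that pollOnce performs before fetching
if (subscriptions.partitionsAutoAssigned())
    coordinator.ensurePartitionAssignment();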
{code}
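
One reason the snippet above is brittle: getDeclaredField only inspects the exact runtime class, so it throws NoSuchFieldException if "consumer" is a subclass or wrapper of KafkaConsumer. A minimal sketch of a slightly more defensive lookup that walks up the class hierarchy follows. PrivateFieldReader, findField, readField, BaseClient and WrappedClient are hypothetical names used only for illustration; the stand-in classes are not Kafka classes, and this does not make the workaround any safer against field renames.

```java
import java.lang.reflect.Field;

public class PrivateFieldReader {

    /** Looks for a declared field by name on the given class, then on each superclass. */
    public static Field findField(Class<?> type, String name) throws NoSuchFieldException {
        for (Class<?> c = type; c != null; c = c.getSuperclass()) {
            try {
                Field f = c.getDeclaredField(name);
                f.setAccessible(true);
                return f;
            } catch (NoSuchFieldException e) {
                // Not declared on this class; keep walking up the hierarchy.
            }
        }
        throw new NoSuchFieldException(
                name + " not found on " + type.getName() + " or its superclasses");
    }

    /** Reads a private field and casts it to the expected type. */
    public static <T> T readField(Object target, String name, Class<T> expectedType)
            throws ReflectiveOperationException {
        Field f = findField(target.getClass(), name);
        return expectedType.cast(f.get(target));
    }

    // Hypothetical stand-ins: the private field lives on the parent class,
    // which is the case a plain getClass().getDeclaredField(...) would miss.
    public static class BaseClient {
        private final String subscriptions = "state";
    }

    public static class WrappedClient extends BaseClient {
    }

    public static void main(String[] args) throws Exception {
        Object client = new WrappedClient();
        String value = readField(client, "subscriptions", String.class);
        System.out.println(value); // prints "state"
    }
}
```

With a helper along these lines, the two lookups in the workaround could share one code path, and a missing field would fail with a message naming the class that was searched.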

> New consumer - partitions auto assigned only on poll
> ----------------------------------------------------
>
>                 Key: KAFKA-2359
>                 URL: https://issues.apache.org/jira/browse/KAFKA-2359
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 0.9.0.0
>            Reporter: Stevo Slavic
>            Priority: Minor
>
> In the new consumer I encountered unexpected behavior. After constructing a 
> {{KafkaConsumer}} instance with a configured consumer rebalance callback 
> handler and subscribing to a topic with "consumer.subscribe(topic)", 
> retrieving subscriptions would return an empty set, and the callback handler 
> would not get called (no partitions ever assigned or revoked), no matter how 
> long the instance was up.
> Then I found by inspecting the {{KafkaConsumer}} code that partition assignment 
> will only be triggered on the first {{poll}}, since {{pollOnce}} has:
> {noformat}
> // ensure we have partitions assigned if we expect to
> if (subscriptions.partitionsAutoAssigned())
>     coordinator.ensurePartitionAssignment();
> {noformat}
> I'm proposing to fix this by including the same {{ensurePartitionAssignment}} 
> fragment in the {{KafkaConsumer.subscriptions}} accessor as well.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
