[ 
https://issues.apache.org/jira/browse/KAFKA-2350?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14639665#comment-14639665
 ] 

Jiangjie Qin commented on KAFKA-2350:
-------------------------------------

Hmm, good point. When a rebalance comes into play, things can get a little 
tricky. That said, this could also be tricky with pause/unpause.

It might be useful to track the contents of the following two topic partition 
maps as well.
[1] subscribedPartitions
[2] assignedPartitions

The tricky part here is whether a rebalance should override the suppression 
or not. During a rebalance, the consumer needs to compare the newly assigned 
partitions with [2] as it was before the rebalance, to see whether each 
partition was already in [2].
* When a partition remains assigned to a consumer (partition is in [2] before 
the rebalance), the rebalance should keep the current state of that partition 
unchanged.
** If the partition is in [2] but not in [1] (under suppression), the rebalance 
should keep it as is - the partition should not be added to [1] since that 
would override the suppression.
** If the partition is in both [1] and [2], the rebalance should also keep it 
as is.
* When a partition is not in [2] before the rebalance, add the partition to 
both [1] and [2].
* When a partition has been assigned to some other consumer, the rebalance will 
remove that partition from both [1] and [2]. Any later operation on that 
partition will receive an exception.

Written in code, that would be something like:
{code}
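// Remove partitions that have been reassigned to another consumer.
// An explicit java.util.Iterator is used because removing from a collection
// inside a for-each loop over it throws ConcurrentModificationException.
Iterator<TopicPartition> it = assignedPartitions.iterator();
while (it.hasNext()) {
    TopicPartition tp = it.next();
    if (!newlyAssignedPartitions.contains(tp)) {
        subscribedPartitions.remove(tp);
        it.remove();
    }
}
// Add partitions that are new to this consumer. Partitions that were already
// assigned are left untouched, so a suppressed partition stays suppressed.
for (TopicPartition tp : newlyAssignedPartitions) {
    if (!assignedPartitions.contains(tp)) {
        subscribedPartitions.add(tp);
        assignedPartitions.add(tp);
    }
}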
{code}
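
As a concrete illustration of the rules above, here is a minimal self-contained sketch. It is not the actual consumer code: the class name RebalanceSketch, the use of plain strings such as "t-0" in place of TopicPartition, and the onRebalance() method name are all assumptions made for the example. It only shows that a suppressed partition (in [2] but not in [1]) stays suppressed across a rebalance.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical stand-in for the consumer's bookkeeping; not a Kafka class.
class RebalanceSketch {
    // [1] partitions currently being fetched
    final Set<String> subscribedPartitions = new TreeSet<>();
    // [2] partitions owned by this consumer
    final Set<String> assignedPartitions = new TreeSet<>();

    void onRebalance(Set<String> newlyAssignedPartitions) {
        // Drop partitions that moved to another consumer.
        Iterator<String> it = assignedPartitions.iterator();
        while (it.hasNext()) {
            String tp = it.next();
            if (!newlyAssignedPartitions.contains(tp)) {
                subscribedPartitions.remove(tp);
                it.remove();
            }
        }
        // Add only partitions that are new to this consumer.
        for (String tp : newlyAssignedPartitions) {
            if (assignedPartitions.add(tp)) { // true only if tp was not in [2]
                subscribedPartitions.add(tp);
            }
        }
    }

    public static void main(String[] args) {
        RebalanceSketch c = new RebalanceSketch();
        c.assignedPartitions.addAll(Arrays.asList("t-0", "t-1", "t-2"));
        c.subscribedPartitions.addAll(Arrays.asList("t-0", "t-2")); // t-1 is suppressed
        c.onRebalance(new TreeSet<>(Arrays.asList("t-1", "t-2", "t-3")));
        System.out.println("assigned   = " + c.assignedPartitions);   // [t-1, t-2, t-3]
        System.out.println("subscribed = " + c.subscribedPartitions); // [t-2, t-3]
    }
}
```

Here t-0 is dropped from both sets, t-3 is added to both, and t-1 remains assigned but suppressed.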

Assuming that, the behavior would be as follows:

* Rebalance occurred between A and B (partition is in both [1] and [2]).
** After the rebalance, if the partition has been assigned to some other 
consumer (partition is removed from both [1] and [2]), unsubscribe in B will 
either have no effect or get an exception. In that case, the user might need to 
catch the exception and call assignedPartitions() again to see which partitions 
they want to unsubscribe from.
** After the rebalance, if the partition is still assigned to this consumer 
(partition remains in both [1] and [2]), unsubscribe in B will work (partition 
will be removed from [1] but stay in [2]).

* Rebalance occurred between B and C (partition is not in [1] but is in [2]).
** After the rebalance, if the partition has been assigned to some other 
consumer (partition is in neither [1] nor [2]), subscribe in C will receive an 
exception because the partition is not assigned to this consumer.
** After the rebalance, if the partition remains assigned to this consumer, 
this relies on the trick mentioned above: the rebalance will not override the 
suppression in B. So C will subscribe to the partition successfully (partition 
will be in both [1] and [2]).

* Rebalance occurred after C.
** After the rebalance, if the partition has been assigned to some other 
consumer, the partition will be removed from both [1] and [2]. The consumer 
will not be consuming from the partition anymore.
** After the rebalance, if the partition remains assigned to this consumer, 
the partition will stay in [1] and [2]. The consumer will keep consuming from 
the partition.
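
The "rebalance between B and C" case can be walked through with a small sketch. Everything here is hypothetical: the class SuppressionSketch, string partition names, and in particular the choice to throw IllegalStateException for an unassigned partition (the comment above leaves open whether unsubscribe has no effect or throws). B is modeled as unsubscribe() and C as subscribe().

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Hypothetical model of the two sets; not the KafkaConsumer API.
class SuppressionSketch {
    final Set<String> subscribed = new HashSet<>(); // [1]
    final Set<String> assigned = new HashSet<>();   // [2]

    // Step B: stop fetching the partition but keep ownership of it.
    void unsubscribe(String tp) {
        requireAssigned(tp);
        subscribed.remove(tp);
    }

    // Step C: resume fetching the partition.
    void subscribe(String tp) {
        requireAssigned(tp);
        subscribed.add(tp);
    }

    // Assumption: operations on an unassigned partition throw.
    private void requireAssigned(String tp) {
        if (!assigned.contains(tp)) {
            throw new IllegalStateException(tp + " is not assigned to this consumer");
        }
    }

    void onRebalance(Set<String> newlyAssigned) {
        subscribed.retainAll(newlyAssigned); // drop partitions that moved away
        assigned.retainAll(newlyAssigned);
        for (String tp : newlyAssigned) {
            if (assigned.add(tp)) {          // only partitions new to this consumer
                subscribed.add(tp);
            }
        }
    }

    // Runs B, then a rebalance, then C on "t-0"; returns whether C succeeded.
    static boolean rebalanceBetweenBAndC(Set<String> newlyAssigned) {
        SuppressionSketch c = new SuppressionSketch();
        c.onRebalance(new HashSet<>(Arrays.asList("t-0"))); // initial assignment
        c.unsubscribe("t-0");                               // B
        c.onRebalance(newlyAssigned);                       // rebalance
        try {
            c.subscribe("t-0");                             // C
            return c.subscribed.contains("t-0");
        } catch (IllegalStateException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Partition stays with this consumer: C succeeds.
        System.out.println(rebalanceBetweenBAndC(new HashSet<>(Arrays.asList("t-0"))));
        // Partition moved to another consumer: C gets an exception.
        System.out.println(rebalanceBetweenBAndC(new HashSet<>(Arrays.asList("t-1"))));
    }
}
```

The first call prints true and the second prints false, matching the two sub-cases listed for a rebalance between B and C.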



> Add KafkaConsumer pause capability
> ----------------------------------
>
>                 Key: KAFKA-2350
>                 URL: https://issues.apache.org/jira/browse/KAFKA-2350
>             Project: Kafka
>          Issue Type: Improvement
>            Reporter: Jason Gustafson
>            Assignee: Jason Gustafson
>
> There are some use cases in stream processing where it is helpful to be able 
> to pause consumption of a topic. For example, when joining two topics, you 
> may need to delay processing of one topic while you wait for the consumer of 
> the other topic to catch up. The new consumer currently doesn't provide a 
> nice way to do this. If you skip poll() or if you unsubscribe, then a 
> rebalance will be triggered and your partitions will be reassigned.
> One way to achieve this would be to add two new methods to KafkaConsumer:
> {code}
> void pause(String... topics);
> void unpause(String... topics);
> {code}
> When a topic is paused, a call to KafkaConsumer.poll will not initiate any 
> new fetches for that topic. After it is unpaused, fetches will begin again.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
