[
https://issues.apache.org/jira/browse/KAFKA-2350?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14639307#comment-14639307
]
Jiangjie Qin commented on KAFKA-2350:
-------------------------------------
Ah, I see your concern. Originally I was thinking that we would still make
self partition control and consumer coordinator partition assignment
mutually exclusive, so if subscribe(partition) has been called,
subscribe(topic) will throw an exception.
Now I think allowing mixed mode might be an interesting feature to support. Let
me take a shot at addressing the use cases you raised.
We maintain the following two data structures:
{code}
Map<Topic, Set<TopicPartition>> subscribedPartitions - A
Map<Topic, Set<TopicPartition>> assignedPartitions - B
{code}
The only rule we have is that you cannot use both MANUAL and AUTO partition
subscription on the same topic.
The validity of sub/unsub on a topic is as follows:
* In A, in B - the partition is assigned by the coordinator; sub/unsub works at
both the topic and partition level.
* In A, not in B - the topic is subscribed manually; sub/unsub at the topic level
throws an exception, sub/unsub at the partition level works.
* Not in A, in B - the topic is assigned by the coordinator but suppressed by the
user; sub/unsub at the topic level works, sub/unsub at the partition level
works only when the partition is in B.
* Not in A, not in B - a brand new topic; sub/unsub works at both the topic and
partition level.
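To make the four combinations above easier to check, here is a minimal sketch of the validity table as a single predicate. The class name, the enum, and the boolean parameters are hypothetical and only stand in for "topic is in A", "topic is in B" and "partition is in B"; this is not code from the consumer.

```java
// Hypothetical helper that encodes the four rows of the validity table.
final class SubscriptionRules {
    enum Level { TOPIC, PARTITION }

    private SubscriptionRules() {}

    // Returns true if a sub/unsub call at the given level would be accepted.
    static boolean allowed(boolean topicInA, boolean topicInB,
                           boolean partitionInB, Level level) {
        if (topicInA && !topicInB) {
            // Manually subscribed topic: only partition-level calls work.
            return level == Level.PARTITION;
        }
        if (!topicInA && topicInB) {
            // Assigned by the coordinator but suppressed by the user:
            // topic level works, partition level only if the partition is in B.
            return level == Level.TOPIC || partitionInB;
        }
        // In both A and B, or in neither (brand new topic): everything works.
        return true;
    }
}
```

For example, `SubscriptionRules.allowed(true, false, false, SubscriptionRules.Level.TOPIC)` is false, which corresponds to the exception in the second row.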
We can discuss whether it is legitimate to unsubscribe from a partition the
consumer is not assigned/subscribed to, but that is an orthogonal issue.
So, for the cases you mentioned:
* case 1
{code}
subscribe(topic)
unsubscribe(partition)
{code}
means give me a set of partitions and suppress one of them - just as you
said.
* case 2
{code}
subscribe(partition)
subscribe(topic)
{code}
means give me this partition, plus whatever the coordinator gives me.
However, if the topic the user is trying to subscribe to happens to be the
topic of the partition already subscribed to in the first line, that is
an exception - you cannot do manual and automatic partition subscription on
the same topic at the same time. (In this case, the topic would be in
subscribedPartitions but not in assignedPartitions.)
* case 3
{code}
unsubscribe(partition)
subscribe(topic)
{code}
is a somewhat odd sequence in the first place. How could you suppress a
partition when you don't even know whether it will be assigned to you?
Anyway, here is the behavior: the unsubscribe from the partition goes through,
but after subscribe(topic) is called, the assigned topic partition is not
suppressed.
However, if the user is currently subscribed manually to a partition of that
topic (in subscribedPartitions, not in assignedPartitions), subscribe(topic)
throws an exception, as in case 2.
* case 4
{code}
subscribe(partition)
subscribe(topic)
unsubscribe(partition)
{code}
The first two subscribe calls are described in case 2. The partition is
suppressed after the unsubscribe call.
* case 5
{code}
subscribe(partition)
unsubscribe(partition)
subscribe(topic)
{code}
If the first two lines are called for the same partition, it is equivalent to
just calling subscribe(topic) - so the partition won't be suppressed. If the
first two lines are called for different partitions, it is again a little bit
odd for the user to suppress a partition without knowing whether it will be
assigned. But the partition won't be suppressed.
> Add KafkaConsumer pause capability
> ----------------------------------
>
> Key: KAFKA-2350
> URL: https://issues.apache.org/jira/browse/KAFKA-2350
> Project: Kafka
> Issue Type: Improvement
> Reporter: Jason Gustafson
> Assignee: Jason Gustafson
>
> There are some use cases in stream processing where it is helpful to be able
> to pause consumption of a topic. For example, when joining two topics, you
> may need to delay processing of one topic while you wait for the consumer of
> the other topic to catch up. The new consumer currently doesn't provide a
> nice way to do this. If you skip poll() or if you unsubscribe, then a
> rebalance will be triggered and your partitions will be reassigned.
> One way to achieve this would be to add two new methods to KafkaConsumer:
> {code}
> void pause(String... topics);
> void unpause(String... topics);
> {code}
> When a topic is paused, a call to KafkaConsumer.poll will not initiate any
> new fetches for that topic. After it is unpaused, fetches will begin again.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)