[ https://issues.apache.org/jira/browse/KAFKA-2350?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14639080#comment-14639080 ]

Jay Kreps edited comment on KAFKA-2350 at 7/23/15 4:21 PM:
-----------------------------------------------------------

[~becket_qin] I think there are three proposals:
1. Add pause/unpause
2. Allow subscribe(topic) followed by unsubscribe(partition) to subscribe to a 
topic but suppress a partition
3. Either of the above but making the use of group management explicit using an 
enable.group.management flag

I'm in favor of (2) if it is possible to work out the corner cases and the 
implementation doesn't cause us to go crazy. I think you are saying that having 
unsubscribe mean "suppress" is actually somewhat sensible. Otherwise I'm in 
favor of (1). I'm not in favor of (3) because it introduces two different modes.

Let me point out a few of the weird bits of (2), though. Since we now allow 
mingling of subscribe(topic) and subscribe(partition), we have to work out all 
the combinations. The case where you do
{code}
  subscribe(topic)
  unsubscribe(partition)
{code}
is really clear: it means give me a set of partitions but suppress this 
particular partition. Likewise, I think
{code}
  subscribe(partition)
  subscribe(topic)
{code}
also makes sense. You are saying give me this particular partition plus 
whatever else the coordinator assigns me. But what about
{code}
  unsubscribe(partition)
  subscribe(topic)
{code}
Do you still get the same suppression effect? But this one is a bit weird:
{code}
  subscribe(partition)
  subscribe(topic)
  unsubscribe(partition)
{code}
Does the unsubscribe call suppress the partition or not? The first two calls 
normally mean "subscribe me to whatever the coordinator gives me plus a given 
partition." The last two calls normally mean "subscribe me to whatever the 
coordinator gives me except this given partition." But is the result of 
combining these two the same as
{code}
  subscribe(partition)
  unsubscribe(partition)
  subscribe(topic)
{code}
In other words, I think all this implies that there are now three states for a 
partition: SUBSCRIBED, NOT_SUBSCRIBED, and SUPPRESSED. This is what I think 
we'd have to work out to make your proposal feasible.
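As a rough illustration of what "working this out" could look like, here is a 
minimal Java sketch of the three-state idea. The names (SubscriptionModel, 
subscribeTopic, subscribePartition, unsubscribePartition, effective) are 
hypothetical and not part of KafkaConsumer. The coordinator's assignment is 
passed in as a plain set of partition names. The transition rule for 
unsubscribe(partition) is only one possible assumption: it undoes an explicit 
subscribe(partition), and it suppresses any other partition.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical model of proposal (2); not the KafkaConsumer API.
enum PartitionState { SUBSCRIBED, NOT_SUBSCRIBED, SUPPRESSED }

class SubscriptionModel {
    private final Set<String> topics = new HashSet<>();
    // Explicit per-partition state; partitions absent from the map are NOT_SUBSCRIBED.
    private final Map<String, PartitionState> partitions = new HashMap<>();

    void subscribeTopic(String topic) {
        topics.add(topic);
    }

    void subscribePartition(String partition) {
        partitions.put(partition, PartitionState.SUBSCRIBED);
    }

    // Assumed rule: unsubscribing a manually subscribed partition only undoes
    // that subscription; unsubscribing any other partition suppresses it.
    void unsubscribePartition(String partition) {
        PartitionState current = state(partition);
        partitions.put(partition, current == PartitionState.SUBSCRIBED
                ? PartitionState.NOT_SUBSCRIBED
                : PartitionState.SUPPRESSED);
    }

    PartitionState state(String partition) {
        return partitions.getOrDefault(partition, PartitionState.NOT_SUBSCRIBED);
    }

    // Effective partitions: the coordinator's assignment (only when at least one
    // topic is subscribed), plus explicitly subscribed partitions, minus
    // suppressed ones.
    Set<String> effective(Set<String> coordinatorAssigned) {
        Set<String> result = new HashSet<>();
        if (!topics.isEmpty()) {
            result.addAll(coordinatorAssigned);
        }
        for (Map.Entry<String, PartitionState> e : partitions.entrySet()) {
            if (e.getValue() == PartitionState.SUBSCRIBED) {
                result.add(e.getKey());
            } else if (e.getValue() == PartitionState.SUPPRESSED) {
                result.remove(e.getKey());
            }
        }
        return result;
    }
}
```

Under this assumed rule, subscribe(topic) followed by unsubscribe(partition) 
suppresses the partition, and so does the reversed order. Both orderings of 
subscribe(partition)/subscribe(topic)/unsubscribe(partition) leave the partition 
NOT_SUBSCRIBED, so the coordinator's assignment decides. A different rule, such 
as always suppressing on unsubscribe, would answer these cases differently.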





> Add KafkaConsumer pause capability
> ----------------------------------
>
>                 Key: KAFKA-2350
>                 URL: https://issues.apache.org/jira/browse/KAFKA-2350
>             Project: Kafka
>          Issue Type: Improvement
>            Reporter: Jason Gustafson
>            Assignee: Jason Gustafson
>
> There are some use cases in stream processing where it is helpful to be able 
> to pause consumption of a topic. For example, when joining two topics, you 
> may need to delay processing of one topic while you wait for the consumer of 
> the other topic to catch up. The new consumer currently doesn't provide a 
> nice way to do this. If you skip poll() or if you unsubscribe, then a 
> rebalance will be triggered and your partitions will be reassigned.
> One way to achieve this would be to add two new methods to KafkaConsumer:
> {code}
> void pause(String... topics);
> void unpause(String... topics);
> {code}
> When a topic is paused, a call to KafkaConsumer.poll will not initiate any 
> new fetches for that topic. After it is unpaused, fetches will begin again.
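To make the proposed semantics concrete, here is a minimal Java sketch of a 
toy fetcher that honors the two methods from the description. PausableFetcher, 
topicsToFetch and isSubscribed are hypothetical names. The class never contacts 
a broker. It only models the rule that poll() starts no new fetches for a 
paused topic while the topic's subscription stays untouched.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical toy model of the proposed pause(String...)/unpause(String...)
// semantics; it is not KafkaConsumer and does no network I/O.
class PausableFetcher {
    private final Set<String> subscribed = new HashSet<>();
    private final Set<String> paused = new HashSet<>();

    PausableFetcher(String... topics) {
        subscribed.addAll(Arrays.asList(topics));
    }

    void pause(String... topics) {
        paused.addAll(Arrays.asList(topics));
    }

    void unpause(String... topics) {
        paused.removeAll(Arrays.asList(topics));
    }

    // Models one poll(): returns the topics a new fetch would be started for.
    // Paused topics stay subscribed (nothing that would trigger a rebalance);
    // they are simply skipped.
    List<String> topicsToFetch() {
        List<String> result = new ArrayList<>();
        for (String topic : subscribed) {
            if (!paused.contains(topic)) {
                result.add(topic);
            }
        }
        Collections.sort(result); // deterministic order for callers
        return result;
    }

    boolean isSubscribed(String topic) {
        return subscribed.contains(topic);
    }
}
```

In the join example, a processor that is ahead on one topic could call 
pause("orders") until the other side catches up, then call unpause("orders"). 
The subscription never changes, so nothing here implies a rebalance. The sketch 
tracks pause state per topic, as in the proposal. Tracking it per partition 
would be another possible design.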



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)