[jira] [Commented] (KAFKA-3932) Consumer fails to consume in a round robin fashion

2018-10-31 Thread CHIENHSING WU (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-3932?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16670366#comment-16670366
 ] 

CHIENHSING WU commented on KAFKA-3932:
--

[~elevy], from the description, what you are asking for is for the poll call 
to return messages from the available partitions in a fair/balanced pattern. 
The version you tested was fairly old. Did you get a chance to test the 
current version, and if so, is the problem still there? I used version 2.0 
and saw similar issues.

Also, could you take a look at the comment I posted earlier and see if the 
change I propose would address the issue? 

> Consumer fails to consume in a round robin fashion
> --
>
> Key: KAFKA-3932
> URL: https://issues.apache.org/jira/browse/KAFKA-3932
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 0.10.0.0
>Reporter: Elias Levy
>Assignee: CHIENHSING WU
>Priority: Major
>
> The Java consumer fails to consume messages in a round robin fashion.  This 
> can lead to unbalanced consumption.
> In our use case we have a set of consumers that can take a significant 
> amount of time consuming messages off a topic.  For this reason, we are 
> using the pause/poll/resume pattern to ensure the consumer session does not 
> time out.  The topic being consumed has been preloaded with messages.  That 
> means there is a significant message lag when the consumer is first 
> started.  To limit how many messages are consumed at a time, the consumer 
> has been configured with max.poll.records=1.
> The first observation is that the client receives a large batch of messages 
> for the first partition it decides to consume from and will consume all of 
> those messages before moving on, rather than returning a message from a 
> different partition on each call to poll.
> We solved this issue by configuring max.partition.fetch.bytes to be small 
> enough that only a single message is returned by the broker on each fetch, 
> although this would not be feasible if message sizes were highly variable.
> After this change, the consumer largely consumes from a small number of 
> partitions, usually just two, iterating between them until it exhausts 
> them, before moving to another partition.  This behavior is problematic if 
> the messages have some rough time semantics and need to be processed 
> roughly in time order across all partitions.
> It would be useful if the consumer had a pluggable API that allowed custom 
> logic to select which partition to consume from next, thus enabling the 
> creation of a round robin partition consumer.
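The draining behavior described above can be sketched as a toy model in plain 
Java (no Kafka dependency; DrainFirstPoller and its methods are hypothetical 
names, not the actual client internals): with max.poll.records=1, a poll that 
always serves the first non-empty buffered partition returns every record 
from that partition before touching the next.

```java
import java.util.*;

// Illustrative model of the reported behavior: the consumer drains the
// buffered batch of the partition it fetched first before moving on.
class DrainFirstPoller {
    // Fetched-but-not-yet-returned records, keyed by partition, in fetch order.
    private final Map<String, Deque<String>> fetched = new LinkedHashMap<>();

    void addBatch(String partition, List<String> batch) {
        fetched.computeIfAbsent(partition, p -> new ArrayDeque<>()).addAll(batch);
    }

    // A max.poll.records=1 style poll: always takes from the first non-empty
    // partition, so one partition monopolizes consumption until exhausted.
    String pollOne() {
        for (Deque<String> q : fetched.values()) {
            if (!q.isEmpty()) return q.pollFirst();
        }
        return null;  // nothing buffered
    }
}
```

With a large batch buffered for one partition, successive polls return only 
that partition's records, matching the skew described in the report.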



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (KAFKA-3932) Consumer fails to consume in a round robin fashion

2018-10-31 Thread CHIENHSING WU (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-3932?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

CHIENHSING WU reassigned KAFKA-3932:


Assignee: CHIENHSING WU






[jira] [Commented] (KAFKA-3932) Consumer fails to consume in a round robin fashion

2018-10-24 Thread CHIENHSING WU (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-3932?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16662386#comment-16662386
 ] 

CHIENHSING WU commented on KAFKA-3932:
--

I am going ahead with implementing the change I proposed. I don't have the 
rights to assign this ticket to myself. *This comment will hopefully alert 
anyone who plans to work on it.*






[jira] [Issue Comment Deleted] (KAFKA-2350) Add KafkaConsumer pause capability

2018-10-19 Thread CHIENHSING WU (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-2350?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

CHIENHSING WU updated KAFKA-2350:
-
Comment: was deleted

(was: [~hachikuji] and others, we plan to use this function in our project. 
The Java Doc says "Future calls to 
[{{poll(Duration)}}|https://kafka.apache.org/20/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#poll-java.time.Duration-]
 *will not return any records* from these partitions until they have been 
resumed using 
[{{resume(Collection)}}|https://kafka.apache.org/20/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#resume-java.util.Collection-]."
 I think the poll call will still return records that have already been 
received on the client side. Do you think pausing should be enforced against 
those received records as well?)

> Add KafkaConsumer pause capability
> --
>
> Key: KAFKA-2350
> URL: https://issues.apache.org/jira/browse/KAFKA-2350
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Major
> Fix For: 0.9.0.0
>
>
> There are some use cases in stream processing where it is helpful to be able 
> to pause consumption of a topic. For example, when joining two topics, you 
> may need to delay processing of one topic while you wait for the consumer of 
> the other topic to catch up. The new consumer currently doesn't provide a 
> nice way to do this. If you skip calls to poll() or if you unsubscribe, then 
> a rebalance will be triggered and your partitions will be reassigned to 
> another consumer. The desired behavior is instead that you keep the partition 
> assigned and simply pause fetching from it.
> One way to achieve this would be to add two new methods to KafkaConsumer:
> {code}
> void pause(TopicPartition... partitions);
> void resume(TopicPartition... partitions);
> {code}
> Here is the expected behavior of pause/resume:
> * When a partition is paused, calls to KafkaConsumer.poll will not initiate 
> any new fetches for that partition.
> * After the partition is resumed, fetches will begin again. 
> * While a partition is paused, seek() and position() can still be used to 
> advance or query the current position.
> * Rebalance does not preserve pause/resume state.
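The pause/resume contract listed above can be modeled in a few lines of plain 
Java (a toy sketch under stated assumptions; PausableFetcher is a 
hypothetical name, not the real consumer code): paused partitions are simply 
excluded from the set a poll would fetch from.

```java
import java.util.*;

// Toy model of the pause/resume behavior described above: poll() initiates
// no new fetches for paused partitions; resume() re-enables them.
class PausableFetcher {
    private final List<String> assigned;
    private final Set<String> paused = new HashSet<>();

    PausableFetcher(List<String> assigned) {
        this.assigned = new ArrayList<>(assigned);
    }

    void pause(String... partitions) { paused.addAll(Arrays.asList(partitions)); }
    void resume(String... partitions) { paused.removeAll(Arrays.asList(partitions)); }

    // The partitions a poll() would fetch from: assigned minus paused.
    // The assignment itself is untouched, so no rebalance is triggered.
    List<String> fetchablePartitions() {
        List<String> out = new ArrayList<>();
        for (String p : assigned) {
            if (!paused.contains(p)) out.add(p);
        }
        return out;
    }
}
```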





[jira] [Commented] (KAFKA-2350) Add KafkaConsumer pause capability

2018-10-19 Thread CHIENHSING WU (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-2350?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16656811#comment-16656811
 ] 

CHIENHSING WU commented on KAFKA-2350:
--

[~hachikuji] and others, we plan to use this function in our project. The 
Java Doc says "Future calls to 
[{{poll(Duration)}}|https://kafka.apache.org/20/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#poll-java.time.Duration-]
 *will not return any records* from these partitions until they have been 
resumed using 
[{{resume(Collection)}}|https://kafka.apache.org/20/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#resume-java.util.Collection-]."
 I think the poll call will still return records that have already been 
received on the client side. Do you think pausing should be enforced against 
those received records as well?
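The question above — whether pause() should also suppress records already 
buffered on the client — could be modeled as a filter at poll time. A 
hypothetical sketch in plain Java (PauseAwareBuffer and its methods are 
illustrative names, not the actual Fetcher internals):

```java
import java.util.*;

// Toy model: records already fetched sit in a client-side buffer keyed by
// partition; a poll that honors pause() skips the buffered records of paused
// partitions instead of returning them.
class PauseAwareBuffer {
    private final Map<String, Deque<String>> buffered = new LinkedHashMap<>();
    private final Set<String> paused = new HashSet<>();

    void buffer(String partition, List<String> records) {
        buffered.computeIfAbsent(partition, p -> new ArrayDeque<>()).addAll(records);
    }

    void pause(String partition) { paused.add(partition); }

    // Returns the next buffered record from a non-paused partition, or null.
    String pollOne() {
        for (Map.Entry<String, Deque<String>> e : buffered.entrySet()) {
            if (!paused.contains(e.getKey()) && !e.getValue().isEmpty()) {
                return e.getValue().pollFirst();
            }
        }
        return null;
    }
}
```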






[jira] [Commented] (KAFKA-3932) Consumer fails to consume in a round robin fashion

2018-10-18 Thread CHIENHSING WU (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-3932?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16655279#comment-16655279
 ] 

CHIENHSING WU commented on KAFKA-3932:
--

I encountered the same issue as well. Upon studying the source code and 
[KIP-41|https://cwiki.apache.org/confluence/display/KAFKA/KIP-41%3A+KafkaConsumer+Max+Records],
 I think the issue is this statement in the KIP: "As before, we'd keep track 
of *which partition we left off* at so that the next iteration would *begin 
there*." I think it should *NOT* start from the last partition on the next 
iteration; *it should pick the next one instead.* 

The simplest way to impose an order for picking the next partition is to use 
the order in which consumer.internals.Fetcher receives the partition 
messages, as determined by the *completedFetches* queue in that class. To 
avoid parsing the partition messages repeatedly, we can *save those parsed 
fetches to a list and track the next partition to get messages from there.* 

Does this sound like a good approach? If this is not the right place to 
discuss the design, please let me know where to engage. If this is agreeable 
I can contribute the implementation.
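The proposal above — remember where the last poll left off and start the next 
poll at the *following* partition — can be sketched as a toy model in plain 
Java (RoundRobinFetchBuffer is a hypothetical name; the real change would 
live in the consumer's Fetcher, not in code like this):

```java
import java.util.*;

// Sketch of the proposed fix: keep parsed fetches in a list in arrival order
// (mirroring the completedFetches queue), and after serving a partition,
// begin the next poll at the NEXT partition rather than the same one.
class RoundRobinFetchBuffer {
    private final List<String> partitions = new ArrayList<>();   // arrival order
    private final Map<String, Deque<String>> records = new HashMap<>();
    private int next = 0;  // index of the partition to try first on the next poll

    // Models the Fetcher receiving a completed fetch for a partition.
    void onCompletedFetch(String partition, List<String> batch) {
        if (!records.containsKey(partition)) {
            partitions.add(partition);
            records.put(partition, new ArrayDeque<>());
        }
        records.get(partition).addAll(batch);
    }

    // A max.poll.records=1 style poll: serve one record, then advance so the
    // next call starts at the following partition with buffered data.
    String pollOne() {
        for (int i = 0; i < partitions.size(); i++) {
            int idx = (next + i) % partitions.size();
            Deque<String> q = records.get(partitions.get(idx));
            if (!q.isEmpty()) {
                next = (idx + 1) % partitions.size();
                return q.pollFirst();
            }
        }
        return null;  // nothing buffered
    }
}
```

Successive polls then interleave partitions (p0, p1, p0, p1, ...) instead of 
draining one partition before touching the next.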



