[jira] [Commented] (KAFKA-3932) Consumer fails to consume in a round robin fashion
[ https://issues.apache.org/jira/browse/KAFKA-3932?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16670366#comment-16670366 ]

CHIENHSING WU commented on KAFKA-3932:
--

[~elevy], from the description, what you are asking for is for the poll call to return messages from the available partitions in a fair/balanced pattern. The version you tested was fairly old; did you get a chance to test the current version, and if so, is the problem still there? I used version 2.0 and saw similar issues. Also, could you take a look at the comment I posted previously and see whether the change I propose would address the issue?

> Consumer fails to consume in a round robin fashion
> --
>
> Key: KAFKA-3932
> URL: https://issues.apache.org/jira/browse/KAFKA-3932
> Project: Kafka
> Issue Type: Bug
> Components: consumer
> Affects Versions: 0.10.0.0
> Reporter: Elias Levy
> Assignee: CHIENHSING WU
> Priority: Major
>
> The Java consumer fails to consume messages in a round robin fashion. This can
> lead to unbalanced consumption.
> In our use case we have a set of consumers that can take a significant amount
> of time consuming messages off a topic. For this reason, we are using the
> pause/poll/resume pattern to ensure the consumer session does not time out. The
> topic being consumed has been preloaded with messages, which means
> there is a significant message lag when the consumer is first started. To
> limit how many messages are consumed at a time, the consumer has been
> configured with max.poll.records=1.
> The initial observation is that the client receives a large batch of
> messages for the first partition it decides to consume from and will consume
> all of those messages before moving on, rather than returning a message from a
> different partition for each call to poll.
> We worked around this by configuring max.partition.fetch.bytes to be small
> enough that only a single message is returned by the broker on each
> fetch, although this would not be feasible if message sizes were highly
> variable.
> After this change, the consumer largely consumes from a
> small number of partitions, usually just two, iterating between them until
> it exhausts them before moving to another partition. This behavior is
> problematic if the messages have rough time semantics and need to be
> processed roughly time ordered across all partitions.
> It would be useful if the consumer had a pluggable API that allowed custom
> logic to select which partition to consume from next, thus enabling the
> creation of a round robin partition consumer.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
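The drain-one-partition-first behavior described in the report can be sketched with a small pure-JDK simulation (illustrative class and method names, not the actual consumer internals): fetched batches arrive one partition at a time, and a poll that always resumes from the batch it left off at returns every record of the first partition before touching the others.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Illustrative simulation, not the real Kafka client: each completed fetch
// holds a large batch of records for a single partition, and a poll with
// max.poll.records=1 keeps serving the batch it left off at until that
// batch is exhausted before moving to the next partition's batch.
public class DrainFirstPartition {

    static String pollOneRecord(Deque<Deque<String>> completedFetches) {
        while (!completedFetches.isEmpty() && completedFetches.peek().isEmpty()) {
            completedFetches.poll();              // drop an exhausted batch
        }
        Deque<String> head = completedFetches.peek();
        return head == null ? null : head.poll(); // stay on the same batch
    }

    static List<String> consumeAll() {
        Deque<Deque<String>> completedFetches = new ArrayDeque<>();
        for (int p = 0; p < 3; p++) {             // one 3-record batch per partition
            Deque<String> batch = new ArrayDeque<>();
            for (int r = 0; r < 3; r++) batch.add("p" + p + "-r" + r);
            completedFetches.add(batch);
        }
        List<String> order = new ArrayList<>();
        String record;
        while ((record = pollOneRecord(completedFetches)) != null) order.add(record);
        return order;
    }

    public static void main(String[] args) {
        // Every record of p0 is returned before any record of p1 or p2.
        System.out.println(consumeAll());
    }
}
```

Even though each "poll" returns a single record, partition p0 is fully drained before p1 or p2 is served, which matches the imbalance the reporter observed.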
[jira] [Assigned] (KAFKA-3932) Consumer fails to consume in a round robin fashion
[ https://issues.apache.org/jira/browse/KAFKA-3932?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

CHIENHSING WU reassigned KAFKA-3932:
--

    Assignee: CHIENHSING WU

> Consumer fails to consume in a round robin fashion
> --
>
> Key: KAFKA-3932
> URL: https://issues.apache.org/jira/browse/KAFKA-3932
> Project: Kafka
> Issue Type: Bug
> Components: consumer
> Affects Versions: 0.10.0.0
> Reporter: Elias Levy
> Assignee: CHIENHSING WU
> Priority: Major

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Commented] (KAFKA-3932) Consumer fails to consume in a round robin fashion
[ https://issues.apache.org/jira/browse/KAFKA-3932?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16662386#comment-16662386 ]

CHIENHSING WU commented on KAFKA-3932:
--

I am going ahead with implementing the change I proposed. I don't have the rights to assign this ticket to myself. *This comment will hopefully alert anyone who plans to work on it.*

> Consumer fails to consume in a round robin fashion
> --
>
> Key: KAFKA-3932
> URL: https://issues.apache.org/jira/browse/KAFKA-3932
> Project: Kafka
> Issue Type: Bug
> Components: consumer
> Affects Versions: 0.10.0.0
> Reporter: Elias Levy
> Priority: Major

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Issue Comment Deleted] (KAFKA-2350) Add KafkaConsumer pause capability
[ https://issues.apache.org/jira/browse/KAFKA-2350?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

CHIENHSING WU updated KAFKA-2350:
--

    Comment: was deleted

(was: [~hachikuji] and others, we plan to use this function in our project. The Javadoc says "Future calls to [{{poll(Duration)}}|https://kafka.apache.org/20/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#poll-java.time.Duration-] *will not return any records* from these partitions until they have been resumed using [{{resume(Collection)}}|https://kafka.apache.org/20/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#resume-java.util.Collection-]." I think the poll call will still return records that have already been received on the client side. Do you think this should be enforced against the received records as well?)

> Add KafkaConsumer pause capability
> --
>
> Key: KAFKA-2350
> URL: https://issues.apache.org/jira/browse/KAFKA-2350
> Project: Kafka
> Issue Type: Improvement
> Components: consumer
> Reporter: Jason Gustafson
> Assignee: Jason Gustafson
> Priority: Major
> Fix For: 0.9.0.0
>
> There are some use cases in stream processing where it is helpful to be able
> to pause consumption of a topic. For example, when joining two topics, you
> may need to delay processing of one topic while you wait for the consumer of
> the other topic to catch up. The new consumer currently doesn't provide a
> nice way to do this. If you skip calls to poll() or if you unsubscribe, then
> a rebalance will be triggered and your partitions will be reassigned to
> another consumer. The desired behavior is instead that you keep the partitions
> assigned and simply stop fetching from them.
> One way to achieve this would be to add two new methods to KafkaConsumer:
> {code}
> void pause(TopicPartition... partitions);
> void resume(TopicPartition... partitions);
> {code}
> Here is the expected behavior of pause/resume:
> * When a partition is paused, calls to KafkaConsumer.poll will not initiate
> any new fetches for that partition.
> * After the partition is resumed, fetches will begin again.
> * While a partition is paused, seek() and position() can still be used to
> advance or query the current position.
> * Rebalance does not preserve pause/resume state.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
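The pause/resume contract listed in KAFKA-2350 can be illustrated with a minimal pure-JDK sketch (illustrative names, plain String partitions instead of TopicPartition; this is not the actual KafkaConsumer implementation):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Illustrative simulation of the pause/resume contract: a paused partition
// is skipped when initiating fetches, but its position can still be queried
// and advanced while paused.
public class PauseResumeSketch {
    final Map<String, Long> positions = new TreeMap<>(); // partition -> next offset
    final Set<String> paused = new HashSet<>();

    void pause(String... partitions)  { paused.addAll(Arrays.asList(partitions)); }
    void resume(String... partitions) { paused.removeAll(Arrays.asList(partitions)); }
    void seek(String partition, long offset) { positions.put(partition, offset); }
    long position(String partition)   { return positions.getOrDefault(partition, 0L); }

    // poll() "fetches" (here: returns one fake record and advances the
    // position) only for partitions that are not paused.
    List<String> poll() {
        List<String> records = new ArrayList<>();
        for (String tp : positions.keySet()) {
            if (paused.contains(tp)) continue;   // no new fetches while paused
            long offset = positions.get(tp);
            records.add(tp + "@" + offset);
            positions.put(tp, offset + 1);
        }
        return records;
    }

    public static void main(String[] args) {
        PauseResumeSketch consumer = new PauseResumeSketch();
        consumer.seek("t-0", 0);
        consumer.seek("t-1", 5);
        consumer.pause("t-1");
        System.out.println(consumer.poll()); // only t-0 records; t-1 is paused
        consumer.resume("t-1");
        System.out.println(consumer.poll()); // t-1 resumes from offset 5
    }
}
```

Note how seek() and position() remain usable on the paused partition, and how the paused partition simply drops out of poll() results until resumed, matching the behavior bullets above.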
[jira] [Commented] (KAFKA-2350) Add KafkaConsumer pause capability
[ https://issues.apache.org/jira/browse/KAFKA-2350?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16656811#comment-16656811 ]

CHIENHSING WU commented on KAFKA-2350:
--

[~hachikuji] and others, we plan to use this function in our project. The Javadoc says "Future calls to [{{poll(Duration)}}|https://kafka.apache.org/20/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#poll-java.time.Duration-] *will not return any records* from these partitions until they have been resumed using [{{resume(Collection)}}|https://kafka.apache.org/20/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#resume-java.util.Collection-]." I think the poll call will still return records that have already been received on the client side. Do you think this should be enforced against the received records as well?

> Add KafkaConsumer pause capability
> --
>
> Key: KAFKA-2350
> URL: https://issues.apache.org/jira/browse/KAFKA-2350
> Project: Kafka
> Issue Type: Improvement
> Components: consumer
> Reporter: Jason Gustafson
> Assignee: Jason Gustafson
> Priority: Major
> Fix For: 0.9.0.0

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Commented] (KAFKA-3932) Consumer fails to consume in a round robin fashion
[ https://issues.apache.org/jira/browse/KAFKA-3932?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16655279#comment-16655279 ]

CHIENHSING WU commented on KAFKA-3932:
--

I encountered the same issue as well. Upon studying the source code and the [KIP|https://cwiki.apache.org/confluence/display/KAFKA/KIP-41%3A+KafkaConsumer+Max+Records], I think the issue is this statement in the KIP: "As before, we'd keep track of *which partition we left off* at so that the next iteration would *begin there*." I think the next iteration should *NOT* begin at the last partition; *it should pick the next one instead.* The simplest way to impose an order for picking the next partition is to use the order in which consumer.internals.Fetcher receives the partition messages, as determined by the *completedFetches* queue in that class. To avoid parsing the partition messages repeatedly, we can *save the parsed fetches to a list and track there which partition serves messages next.* Does this sound like a good approach? If this is not the right place to discuss the design, please let me know where to engage. If this is agreeable, I can contribute the implementation.

> Consumer fails to consume in a round robin fashion
> --
>
> Key: KAFKA-3932
> URL: https://issues.apache.org/jira/browse/KAFKA-3932
> Project: Kafka
> Issue Type: Bug
> Components: consumer
> Affects Versions: 0.10.0.0
> Reporter: Elias Levy
> Priority: Major

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
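The proposal in the comment above (advance to the next partition's parsed fetch after each record, instead of resuming from the one we left off at) can be sketched as a pure-JDK illustration. All names here are illustrative simplifications, not the actual Fetcher code:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Illustrative sketch of the round-robin proposal: keep the parsed fetches
// in a list, in the order they were received, and after serving a record
// rotate to the NEXT partition's fetch rather than staying on the last one.
public class RoundRobinFetches {
    final List<Deque<String>> parsedFetches = new ArrayList<>();
    int next = 0;                      // index of the partition to serve next

    void addFetch(String partition, int records) {
        Deque<String> batch = new ArrayDeque<>();
        for (int r = 0; r < records; r++) batch.add(partition + "-r" + r);
        parsedFetches.add(batch);      // arrival order defines the rotation order
    }

    String pollOneRecord() {
        for (int tried = 0; tried < parsedFetches.size(); tried++) {
            Deque<String> batch = parsedFetches.get(next);
            next = (next + 1) % parsedFetches.size();  // rotate even on a hit
            if (!batch.isEmpty()) return batch.poll();
        }
        return null;                   // every batch is exhausted
    }

    public static void main(String[] args) {
        RoundRobinFetches fetcher = new RoundRobinFetches();
        fetcher.addFetch("p0", 2);
        fetcher.addFetch("p1", 2);
        fetcher.addFetch("p2", 2);
        List<String> order = new ArrayList<>();
        String record;
        while ((record = fetcher.pollOneRecord()) != null) order.add(record);
        System.out.println(order);     // partitions interleave: p0, p1, p2, p0, ...
    }
}
```

With the same three preloaded batches as before, successive single-record polls now interleave the partitions instead of draining one at a time, which is the fairness property the ticket asks for.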