[ 
https://issues.apache.org/jira/browse/KAFKA-2986?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jens Rantil updated KAFKA-2986:
-------------------------------
    Description: 
I sent a related post to the Kafka mailing list, but haven't received any 
response: 
http://mail-archives.apache.org/mod_mbox/kafka-users/201512.mbox/%3CCAL%2BArfWNfkpymkNDuf6UJ06CJJ63XC1bPHeT4TSYXKjSsOpu-Q%40mail.gmail.com%3E
 So far, I think this is a design issue in Kafka so I'm taking the liberty of 
creating an issue.

*Use case:*
 - Slow consumption: around 20 seconds per record.
 - Large variation in message size: serialized tasks range from ~300 bytes up 
to ~3 MB.
 - Consumption latency (20 seconds) is independent of message size.

*Code example:*
{noformat}
while (isRunning()) {
  ConsumerRecords<String, byte[]> records = consumer.poll(100);
  for (final ConsumerRecord<String, byte[]> record : records) {
    // Handle record...
  }
}
{noformat}

*Problem:* Kafka doesn't have any issues with large messages (as long as you 
bump some configuration flags). However, the problem is two-fold:
- KafkaConsumer#poll is the only call that sends health checks (heartbeats).
- There is no limit on how many messages KafkaConsumer#poll will return. The 
only limit is on the total number of bytes to be prefetched. This makes the 
session timeout extremely hard to tune when message sizes vary:
-- The delay until the next KafkaConsumer#poll call is proportional to the 
number of records returned by the previous KafkaConsumer#poll call.
-- KafkaConsumer#poll will return either many small records or just a few 
large records. With many small messages, the risk of hitting the session 
timeout is very high. Raising the session timeout by the orders of magnitude 
required to handle the smaller messages increases the time until a dead 
consumer is discovered a thousandfold.
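
To make the tuning problem concrete, here is a rough back-of-the-envelope calculation in Java. The 3 MB per-poll byte budget is an assumed figure for illustration only (it is not taken from any configuration in this report); the 20 seconds per record and the ~300 byte and ~3 MB record sizes come from the use case above. The class and method names are hypothetical.

```java
/** Back-of-the-envelope estimate of the gap between two poll() calls. */
final class PollGapEstimate {
    // Assumed byte budget per poll (illustrative, not from the report).
    static final long FETCH_BUDGET_BYTES = 3_000_000L;
    // Processing time per record, as stated in the use case.
    static final long SECONDS_PER_RECORD = 20L;

    /** Number of records of the given size that fit in one fetch (at least one). */
    static long recordsPerPoll(long recordSizeBytes) {
        return Math.max(1L, FETCH_BUDGET_BYTES / recordSizeBytes);
    }

    /** Time spent processing one batch, i.e. the gap until the next poll(). */
    static long secondsBetweenPolls(long recordSizeBytes) {
        return recordsPerPoll(recordSizeBytes) * SECONDS_PER_RECORD;
    }

    public static void main(String[] args) {
        System.out.println("300 B records: " + secondsBetweenPolls(300L) + " s between polls");
        System.out.println("3 MB records:  " + secondsBetweenPolls(3_000_000L) + " s between polls");
    }
}
```

Under these assumptions, a batch of small records keeps the consumer away from poll for 200,000 seconds, while a batch holding a single large record takes 20 seconds, so no single session timeout suits both cases.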

*Proposed fixes:* I do not claim to be a Kafka expert, but two ideas are to 
either
 - add a `KafkaConsumer#healthy` call to let the broker know we are still 
processing records; or
 - add an upper limit on the number of messages returned by 
`KafkaConsumer#poll`. I am thinking of something like 
`KafkaConsumer#poll(timeout, nMaxMessages)`. This could obviously be set as a 
configuration property instead. To avoid the broker having to look at the 
messages it sends, I suggest that the KafkaConsumer decides how many messages 
it returns from poll.
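
The second idea (the client decides how many records a poll hands out) could be sketched roughly as follows. This is not Kafka code: `BoundedPoller` is a hypothetical name, and the `Supplier` merely stands in for whatever the real fetch returns. It only illustrates the buffering; it does not model heartbeats, which in the proposal would still be sent on every KafkaConsumer#poll call.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;
import java.util.function.Supplier;

/**
 * Hypothetical client-side wrapper: buffers whatever the underlying fetch
 * returns and hands out at most maxRecords per call.
 */
final class BoundedPoller<T> {
    private final Supplier<List<T>> fetch; // stands in for the real fetch
    private final int maxRecords;
    private final Deque<T> buffer = new ArrayDeque<>();

    BoundedPoller(Supplier<List<T>> fetch, int maxRecords) {
        if (maxRecords < 1) {
            throw new IllegalArgumentException("maxRecords must be >= 1");
        }
        this.fetch = fetch;
        this.maxRecords = maxRecords;
    }

    /** Returns up to maxRecords records, fetching only when the buffer is empty. */
    List<T> poll() {
        if (buffer.isEmpty()) {
            buffer.addAll(fetch.get());
        }
        List<T> out = new ArrayList<>();
        while (out.size() < maxRecords && !buffer.isEmpty()) {
            out.add(buffer.poll());
        }
        return out;
    }

    public static void main(String[] args) {
        // One simulated fetch of five records, handed out two at a time.
        BoundedPoller<Integer> poller =
            new BoundedPoller<>(() -> Arrays.asList(1, 2, 3, 4, 5), 2);
        System.out.println(poller.poll());
        System.out.println(poller.poll());
        System.out.println(poller.poll());
    }
}
```

With a cap like this, the time between two poll calls is bounded by the cap times the per-record processing time, regardless of how many records a single fetch happened to contain.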

*Workarounds:*
 - Have different topics for different message sizes. This makes tuning the 
partition prefetch size easier.
 - Use another tool :)
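
A minimal sketch of the first workaround, assuming the producer picks a topic based on the serialized payload size. The threshold, the topic names, and the `SizeBasedTopicRouter` class are all hypothetical and chosen only for illustration.

```java
/** Hypothetical router that picks a topic by serialized payload size. */
final class SizeBasedTopicRouter {
    // Assumed threshold and topic names, for illustration only.
    static final int LARGE_THRESHOLD_BYTES = 100_000;
    static final String SMALL_TOPIC = "tasks-small";
    static final String LARGE_TOPIC = "tasks-large";

    /** Returns the topic a payload of this size should be produced to. */
    static String topicFor(byte[] payload) {
        return payload.length >= LARGE_THRESHOLD_BYTES ? LARGE_TOPIC : SMALL_TOPIC;
    }

    public static void main(String[] args) {
        System.out.println(topicFor(new byte[300]));
        System.out.println(topicFor(new byte[3_000_000]));
    }
}
```

Each topic then carries records of similar size, so the prefetch byte limit for its consumers translates into a roughly predictable number of records per poll.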

*Questions:* Should Kafka be able to handle this case? Maybe I am using the 
wrong tool for this and Kafka is simply designed for high-throughput/low 
latency?

  was:
I sent a related post to the Kafka mailing list, but haven't received any 
response: 
http://mail-archives.apache.org/mod_mbox/kafka-users/201512.mbox/%3CCAL%2BArfWNfkpymkNDuf6UJ06CJJ63XC1bPHeT4TSYXKjSsOpu-Q%40mail.gmail.com%3E
 So far, I think this is a design issue in Kafka so I'm taking the liberty of 
creating an issue.

*Use case:*
 - Slow consumption: around 20 seconds per record.
 - Large variation in message size: serialized tasks range from ~300 bytes up 
to ~3 MB.
 - Consumption latency (20 seconds) is independent of message size.

*Code example:*
{noformat}
while (isRunning()) {
  ConsumerRecords<String, byte[]> records = consumer.poll(100);
  for (final ConsumerRecord<String, byte[]> record : records) {
    // Handle record...
  }
}
{noformat}

*Problem:* Kafka doesn't have any issues with large messages (as long as you 
bump some configuration flags). However, the problem is two-fold:
- KafkaConsumer#poll is the only call that sends health checks (heartbeats).
- There is no limit on how many messages KafkaConsumer#poll will return. The 
only limit is on the total number of bytes to be prefetched. This makes the 
session timeout extremely hard to tune when message sizes vary:
-- The delay until the next KafkaConsumer#poll call is proportional to the 
number of records returned by the previous KafkaConsumer#poll call.
-- KafkaConsumer#poll will return either many small records or just a few 
large records. With many small messages, the risk of hitting the session 
timeout is very high. Raising the session timeout by the orders of magnitude 
required to handle the smaller messages increases the time until a dead 
consumer is discovered a thousandfold.

*Proposed fixes:* I do not claim to be a Kafka expert, but two ideas are to 
either
 - add a `KafkaConsumer#healthy` call to let the broker know we are still 
processing records; or
 - add an upper limit on the number of messages returned by 
`KafkaConsumer#poll`. I am thinking of something like 
`KafkaConsumer#poll(timeout, nMaxMessages)`.

*Workarounds:*
 - Have different topics for different message sizes. This makes tuning the 
partition prefetch size easier.
 - Use another tool :)

*Questions:* Should Kafka be able to handle this case? Maybe I am using the 
wrong tool for this and Kafka is simply designed for high-throughput/low 
latency?


> Consumer group doesn't lend itself well for slow consumers with varying 
> message size
> ------------------------------------------------------------------------------------
>
>                 Key: KAFKA-2986
>                 URL: https://issues.apache.org/jira/browse/KAFKA-2986
>             Project: Kafka
>          Issue Type: Bug
>          Components: consumer
>    Affects Versions: 0.9.0.0
>         Environment: Java consumer API 0.9.0.0
>            Reporter: Jens Rantil
>            Assignee: Neha Narkhede



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
