[ https://issues.apache.org/jira/browse/STORM-3399?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16936945#comment-16936945 ]

Stig Rohde Døssing commented on STORM-3399:
-------------------------------------------

You most likely want to look at this method:
https://github.com/apache/storm/blob/38b09fd229d2034a0fb6d5359eaa0b8ecdc0b61a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java#L359

This is where we poll messages from Kafka. In order to support priority among 
partitions, you need to figure out a format for the user to set a priority in 
the KafkaSpoutConfig. This could e.g. be a map of partitions to percentages.
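Here is a rough sketch of the "map of partitions to percentages" idea, in which a config holder validates the map up front. PartitionPriorities and its methods are hypothetical and are not part of KafkaSpoutConfig. Partitions are keyed by plain strings such as "orders-0" rather than Kafka's TopicPartition, so that the example compiles on its own.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical holder for a per-partition priority setting, expressed as
 * percentages that must sum to 100. Partitions are plain String keys here;
 * a real implementation would presumably key on TopicPartition.
 */
final class PartitionPriorities {
    private final Map<String, Double> percentages;

    PartitionPriorities(Map<String, Double> percentages) {
        double total = 0.0;
        for (Map.Entry<String, Double> e : percentages.entrySet()) {
            double p = e.getValue();
            if (p < 0.0 || p > 100.0) {
                throw new IllegalArgumentException(
                    "Percentage out of range for " + e.getKey() + ": " + p);
            }
            total += p;
        }
        // Allow a small tolerance for floating-point rounding.
        if (Math.abs(total - 100.0) > 1e-6) {
            throw new IllegalArgumentException("Percentages must sum to 100, got " + total);
        }
        this.percentages = Collections.unmodifiableMap(new HashMap<>(percentages));
    }

    /** Target share (0.0 to 1.0) for a partition; unlisted partitions get 0. */
    double targetShare(String partition) {
        return percentages.getOrDefault(partition, 0.0) / 100.0;
    }
}
```

For example, `new PartitionPriorities(Map.of("orders-0", 70.0, "orders-1", 30.0))` would ask for roughly 70% of emits from partition 0 and 30% from partition 1.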

Then you most likely want to keep track of how many messages have been emitted
per partition over some time period; see
https://github.com/apache/storm/blob/38b09fd229d2034a0fb6d5359eaa0b8ecdc0b61a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java#L438
for the relevant code. You may be able to build this kind of tracking into the
OffsetManager class. You could, for example, track the emitted tuple count per
partition over a 10-minute interval (or perhaps some other user-configurable
interval).
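Here is a minimal sketch of the per-partition counting. It assumes a simple tumbling window (counts reset once the interval elapses) and an injectable clock for testing. EmitCounter is a hypothetical class, not existing OffsetManager code.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.LongSupplier;

/**
 * Hypothetical per-partition emit counter using tumbling windows: all counts
 * are cleared once intervalMillis has elapsed since the window started.
 */
final class EmitCounter {
    private final long intervalMillis;
    private final LongSupplier clock;
    private final Map<String, Long> counts = new HashMap<>();
    private long windowStart;

    EmitCounter(long intervalMillis, LongSupplier clock) {
        if (intervalMillis <= 0) {
            throw new IllegalArgumentException("interval must be positive");
        }
        this.intervalMillis = intervalMillis;
        this.clock = clock;
        this.windowStart = clock.getAsLong();
    }

    /** Records one emitted tuple for the given partition. */
    void recordEmit(String partition) {
        rollIfExpired();
        counts.merge(partition, 1L, Long::sum);
    }

    /** Number of emits for the partition in the current window. */
    long emitted(String partition) {
        rollIfExpired();
        return counts.getOrDefault(partition, 0L);
    }

    /** Number of emits across all partitions in the current window. */
    long totalEmitted() {
        rollIfExpired();
        long total = 0;
        for (long c : counts.values()) {
            total += c;
        }
        return total;
    }

    /** Starts a new window, discarding old counts, if the interval has elapsed. */
    private void rollIfExpired() {
        long now = clock.getAsLong();
        if (now - windowStart >= intervalMillis) {
            counts.clear();
            windowStart = now;
        }
    }
}
```

With `new EmitCounter(600_000L, System::currentTimeMillis)`, this would count emits over 10-minute windows. A sliding window would avoid the abrupt reset at each boundary, at the cost of more bookkeeping.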

Once you know the priority among partitions and how many messages have been
emitted in the previous interval, you can update the KafkaSpout method I linked
above to pause the partitions that have had "too many" emits recently. You'll
likely also want to ensure that we unpause partitions if a poll returns no
messages or too few messages.
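Here is one possible way to decide what counts as "too many" emits. It assumes target shares come from a priority map and counts come from the previous interval, and it pauses every partition whose observed share of emits exceeds its target. PauseSelector is hypothetical. In the spout, its result would presumably be passed to KafkaConsumer.pause before polling.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical helper that picks partitions to pause before the next poll. */
final class PauseSelector {
    /**
     * @param targetShares desired fraction of emits per partition (0.0 to 1.0)
     * @param emitCounts   emits per partition in the previous interval
     * @return partitions whose observed share of emits exceeds their target share
     */
    static Set<String> partitionsToPause(Map<String, Double> targetShares,
                                         Map<String, Long> emitCounts) {
        long total = 0;
        for (long c : emitCounts.values()) {
            total += c;
        }
        Set<String> toPause = new HashSet<>();
        if (total == 0) {
            return toPause; // Nothing emitted yet, so no partition is over its share.
        }
        for (Map.Entry<String, Double> e : targetShares.entrySet()) {
            long emitted = emitCounts.getOrDefault(e.getKey(), 0L);
            double observedShare = (double) emitted / total;
            if (observedShare > e.getValue()) {
                toPause.add(e.getKey());
            }
        }
        return toPause;
    }
}
```

Suppose the target shares sum to 1.0 and every emitting partition is listed. Then the observed shares also sum to 1.0, so not every listed partition can be over target, and this rule never pauses all of them at once.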

For example, if you pause partition 1 in a poll and get no messages back,
partition 1 needs to be unpaused in the following poll. Similarly, if partition
1 is paused and the poll returns fewer than max.poll.records messages (a Kafka
consumer config parameter), we should probably unpause partition 1. We want to
allow the user to set a priority, but I don't think it benefits anyone if this
mechanism makes the spout poll inefficiently. We should try to ensure that we
poll as many full batches as possible.
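The unpause rule above could be sketched as follows, assuming the spout remembers which partitions it paused for the last poll. ResumePolicy is hypothetical, and the returned set would be passed to KafkaConsumer.resume before the next poll. An empty poll is simply a short poll, so a single "fewer than max.poll.records" check covers both cases described above.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

/** Hypothetical rule for undoing pauses when they hurt polling efficiency. */
final class ResumePolicy {
    /**
     * @param paused          partitions that were paused for the last poll
     * @param recordsReturned number of records the last poll returned
     * @param maxPollRecords  the consumer's max.poll.records setting
     * @return partitions to resume before the next poll
     */
    static Set<String> partitionsToResume(Set<String> paused,
                                          int recordsReturned,
                                          int maxPollRecords) {
        // An empty poll is the extreme case of a short poll, so one check covers both.
        if (!paused.isEmpty() && recordsReturned < maxPollRecords) {
            return new HashSet<>(paused);
        }
        return Collections.emptySet();
    }
}
```

With max.poll.records at 500 (Kafka's default), a paused partition would be resumed after any poll returning 0 to 499 records. It would stay paused only while full batches keep arriving.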

> Add ability to set priority among partitions in KafkaSpout
> ----------------------------------------------------------
>
>                 Key: STORM-3399
>                 URL: https://issues.apache.org/jira/browse/STORM-3399
>             Project: Apache Storm
>          Issue Type: New Feature
>          Components: storm-kafka-client
>            Reporter: Stig Rohde Døssing
>            Assignee: Rishabh Jain
>            Priority: Minor
>              Labels: newbie
>
> This idea was raised for the old storm-kafka spout at 
> https://issues.apache.org/jira/browse/STORM-3160. As I'm closing that issue, 
> I'm raising this one to implement that functionality in the new spout.
> I believe doing this should be possible using KafkaConsumer.pause and 
> KafkaConsumer.resume.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
