[ 
https://issues.apache.org/jira/browse/SPARK-12073?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shixiong Zhu updated SPARK-12073:
---------------------------------
    Affects Version/s: 1.6.1
                       1.6.0

> Backpressure causes individual Kafka partitions to lag
> ------------------------------------------------------
>
>                 Key: SPARK-12073
>                 URL: https://issues.apache.org/jira/browse/SPARK-12073
>             Project: Spark
>          Issue Type: Bug
>          Components: Streaming
>    Affects Versions: 1.5.2, 1.6.0, 1.6.1
>            Reporter: Jason White
>            Assignee: Jason White
>             Fix For: 2.0.0
>
>
> We're seeing growing lag on two individual Kafka partitions of a topic 
> with 32 partitions. Our individual batches are completing in 5-7s, 
> with a batch window of 30s, so there's plenty of room for Streaming to catch 
> up, but it appears to be intentionally limiting itself. These two partitions 
> are experiencing unbalanced load (higher than most of the others).
> What I believe is happening is that maxMessagesPerPartition calculates an 
> appropriate limit for the message rate across all partitions, and then 
> divides it by the number of partitions to determine how many messages to 
> retrieve per partition. The problem with this approach is that when one 
> partition is behind by millions of records (due to random Kafka issues) or 
> is experiencing heavy load, the messages to be retrieved shouldn't be split 
> evenly among the partitions. In this scenario, if the rate estimator 
> calculates that only 100k total messages can be retrieved, each partition 
> (out of, say, 32) retrieves at most 100k/32 = 3125 messages.
> Under some conditions, this results in the backpressure keeping the lagging 
> partition from recovering. The PIDRateEstimator doesn't increase the number 
> of messages to retrieve enough to recover, and we stabilize at a point where 
> the lag on these individual partitions slowly grows.
> I have a PR on our fork in progress to allocate maxMessagesPerPartition 
> by weighting the number of messages to be retrieved by the lag each 
> partition is currently experiencing.
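
The difference between the even split described above and a lag-weighted
split can be illustrated with a small sketch. This is not Spark's code and
not the reporter's PR: the function names (even_split, lag_weighted_split),
the example lag figures, and the rounding rule are all assumptions made for
illustration. Only the 100k limit and the 32 partitions come from the
description.

```python
# Illustrative sketch only; not Spark's maxMessagesPerPartition implementation.
# All names and the example lag values below are hypothetical.

def even_split(total_limit, lags):
    """Give every partition the same cap: total_limit / number of partitions."""
    per_partition = total_limit // len(lags)
    # A partition never fetches more than it is actually behind by.
    return {p: min(lag, per_partition) for p, lag in lags.items()}


def lag_weighted_split(total_limit, lags):
    """Share total_limit among partitions in proportion to their current lag."""
    total_lag = sum(lags.values())
    if total_lag <= total_limit:
        # Everything fits within the limit, so fetch the full backlog.
        return dict(lags)
    # Integer division rounds down, so the sum never exceeds total_limit.
    return {p: total_limit * lag // total_lag for p, lag in lags.items()}


# 32 partitions: 30 are nearly caught up, 2 are behind by millions of records.
lags = {p: 1_000 for p in range(30)}
lags[30] = lags[31] = 2_000_000

even = even_split(100_000, lags)
weighted = lag_weighted_split(100_000, lags)

print("even split, lagging partition:    ", even[31])
print("even split, total fetched:        ", sum(even.values()))
print("weighted split, lagging partition:", weighted[31])
print("weighted split, total fetched:    ", sum(weighted.values()))
```

With these assumed numbers, the even split caps each lagging partition at
3125 messages and leaves most of the 100k budget unused, while the weighted
split gives nearly the whole budget to the two lagging partitions. A real
implementation would probably also need to guarantee the low-lag partitions
some minimum share, which this sketch does not attempt.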



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
