[ 
https://issues.apache.org/jira/browse/SPARK-36576?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jungtaek Lim resolved SPARK-36576.
----------------------------------
    Fix Version/s: 3.3.0
       Resolution: Fixed

Issue resolved by pull request 33827
[https://github.com/apache/spark/pull/33827]

> Improve range split calculation for Kafka Source minPartitions option
> ---------------------------------------------------------------------
>
>                 Key: SPARK-36576
>                 URL: https://issues.apache.org/jira/browse/SPARK-36576
>             Project: Spark
>          Issue Type: Improvement
>          Components: Structured Streaming
>    Affects Versions: 3.1.2
>            Reporter: Andrew Olson
>            Assignee: Andrew Olson
>            Priority: Minor
>             Fix For: 3.3.0
>
>
> While the 
> [documentation|https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html]
>  does contain a clear disclaimer,
> {quote}Please note that this configuration is like a {{hint}}: the number of 
> Spark tasks will be *approximately* {{minPartitions}}. It can be less or more 
> depending on rounding errors or Kafka partitions that didn't receive any new 
> data.
> {quote}
> there are cases where the calculated Kafka partition range splits can differ 
> greatly from expectations. For evenly distributed data and most 
> {{minPartitions}} values this would not be a major or commonly encountered 
> concern. However when the distribution of data across partitions is very 
> heavily skewed, somewhat surprising range split calculations can result.
> For example, given the following input data:
>  * 1 partition containing 10,000 messages
>  * 1,000 partitions each containing 1 message
> Spark processing code loading from this collection of 1,001 partitions may 
> decide that it would like each task to read no more than 1,000 messages. 
> Consequently, it could specify a {{minPartitions}} value of 1,010 - expecting 
> the single large partition to be split into 10 equal chunks, along with the 
> 1,000 small partitions each having their own task. That is far from what 
> actually occurs. The {{KafkaOffsetRangeCalculator}} algorithm ends up 
> splitting the large partition into 918 chunks of 10 or 11 messages, two 
> orders of magnitude from the desired maximum message count per task and 
> nearly double the number of Spark tasks hinted in the configuration.
> Proposing that the {{KafkaOffsetRangeCalculator}}'s range calculation logic 
> be modified to exclude small (i.e. un-split) partitions from the overall 
> proportional distribution math, in order to more reasonably divide the large 
> partitions when they are accompanied by many small partitions, and to provide 
> optimal behavior for cases where a {{minPartitions}} value is deliberately 
> computed based on the volume of data being read.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to