[ 
https://issues.apache.org/jira/browse/SPARK-36576?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andrew Olson updated SPARK-36576:
---------------------------------
    Description: 
While the 
[documentation|https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html]
 does contain a clear disclaimer,
{quote}Please note that this configuration is like a {{hint}}: the number of 
Spark tasks will be *approximately* {{minPartitions}}. It can be less or more 
depending on rounding errors or Kafka partitions that didn't receive any new 
data.
{quote}
there are cases where the calculated Kafka partition range splits can differ 
greatly from expectations. For evenly distributed data and most 
{{minPartitions}} values this is not a major or commonly encountered concern. 
However, when the distribution of data across partitions is heavily skewed, 
the calculated range splits can be quite surprising.

For example, given the following input data:
 * 1 partition containing 10,000 messages
 * 1,000 partitions each containing 1 message

Spark processing code loading from this collection of 1,001 partitions may 
decide that each task should read no more than 1,000 messages. Consequently, 
it could specify a {{minPartitions}} value of 1,010, expecting the single 
large partition to be split into 10 chunks of 1,000 messages each, with each 
of the 1,000 small partitions getting its own task. That is far from what 
actually occurs. The {{KafkaOffsetRangeCalculator}} algorithm splits the large 
partition into 918 chunks of 10 or 11 messages, two orders of magnitude below 
the desired maximum message count per task, and produces 1,918 Spark tasks in 
total, nearly double the number hinted in the configuration.
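
For illustration, a simplified sketch of the proportional split math (an 
approximation of the behavior described above, not the actual Spark source) 
reproduces these numbers. Because the large partition holds roughly 91% of the 
11,000 total messages, it receives roughly 91% of the 1,010 requested splits, 
i.e. about 918 pieces, while each of the 1,000 small partitions still needs 
its own task:

{code:scala}
// Simplified sketch of the proportional split behavior (not Spark source code).
object ProportionalSplitSketch {
  def main(args: Array[String]): Unit = {
    // Example input: one partition of 10,000 messages plus 1,000 partitions of 1 message.
    val sizes: Seq[Long] = 10000L +: Seq.fill(1000)(1L)
    val minPartitions = 1010
    val totalSize = sizes.sum.toDouble

    // Each partition is divided into a number of pieces proportional to its
    // share of the total message count, with a floor of one piece.
    val pieces = sizes.map { size =>
      math.max(math.round(size / totalSize * minPartitions), 1L)
    }

    println(s"Pieces for the large partition: ${pieces.head}") // 918
    println(s"Total Spark tasks: ${pieces.sum}")               // 1918
    println(f"Messages per piece of the large partition: ${10000.0 / pieces.head}%.1f") // ~10.9
  }
}
{code}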

Proposing that the {{KafkaOffsetRangeCalculator}}'s range calculation logic be 
modified to exclude small (i.e., un-split) partitions from the overall 
proportional distribution math. This would divide the large partitions more 
reasonably when they are accompanied by many small partitions, and would 
provide optimal behavior for cases where a {{minPartitions}} value is 
deliberately computed based on the volume of data being read.
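
A minimal sketch of the proposed idea (illustrative only; the object name and 
the exact small-partition criterion below are assumptions, not Spark code) 
shows how excluding un-split partitions from the proportional math yields the 
expected result for the example above:

{code:scala}
// Sketch of the proposed approach; criterion for "small" partitions is an assumption.
object ProposedSplitSketch {
  def main(args: Array[String]): Unit = {
    val sizes: Seq[Long] = 10000L +: Seq.fill(1000)(1L)
    val minPartitions = 1010
    val totalSize = sizes.sum.toDouble

    // Partitions whose proportional share would round to at most one piece are
    // treated as "small": they keep a single task and are excluded from the
    // proportional distribution.
    val (small, large) = sizes.partition { size =>
      math.round(size / totalSize * minPartitions) <= 1
    }

    // The remaining task budget is spread proportionally over the large partitions only.
    val remainingBudget = math.max(minPartitions - small.size, large.size)
    val largeTotal = large.sum.toDouble
    val largePieces = large.map { size =>
      math.max(math.round(size / largeTotal * remainingBudget), 1L)
    }

    println(s"Pieces for the large partition: ${largePieces.head}") // 10
    println(s"Total Spark tasks: ${largePieces.sum + small.size}")  // 1010
    println(f"Messages per piece: ${10000.0 / largePieces.head}%.0f") // 1000
  }
}
{code}

With the 1,000 single-message partitions set aside, the remaining budget of 10 
splits goes entirely to the large partition, giving 10 chunks of 1,000 
messages each and 1,010 Spark tasks in total.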

> Improve range split calculation for Kafka Source minPartitions option
> ---------------------------------------------------------------------
>
>                 Key: SPARK-36576
>                 URL: https://issues.apache.org/jira/browse/SPARK-36576
>             Project: Spark
>          Issue Type: Improvement
>          Components: Structured Streaming
>    Affects Versions: 3.1.2
>            Reporter: Andrew Olson
>            Priority: Minor
>