[ https://issues.apache.org/jira/browse/SPARK-36576 ]
Jungtaek Lim resolved SPARK-36576.
----------------------------------
    Fix Version/s: 3.3.0
       Resolution: Fixed

Issue resolved by pull request 33827
[https://github.com/apache/spark/pull/33827]

> Improve range split calculation for Kafka Source minPartitions option
> ---------------------------------------------------------------------
>
>                 Key: SPARK-36576
>                 URL: https://issues.apache.org/jira/browse/SPARK-36576
>             Project: Spark
>          Issue Type: Improvement
>          Components: Structured Streaming
>    Affects Versions: 3.1.2
>            Reporter: Andrew Olson
>            Assignee: Andrew Olson
>            Priority: Minor
>             Fix For: 3.3.0
>
> While the [documentation|https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html] does contain a clear disclaimer,
> {quote}Please note that this configuration is like a {{hint}}: the number of Spark tasks will be *approximately* {{minPartitions}}. It can be less or more depending on rounding errors or Kafka partitions that didn't receive any new data.
> {quote}
> there are cases where the calculated Kafka partition range splits can differ greatly from expectations. For evenly distributed data and most {{minPartitions}} values this is not a major or commonly encountered concern. However, when the distribution of data across partitions is heavily skewed, the range split calculations can be quite surprising.
> For example, given the following input data (the split arithmetic is worked through in the sketch after this message):
> * 1 partition containing 10,000 messages
> * 1,000 partitions each containing 1 message
> Spark processing code loading from this collection of 1,001 partitions may decide that each task should read no more than 1,000 messages. Consequently, it could specify a {{minPartitions}} value of 1,010, expecting the single large partition to be split into 10 equal chunks and each of the 1,000 small partitions to get its own task. That is far from what actually occurs. Because every partition, however small, participates in the proportional distribution math, the {{KafkaOffsetRangeCalculator}} algorithm ends up splitting the large partition into 918 chunks of 10 or 11 messages: two orders of magnitude below the desired maximum message count per task, and nearly double the number of Spark tasks hinted in the configuration.
> I propose modifying the {{KafkaOffsetRangeCalculator}}'s range calculation logic to exclude small (i.e. un-split) partitions from the overall proportional distribution math. This would more reasonably divide the large partitions when they are accompanied by many small partitions, and would provide optimal behavior for cases where a {{minPartitions}} value is deliberately computed based on the volume of data being read.
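For illustration, below is a minimal, self-contained Scala sketch of the arithmetic described in the issue. The object and function names ({{RangeSplitSketch}}, {{currentSplitCounts}}, {{proposedSplitCounts}}) are hypothetical and the logic is simplified; this is not the actual {{KafkaOffsetRangeCalculator}} implementation. The first function mimics the purely proportional split math, the second sketches the proposed variant that excludes un-split partitions from the proportional distribution:

{code:scala}
object RangeSplitSketch {
  final case class OffsetRange(partition: Int, size: Long)

  // Current behavior (simplified sketch): every partition, however small,
  // takes part in the proportional math, and each gets at least one task.
  def currentSplitCounts(ranges: Seq[OffsetRange], minPartitions: Int): Seq[(OffsetRange, Int)] = {
    val totalSize = ranges.map(_.size).sum
    ranges.map { r =>
      val parts = math.max(math.round(r.size.toDouble / totalSize * minPartitions), 1).toInt
      (r, parts)
    }
  }

  // Proposed behavior (simplified sketch): give each un-split (small)
  // partition its own single task, then distribute the remaining
  // minPartitions budget proportionally among the large partitions only.
  def proposedSplitCounts(ranges: Seq[OffsetRange], minPartitions: Int): Seq[(OffsetRange, Int)] = {
    val totalSize = ranges.map(_.size).sum
    val (small, large) = ranges.partition { r =>
      math.round(r.size.toDouble / totalSize * minPartitions) < 1
    }
    val largeSize = large.map(_.size).sum
    val budget = math.max(minPartitions - small.size, large.size)
    small.map(r => (r, 1)) ++ large.map { r =>
      val parts = math.max(math.round(r.size.toDouble / largeSize * budget), 1).toInt
      (r, parts)
    }
  }

  def main(args: Array[String]): Unit = {
    // 1 partition with 10,000 messages plus 1,000 partitions with 1 message
    // each; minPartitions = 1,010 expects 10 chunks + 1,000 single tasks.
    val ranges = OffsetRange(0, 10000L) +: (1 to 1000).map(OffsetRange(_, 1L))

    val current = currentSplitCounts(ranges, 1010)
    println(s"current:  large -> ${current.head._2} chunks, total tasks = ${current.map(_._2).sum}")
    // current:  large -> 918 chunks, total tasks = 1918

    val proposed = proposedSplitCounts(ranges, 1010)
    val largeParts = proposed.collectFirst { case (r, p) if r.size == 10000L => p }.get
    println(s"proposed: large -> $largeParts chunks, total tasks = ${proposed.map(_._2).sum}")
    // proposed: large -> 10 chunks, total tasks = 1010
  }
}
{code}

On the example inputs this sketch reproduces the figures from the description: the purely proportional math yields 918 chunks (round(10,000 / 11,000 * 1,010) = 918) and 1,918 total tasks, while excluding the 1,000 small partitions first leaves a budget of 10 splits for the large partition, giving 10 chunks of 1,000 messages and 1,010 total tasks.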