This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new cfd4a08 [SPARK-33962][SS] Fix incorrect min partition condition cfd4a08 is described below commit cfd4a083987f985da4659333c718561c19e0cbfe Author: Liang-Chi Hsieh <vii...@gmail.com> AuthorDate: Sun Jan 3 01:29:12 2021 -0800 [SPARK-33962][SS] Fix incorrect min partition condition ### What changes were proposed in this pull request? This patch fixes an incorrect condition used when comparing the number of offset ranges with the min partition config. ### Why are the changes needed? When calculating offset ranges, we consider the `minPartitions` configuration. If `minPartitions` is not set, or is less than or equal to the number of given ranges, there are already enough partitions at Kafka, so we don't need to split offsets to satisfy the min partition requirement. But the current condition is `offsetRanges.size > minPartitions.get`, which is not correct: when the number of ranges is exactly equal to `minPartitions`, `getRanges` currently splits offsets even though it is unnecessary. Besides, in the non-split case, we can assign preferred executor locations and reuse `KafkaConsumer`. So unnecessarily splitting offset ranges misses the chance to reuse `KafkaConsumer`. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Unit test. Manual test in Spark cluster with Kafka. Closes #30994 from viirya/ss-minor4. 
Authored-by: Liang-Chi Hsieh <vii...@gmail.com> Signed-off-by: Dongjoon Hyun <dh...@apple.com> --- .../spark/sql/kafka010/KafkaOffsetRangeCalculator.scala | 2 +- .../sql/kafka010/KafkaOffsetRangeCalculatorSuite.scala | 14 ++++++++++++++ 2 files changed, 15 insertions(+), 1 deletion(-) diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala index f7183f7..1e9a62e 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala @@ -46,7 +46,7 @@ private[kafka010] class KafkaOffsetRangeCalculator(val minPartitions: Option[Int val offsetRanges = ranges.filter(_.size > 0) // If minPartitions not set or there are enough partitions to satisfy minPartitions - if (minPartitions.isEmpty || offsetRanges.size > minPartitions.get) { + if (minPartitions.isEmpty || offsetRanges.size >= minPartitions.get) { // Assign preferred executor locations to each range such that the same topic-partition is // preferentially read from the same executor and the KafkaConsumer can be reused. 
offsetRanges.map { range => diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculatorSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculatorSuite.scala index 5d010cd..751b877 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculatorSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculatorSuite.scala @@ -71,6 +71,20 @@ class KafkaOffsetRangeCalculatorSuite extends SparkFunSuite { KafkaOffsetRange(tp3, 1, 2, None))) } + testWithMinPartitions("N TopicPartitions to N offset ranges with executors", 3) { calc => + assert( + calc.getRanges( + Seq( + KafkaOffsetRange(tp1, 1, 2), + KafkaOffsetRange(tp2, 1, 2), + KafkaOffsetRange(tp3, 1, 2)), + Seq("exec1", "exec2", "exec3")) === + Seq( + KafkaOffsetRange(tp1, 1, 2, Some("exec3")), + KafkaOffsetRange(tp2, 1, 2, Some("exec1")), + KafkaOffsetRange(tp3, 1, 2, Some("exec2")))) + } + testWithMinPartitions("1 TopicPartition to N offset ranges", 4) { calc => assert( calc.getRanges( --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org