This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new cfd4a08  [SPARK-33962][SS] Fix incorrect min partition condition
cfd4a08 is described below

commit cfd4a083987f985da4659333c718561c19e0cbfe
Author: Liang-Chi Hsieh <vii...@gmail.com>
AuthorDate: Sun Jan 3 01:29:12 2021 -0800

    [SPARK-33962][SS] Fix incorrect min partition condition
    
    ### What changes were proposed in this pull request?
    
    This patch fixes an incorrect condition when comparing the offset range
    size with the min partition config.
    
    ### Why are the changes needed?
    
    When calculating offset ranges, we consider the `minPartitions`
    configuration. If `minPartitions` is not set or is less than or equal to
    the number of given ranges, there are already enough partitions at Kafka,
    so we don't need to split offsets to satisfy the min partition requirement.
    But the current condition is `offsetRanges.size > minPartitions.get`,
    which is not correct: with it, `getRanges` splits offsets even when no
    split is needed.
    
    Besides, in the non-split case we can assign a preferred executor location
    to each range and reuse the `KafkaConsumer`. Splitting offset ranges
    unnecessarily therefore also loses the chance to reuse the `KafkaConsumer`.
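
    For illustration, a minimal sketch of the corrected check (hypothetical
    helper name and simplified signature, not the actual
    `KafkaOffsetRangeCalculator` code):

        // True when the existing ranges already satisfy minPartitions,
        // i.e. no split is needed and executor locations can be assigned.
        def hasEnoughPartitions(rangeCount: Int, minPartitions: Option[Int]): Boolean =
          minPartitions.isEmpty || rangeCount >= minPartitions.get

        hasEnoughPartitions(3, Some(3))  // true after this fix; the old `>` check gave false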
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Unit test.
    
    Manual test in a Spark cluster with Kafka.
    
    Closes #30994 from viirya/ss-minor4.
    
    Authored-by: Liang-Chi Hsieh <vii...@gmail.com>
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
 .../spark/sql/kafka010/KafkaOffsetRangeCalculator.scala    |  2 +-
 .../sql/kafka010/KafkaOffsetRangeCalculatorSuite.scala     | 14 ++++++++++++++
 2 files changed, 15 insertions(+), 1 deletion(-)

diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala
index f7183f7..1e9a62e 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala
@@ -46,7 +46,7 @@ private[kafka010] class KafkaOffsetRangeCalculator(val minPartitions: Option[Int
     val offsetRanges = ranges.filter(_.size > 0)
 
     // If minPartitions not set or there are enough partitions to satisfy minPartitions
-    if (minPartitions.isEmpty || offsetRanges.size > minPartitions.get) {
+    if (minPartitions.isEmpty || offsetRanges.size >= minPartitions.get) {
       // Assign preferred executor locations to each range such that the same topic-partition is
       // preferentially read from the same executor and the KafkaConsumer can be reused.
       offsetRanges.map { range =>
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculatorSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculatorSuite.scala
index 5d010cd..751b877 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculatorSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculatorSuite.scala
@@ -71,6 +71,20 @@ class KafkaOffsetRangeCalculatorSuite extends SparkFunSuite {
         KafkaOffsetRange(tp3, 1, 2, None)))
   }
 
+  testWithMinPartitions("N TopicPartitions to N offset ranges with executors", 3) { calc =>
+    assert(
+      calc.getRanges(
+        Seq(
+          KafkaOffsetRange(tp1, 1, 2),
+          KafkaOffsetRange(tp2, 1, 2),
+          KafkaOffsetRange(tp3, 1, 2)),
+        Seq("exec1", "exec2", "exec3")) ===
+        Seq(
+          KafkaOffsetRange(tp1, 1, 2, Some("exec3")),
+          KafkaOffsetRange(tp2, 1, 2, Some("exec1")),
+          KafkaOffsetRange(tp3, 1, 2, Some("exec2"))))
+  }
+
   testWithMinPartitions("1 TopicPartition to N offset ranges", 4) { calc =>
     assert(
       calc.getRanges(


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
