This is an automated email from the ASF dual-hosted git repository.

zsxwing pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new f56ba37  [SPARK-30656][SS] Support the "minPartitions" option in Kafka batch source and streaming source v1
f56ba37 is described below

commit f56ba37d8bf618f2bef23d808e0fc5704261b139
Author: Shixiong Zhu <zsxw...@gmail.com>
AuthorDate: Thu Jan 30 18:14:50 2020 -0800

    [SPARK-30656][SS] Support the "minPartitions" option in Kafka batch source and streaming source v1
    
    ### What changes were proposed in this pull request?
    
    - Add `minPartitions` support for the Kafka streaming V1 source.
    - Add `minPartitions` support for the Kafka batch V1 and V2 sources.
    - There is a lot of refactoring (moving code into KafkaOffsetReader) so that the batch and streaming paths can reuse it.
    
    ### Why are the changes needed?
    
    Right now, the "minPartitions" option only works in the Kafka streaming source v2. It would be great to support it in the batch source and streaming source v1 as well (v1 is the fallback mode when a user hits a regression in v2).
    
    ### Does this PR introduce any user-facing change?
    
    Yes. The `minPartitions` option is now supported in Kafka batch and streaming queries for both data source V1 and V2.
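    
    For illustration only (not part of this patch), a minimal batch read that exercises the newly supported option; the broker address and topic name are placeholders:
    
    ```scala
    // Placeholder broker and topic; with this change "minPartitions" is honored
    // by batch reads (spark.read) as well as streaming, for both V1 and V2.
    val df = spark.read
      .format("kafka")
      .option("kafka.bootstrap.servers", "broker1:9092")
      .option("subscribe", "events")
      .option("minPartitions", "6")
      .load()
    
    // With e.g. 3 Kafka partitions that all contain data, the scan is planned
    // with at least ~6 Spark partitions (large topic partitions are split).
    assert(df.rdd.getNumPartitions >= 6)
    ```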
    
    ### How was this patch tested?
    
    New unit tests are added to test "minPartitions".
    
    Closes #27388 from zsxwing/kafka-min-partitions.
    
    Authored-by: Shixiong Zhu <zsxw...@gmail.com>
    Signed-off-by: Shixiong Zhu <zsxw...@gmail.com>
---
 docs/structured-streaming-kafka-integration.md     |   2 +-
 .../org/apache/spark/sql/kafka010/KafkaBatch.scala |  32 +----
 .../spark/sql/kafka010/KafkaMicroBatchStream.scala |  75 +---------
 .../sql/kafka010/KafkaOffsetRangeCalculator.scala  |  20 +--
 .../spark/sql/kafka010/KafkaOffsetReader.scala     | 156 +++++++++++++++++++++
 .../apache/spark/sql/kafka010/KafkaRelation.scala  |  32 +----
 .../apache/spark/sql/kafka010/KafkaSource.scala    |  64 +--------
 .../apache/spark/sql/kafka010/KafkaSourceRDD.scala |  21 +--
 .../sql/kafka010/KafkaMicroBatchSourceSuite.scala  |  29 ++++
 .../kafka010/KafkaOffsetRangeCalculatorSuite.scala | 107 +++++++++-----
 .../sql/kafka010/KafkaOffsetReaderSuite.scala      | 139 ++++++++++++++++++
 .../spark/sql/kafka010/KafkaRelationSuite.scala    |  22 +++
 12 files changed, 448 insertions(+), 251 deletions(-)

diff --git a/docs/structured-streaming-kafka-integration.md b/docs/structured-streaming-kafka-integration.md
index 0820b38..a1eeee5 100644
--- a/docs/structured-streaming-kafka-integration.md
+++ b/docs/structured-streaming-kafka-integration.md
@@ -469,7 +469,7 @@ The following configurations are optional:
   <td>minPartitions</td>
   <td>int</td>
   <td>none</td>
-  <td>streaming</td>
+  <td>streaming and batch</td>
   <td>Desired minimum number of partitions to read from Kafka.
  By default, Spark has a 1-1 mapping of topicPartitions to Spark partitions consuming from Kafka.
  If you set this option to a value greater than your topicPartitions, Spark will divvy up large
diff --git 
a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaBatch.scala
 
b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaBatch.scala
index 3006770..9ad083f 100644
--- 
a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaBatch.scala
+++ 
b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaBatch.scala
@@ -57,36 +57,12 @@ private[kafka010] class KafkaBatch(
       driverGroupIdPrefix = s"$uniqueGroupId-driver")
 
     // Leverage the KafkaReader to obtain the relevant partition offsets
-    val (fromPartitionOffsets, untilPartitionOffsets) = {
-      try {
-        (kafkaOffsetReader.fetchPartitionOffsets(startingOffsets, 
isStartingOffsets = true),
-          kafkaOffsetReader.fetchPartitionOffsets(endingOffsets, 
isStartingOffsets = false))
-      } finally {
-        kafkaOffsetReader.close()
-      }
+    val offsetRanges: Seq[KafkaOffsetRange] = try {
+      kafkaOffsetReader.getOffsetRangesFromUnresolvedOffsets(startingOffsets, 
endingOffsets)
+    } finally {
+      kafkaOffsetReader.close()
     }
 
-    // Obtain topicPartitions in both from and until partition offset, ignoring
-    // topic partitions that were added and/or deleted between the two above 
calls.
-    if (fromPartitionOffsets.keySet != untilPartitionOffsets.keySet) {
-      implicit val topicOrdering: Ordering[TopicPartition] = Ordering.by(t => 
t.topic())
-      val fromTopics = fromPartitionOffsets.keySet.toList.sorted.mkString(",")
-      val untilTopics = 
untilPartitionOffsets.keySet.toList.sorted.mkString(",")
-      throw new IllegalStateException("different topic partitions " +
-        s"for starting offsets topics[${fromTopics}] and " +
-        s"ending offsets topics[${untilTopics}]")
-    }
-
-    // Calculate offset ranges
-    val offsetRanges = untilPartitionOffsets.keySet.map { tp =>
-      val fromOffset = fromPartitionOffsets.getOrElse(tp,
-        // This should not happen since topicPartitions contains all 
partitions not in
-        // fromPartitionOffsets
-        throw new IllegalStateException(s"$tp doesn't have a from offset"))
-      val untilOffset = untilPartitionOffsets(tp)
-      KafkaOffsetRange(tp, fromOffset, untilOffset, None)
-    }.toArray
-
     val executorKafkaParams =
       KafkaSourceProvider.kafkaParamsForExecutors(specifiedKafkaParams, 
uniqueGroupId)
     offsetRanges.map { range =>
diff --git 
a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala
 
b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala
index 01f6ba4..844c963 100644
--- 
a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala
+++ 
b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala
@@ -66,8 +66,6 @@ private[kafka010] class KafkaMicroBatchStream(
 
   private val includeHeaders = options.getBoolean(INCLUDE_HEADERS, false)
 
-  private val rangeCalculator = KafkaOffsetRangeCalculator(options)
-
   private var endPartitionOffsets: KafkaSourceOffset = _
 
   /**
@@ -94,57 +92,11 @@ private[kafka010] class KafkaMicroBatchStream(
     val startPartitionOffsets = 
start.asInstanceOf[KafkaSourceOffset].partitionToOffsets
     val endPartitionOffsets = 
end.asInstanceOf[KafkaSourceOffset].partitionToOffsets
 
-    // Find the new partitions, and get their earliest offsets
-    val newPartitions = 
endPartitionOffsets.keySet.diff(startPartitionOffsets.keySet)
-    val newPartitionInitialOffsets = 
kafkaOffsetReader.fetchEarliestOffsets(newPartitions.toSeq)
-    if (newPartitionInitialOffsets.keySet != newPartitions) {
-      // We cannot get from offsets for some partitions. It means they got 
deleted.
-      val deletedPartitions = 
newPartitions.diff(newPartitionInitialOffsets.keySet)
-      reportDataLoss(
-        s"Cannot find earliest offsets of ${deletedPartitions}. Some data may 
have been missed")
-    }
-    logInfo(s"Partitions added: $newPartitionInitialOffsets")
-    newPartitionInitialOffsets.filter(_._2 != 0).foreach { case (p, o) =>
-      reportDataLoss(
-        s"Added partition $p starts from $o instead of 0. Some data may have 
been missed")
-    }
-
-    // Find deleted partitions, and report data loss if required
-    val deletedPartitions = 
startPartitionOffsets.keySet.diff(endPartitionOffsets.keySet)
-    if (deletedPartitions.nonEmpty) {
-      val message =
-        if 
(kafkaOffsetReader.driverKafkaParams.containsKey(ConsumerConfig.GROUP_ID_CONFIG))
 {
-          s"$deletedPartitions are gone. ${CUSTOM_GROUP_ID_ERROR_MESSAGE}"
-        } else {
-          s"$deletedPartitions are gone. Some data may have been missed."
-        }
-      reportDataLoss(message)
-    }
-
-    // Use the end partitions to calculate offset ranges to ignore partitions 
that have
-    // been deleted
-    val topicPartitions = endPartitionOffsets.keySet.filter { tp =>
-      // Ignore partitions that we don't know the from offsets.
-      newPartitionInitialOffsets.contains(tp) || 
startPartitionOffsets.contains(tp)
-    }.toSeq
-    logDebug("TopicPartitions: " + topicPartitions.mkString(", "))
-
-    val fromOffsets = startPartitionOffsets ++ newPartitionInitialOffsets
-    val untilOffsets = endPartitionOffsets
-    untilOffsets.foreach { case (tp, untilOffset) =>
-      fromOffsets.get(tp).foreach { fromOffset =>
-        if (untilOffset < fromOffset) {
-          reportDataLoss(s"Partition $tp's offset was changed from " +
-            s"$fromOffset to $untilOffset, some data may have been missed")
-        }
-      }
-    }
-
-    // Calculate offset ranges
-    val offsetRanges = rangeCalculator.getRanges(
-      fromOffsets = fromOffsets,
-      untilOffsets = untilOffsets,
-      executorLocations = getSortedExecutorList())
+    val offsetRanges = kafkaOffsetReader.getOffsetRangesFromResolvedOffsets(
+      startPartitionOffsets,
+      endPartitionOffsets,
+      reportDataLoss
+    )
 
     // Generate factories based on the offset ranges
     offsetRanges.map { range =>
@@ -242,23 +194,6 @@ private[kafka010] class KafkaMicroBatchStream(
     }
   }
 
-  private def getSortedExecutorList(): Array[String] = {
-
-    def compare(a: ExecutorCacheTaskLocation, b: ExecutorCacheTaskLocation): 
Boolean = {
-      if (a.host == b.host) {
-        a.executorId > b.executorId
-      } else {
-        a.host > b.host
-      }
-    }
-
-    val bm = SparkEnv.get.blockManager
-    bm.master.getPeers(bm.blockManagerId).toArray
-      .map(x => ExecutorCacheTaskLocation(x.host, x.executorId))
-      .sortWith(compare)
-      .map(_.toString)
-  }
-
   /**
    * If `failOnDataLoss` is true, this method will throw an 
`IllegalStateException`.
    * Otherwise, just log a warning.
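
For illustration (not part of the patch): the streaming path now delegates range calculation to KafkaOffsetReader, so `minPartitions` takes effect per micro-batch. A sketch mirroring the new test in KafkaMicroBatchSourceSuite, with placeholder broker address and topic name:

```scala
import org.apache.spark.sql.DataFrame

val query = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker1:9092")   // placeholder broker
  .option("subscribe", "events")                        // placeholder topic with 3 partitions
  .option("startingOffsets", "earliest")
  .option("minPartitions", "6")
  .load()
  .selectExpr("CAST(value AS STRING)")
  .writeStream
  .foreachBatch { (batch: DataFrame, _: Long) =>
    // Micro-batches that carry data should be planned with at least ~6 Spark
    // partitions, since skewed topic partitions are split by the reader.
    assert(batch.rdd.getNumPartitions >= 6)
  }
  .start()

query.processAllAvailable()
query.stop()
```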
diff --git 
a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala
 
b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala
index ead4542..f7183f7 100644
--- 
a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala
+++ 
b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala
@@ -41,14 +41,9 @@ private[kafka010] class KafkaOffsetRangeCalculator(val 
minPartitions: Option[Int
    * Empty ranges (`KafkaOffsetRange.size <= 0`) will be dropped.
    */
   def getRanges(
-      fromOffsets: PartitionOffsetMap,
-      untilOffsets: PartitionOffsetMap,
+      ranges: Seq[KafkaOffsetRange],
       executorLocations: Seq[String] = Seq.empty): Seq[KafkaOffsetRange] = {
-    val partitionsToRead = untilOffsets.keySet.intersect(fromOffsets.keySet)
-
-    val offsetRanges = partitionsToRead.toSeq.map { tp =>
-      KafkaOffsetRange(tp, fromOffsets(tp), untilOffsets(tp), preferredLoc = 
None)
-    }.filter(_.size > 0)
+    val offsetRanges = ranges.filter(_.size > 0)
 
     // If minPartitions not set or there are enough partitions to satisfy 
minPartitions
     if (minPartitions.isEmpty || offsetRanges.size > minPartitions.get) {
@@ -106,6 +101,13 @@ private[kafka010] case class KafkaOffsetRange(
     topicPartition: TopicPartition,
     fromOffset: Long,
     untilOffset: Long,
-    preferredLoc: Option[String]) {
-  lazy val size: Long = untilOffset - fromOffset
+    preferredLoc: Option[String] = None) {
+  def topic: String = topicPartition.topic
+  def partition: Int = topicPartition.partition
+  /**
+   * The estimated size of messages in the range. It may be different than the real number of
+   * messages due to log compaction or transaction metadata. It should not be used to provide
+   * answers directly.
+   */
+  def size: Long = untilOffset - fromOffset
 }
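
For intuition (not part of the patch), a simplified sketch of the kind of split `getRanges` produces when `minPartitions` exceeds the number of non-empty ranges; the real calculator additionally weighs each topic partition by its share of the total size:

```scala
// Split one contiguous offset range into `parts` roughly equal sub-ranges.
// Reproduces the shape of the expectations in KafkaOffsetRangeCalculatorSuite,
// e.g. [1, 21) split three ways -> [1, 7), [7, 14), [14, 21).
def splitRange(from: Long, until: Long, parts: Int): Seq[(Long, Long)] = {
  val size = until - from
  (0 until parts).map { i =>
    (from + size * i / parts, from + size * (i + 1) / parts)
  }
}

splitRange(1L, 21L, 3)  // Seq((1,7), (7,14), (14,21))
splitRange(1L, 5L, 4)   // Seq((1,2), (2,3), (3,4), (4,5))
```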
diff --git 
a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala
 
b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala
index 0179f4d..216e74a 100644
--- 
a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala
+++ 
b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala
@@ -29,7 +29,9 @@ import scala.util.control.NonFatal
 import org.apache.kafka.clients.consumer.{Consumer, ConsumerConfig, 
KafkaConsumer, OffsetAndTimestamp}
 import org.apache.kafka.common.TopicPartition
 
+import org.apache.spark.SparkEnv
 import org.apache.spark.internal.Logging
+import org.apache.spark.scheduler.ExecutorCacheTaskLocation
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
 import org.apache.spark.util.{ThreadUtils, UninterruptibleThread}
 
@@ -91,9 +93,27 @@ private[kafka010] class KafkaOffsetReader(
   private[kafka010] val maxOffsetFetchAttempts =
     readerOptions.getOrElse(KafkaSourceProvider.FETCH_OFFSET_NUM_RETRY, 
"3").toInt
 
+  /**
+   * Number of partitions to read from Kafka. If this value is greater than the number of Kafka
+   * topicPartitions, we will split up the read tasks of the skewed partitions to multiple Spark
+   * tasks. The number of Spark tasks will be *approximately* `numPartitions`. It can be less or
+   * more depending on rounding errors or Kafka partitions that didn't receive any new data.
+   */
+  private val minPartitions =
+    readerOptions.get(KafkaSourceProvider.MIN_PARTITIONS_OPTION_KEY).map(_.toInt)
+
+  private val rangeCalculator = new KafkaOffsetRangeCalculator(minPartitions)
+
   private[kafka010] val offsetFetchAttemptIntervalMs =
     
readerOptions.getOrElse(KafkaSourceProvider.FETCH_OFFSET_RETRY_INTERVAL_MS, 
"1000").toLong
 
+  /**
+   * Whether we should divide Kafka TopicPartitions with a lot of data into smaller Spark tasks.
+   */
+  private def shouldDivvyUpLargePartitions(numTopicPartitions: Int): Boolean = {
+    minPartitions.map(_ > numTopicPartitions).getOrElse(false)
+  }
+
   private def nextGroupId(): String = {
     groupId = driverGroupIdPrefix + "-" + nextId
     nextId += 1
@@ -372,6 +392,142 @@ private[kafka010] class KafkaOffsetReader(
     }
   }
 
+  /**
+   * Return the offset ranges for a Kafka batch query. If `minPartitions` is set, this method may
+   * split partitions to respect it. Since offsets can be early and late binding which are evaluated
+   * on the executors, in order to divvy up the partitions we need to perform some substitutions. We
+   * don't want to send exact offsets to the executors, because data may age out before we can
+   * consume the data. This method makes some approximate splitting, and replaces the special offset
+   * values in the final output.
+   */
+  def getOffsetRangesFromUnresolvedOffsets(
+      startingOffsets: KafkaOffsetRangeLimit,
+      endingOffsets: KafkaOffsetRangeLimit): Seq[KafkaOffsetRange] = {
+    val fromPartitionOffsets = fetchPartitionOffsets(startingOffsets, 
isStartingOffsets = true)
+    val untilPartitionOffsets = fetchPartitionOffsets(endingOffsets, 
isStartingOffsets = false)
+
+    // Obtain topicPartitions in both from and until partition offset, ignoring
+    // topic partitions that were added and/or deleted between the two above 
calls.
+    if (fromPartitionOffsets.keySet != untilPartitionOffsets.keySet) {
+      implicit val topicOrdering: Ordering[TopicPartition] = Ordering.by(t => 
t.topic())
+      val fromTopics = fromPartitionOffsets.keySet.toList.sorted.mkString(",")
+      val untilTopics = 
untilPartitionOffsets.keySet.toList.sorted.mkString(",")
+      throw new IllegalStateException("different topic partitions " +
+        s"for starting offsets topics[${fromTopics}] and " +
+        s"ending offsets topics[${untilTopics}]")
+    }
+
+    // Calculate offset ranges
+    val offsetRangesBase = untilPartitionOffsets.keySet.map { tp =>
+      val fromOffset = fromPartitionOffsets.get(tp).getOrElse {
+        // This should not happen since topicPartitions contains all 
partitions not in
+        // fromPartitionOffsets
+        throw new IllegalStateException(s"$tp doesn't have a from offset")
+      }
+      val untilOffset = untilPartitionOffsets(tp)
+      KafkaOffsetRange(tp, fromOffset, untilOffset, None)
+    }.toSeq
+
+    if (shouldDivvyUpLargePartitions(offsetRangesBase.size)) {
+      val fromOffsetsMap =
+        offsetRangesBase.map(range => (range.topicPartition, 
range.fromOffset)).toMap
+      val untilOffsetsMap =
+        offsetRangesBase.map(range => (range.topicPartition, 
range.untilOffset)).toMap
+
+      // No need to report data loss here
+      val resolvedFromOffsets = fetchSpecificOffsets(fromOffsetsMap, _ => 
()).partitionToOffsets
+      val resolvedUntilOffsets = fetchSpecificOffsets(untilOffsetsMap, _ => 
()).partitionToOffsets
+      val ranges = offsetRangesBase.map(_.topicPartition).map { tp =>
+        KafkaOffsetRange(tp, resolvedFromOffsets(tp), 
resolvedUntilOffsets(tp), preferredLoc = None)
+      }
+      val divvied = rangeCalculator.getRanges(ranges).groupBy(_.topicPartition)
+      divvied.flatMap { case (tp, splitOffsetRanges) =>
+        if (splitOffsetRanges.length == 1) {
+          Seq(KafkaOffsetRange(tp, fromOffsetsMap(tp), untilOffsetsMap(tp), 
None))
+        } else {
+          // the list can't be empty
+          val first = splitOffsetRanges.head.copy(fromOffset = 
fromOffsetsMap(tp))
+          val end = splitOffsetRanges.last.copy(untilOffset = 
untilOffsetsMap(tp))
+          Seq(first) ++ splitOffsetRanges.drop(1).dropRight(1) :+ end
+        }
+      }.toArray.toSeq
+    } else {
+      offsetRangesBase
+    }
+  }
+
+  private def getSortedExecutorList(): Array[String] = {
+    def compare(a: ExecutorCacheTaskLocation, b: ExecutorCacheTaskLocation): 
Boolean = {
+      if (a.host == b.host) {
+        a.executorId > b.executorId
+      } else {
+        a.host > b.host
+      }
+    }
+
+    val bm = SparkEnv.get.blockManager
+    bm.master.getPeers(bm.blockManagerId).toArray
+      .map(x => ExecutorCacheTaskLocation(x.host, x.executorId))
+      .sortWith(compare)
+      .map(_.toString)
+  }
+
+  /**
+   * Return the offset ranges for a Kafka streaming batch. If `minPartitions` is set, this method
+   * may split partitions to respect it. If any data loss issue is detected, `reportDataLoss` will
+   * be called.
+   */
+  def getOffsetRangesFromResolvedOffsets(
+      fromPartitionOffsets: PartitionOffsetMap,
+      untilPartitionOffsets: PartitionOffsetMap,
+      reportDataLoss: String => Unit): Seq[KafkaOffsetRange] = {
+    // Find the new partitions, and get their earliest offsets
+    val newPartitions = 
untilPartitionOffsets.keySet.diff(fromPartitionOffsets.keySet)
+    val newPartitionInitialOffsets = fetchEarliestOffsets(newPartitions.toSeq)
+    if (newPartitionInitialOffsets.keySet != newPartitions) {
+      // We cannot get from offsets for some partitions. It means they got 
deleted.
+      val deletedPartitions = 
newPartitions.diff(newPartitionInitialOffsets.keySet)
+      reportDataLoss(
+        s"Cannot find earliest offsets of ${deletedPartitions}. Some data may 
have been missed")
+    }
+    logInfo(s"Partitions added: $newPartitionInitialOffsets")
+    newPartitionInitialOffsets.filter(_._2 != 0).foreach { case (p, o) =>
+      reportDataLoss(
+        s"Added partition $p starts from $o instead of 0. Some data may have 
been missed")
+    }
+
+    val deletedPartitions = 
fromPartitionOffsets.keySet.diff(untilPartitionOffsets.keySet)
+    if (deletedPartitions.nonEmpty) {
+      val message = if 
(driverKafkaParams.containsKey(ConsumerConfig.GROUP_ID_CONFIG)) {
+        s"$deletedPartitions are gone. 
${KafkaSourceProvider.CUSTOM_GROUP_ID_ERROR_MESSAGE}"
+      } else {
+        s"$deletedPartitions are gone. Some data may have been missed."
+      }
+      reportDataLoss(message)
+    }
+
+    // Use the until partitions to calculate offset ranges to ignore 
partitions that have
+    // been deleted
+    val topicPartitions = untilPartitionOffsets.keySet.filter { tp =>
+      // Ignore partitions that we don't know the from offsets.
+      newPartitionInitialOffsets.contains(tp) || 
fromPartitionOffsets.contains(tp)
+    }.toSeq
+    logDebug("TopicPartitions: " + topicPartitions.mkString(", "))
+
+    val fromOffsets = fromPartitionOffsets ++ newPartitionInitialOffsets
+    val untilOffsets = untilPartitionOffsets
+    val ranges = topicPartitions.map { tp =>
+      val fromOffset = fromOffsets(tp)
+      val untilOffset = untilOffsets(tp)
+      if (untilOffset < fromOffset) {
+        reportDataLoss(s"Partition $tp's offset was changed from " +
+          s"$fromOffset to $untilOffset, some data may have been missed")
+      }
+      KafkaOffsetRange(tp, fromOffset, untilOffset, preferredLoc = None)
+    }
+    rangeCalculator.getRanges(ranges, getSortedExecutorList)
+  }
+
   private def partitionsAssignedToConsumer(
       body: ju.Set[TopicPartition] => Map[TopicPartition, Long],
       fetchingEarliestOffset: Boolean = false)
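
For reference (not part of the patch), a sketch of driving the relocated logic directly, mirroring the new KafkaOffsetReaderSuite further below; these classes are package-private to org.apache.spark.sql.kafka010, and the broker/topic values are placeholders:

```scala
import java.util.UUID

import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap

// Construction mirrors the new KafkaOffsetReaderSuite.
val reader = new KafkaOffsetReader(
  SubscribeStrategy(Seq("events")),
  KafkaSourceProvider.kafkaParamsForDriver(Map("bootstrap.servers" -> "broker1:9092")),
  CaseInsensitiveMap(Map("minPartitions" -> "3")),
  UUID.randomUUID().toString)

// Special offsets are resolved only to pick split points; the EARLIEST/LATEST
// markers are kept in the first and last sub-range so that executors can still
// late-bind the exact boundaries (see getOffsetRangesFromUnresolvedOffsets above).
val ranges = reader.getOffsetRangesFromUnresolvedOffsets(
  EarliestOffsetRangeLimit, LatestOffsetRangeLimit)
```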
diff --git 
a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaRelation.scala
 
b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaRelation.scala
index 61479c9..413a0c4 100644
--- 
a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaRelation.scala
+++ 
b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaRelation.scala
@@ -66,36 +66,12 @@ private[kafka010] class KafkaRelation(
       driverGroupIdPrefix = s"$uniqueGroupId-driver")
 
     // Leverage the KafkaReader to obtain the relevant partition offsets
-    val (fromPartitionOffsets, untilPartitionOffsets) = {
-      try {
-        (kafkaOffsetReader.fetchPartitionOffsets(startingOffsets, 
isStartingOffsets = true),
-          kafkaOffsetReader.fetchPartitionOffsets(endingOffsets, 
isStartingOffsets = false))
-      } finally {
-        kafkaOffsetReader.close()
-      }
+    val offsetRanges: Seq[KafkaOffsetRange] = try {
+      kafkaOffsetReader.getOffsetRangesFromUnresolvedOffsets(startingOffsets, 
endingOffsets)
+    } finally {
+      kafkaOffsetReader.close()
     }
 
-    // Obtain topicPartitions in both from and until partition offset, ignoring
-    // topic partitions that were added and/or deleted between the two above 
calls.
-    if (fromPartitionOffsets.keySet != untilPartitionOffsets.keySet) {
-      implicit val topicOrdering: Ordering[TopicPartition] = Ordering.by(t => 
t.topic())
-      val fromTopics = fromPartitionOffsets.keySet.toList.sorted.mkString(",")
-      val untilTopics = 
untilPartitionOffsets.keySet.toList.sorted.mkString(",")
-      throw new IllegalStateException("different topic partitions " +
-        s"for starting offsets topics[${fromTopics}] and " +
-        s"ending offsets topics[${untilTopics}]")
-    }
-
-    // Calculate offset ranges
-    val offsetRanges = untilPartitionOffsets.keySet.map { tp =>
-      val fromOffset = fromPartitionOffsets.getOrElse(tp,
-        // This should not happen since topicPartitions contains all 
partitions not in
-        // fromPartitionOffsets
-        throw new IllegalStateException(s"$tp doesn't have a from offset"))
-      val untilOffset = untilPartitionOffsets(tp)
-      KafkaSourceRDDOffsetRange(tp, fromOffset, untilOffset, None)
-    }.toArray
-
     logInfo("GetBatch generating RDD of offset range: " +
       offsetRanges.sortBy(_.topicPartition.toString).mkString(", "))
 
diff --git 
a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
 
b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
index e1392b6..f0b3bf1 100644
--- 
a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
+++ 
b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
@@ -210,66 +210,10 @@ private[kafka010] class KafkaSource(
         initialPartitionOffsets
     }
 
-    // Find the new partitions, and get their earliest offsets
-    val newPartitions = 
untilPartitionOffsets.keySet.diff(fromPartitionOffsets.keySet)
-    val newPartitionOffsets = 
kafkaReader.fetchEarliestOffsets(newPartitions.toSeq)
-    if (newPartitionOffsets.keySet != newPartitions) {
-      // We cannot get from offsets for some partitions. It means they got 
deleted.
-      val deletedPartitions = newPartitions.diff(newPartitionOffsets.keySet)
-      reportDataLoss(
-        s"Cannot find earliest offsets of ${deletedPartitions}. Some data may 
have been missed")
-    }
-    logInfo(s"Partitions added: $newPartitionOffsets")
-    newPartitionOffsets.filter(_._2 != 0).foreach { case (p, o) =>
-      reportDataLoss(
-        s"Added partition $p starts from $o instead of 0. Some data may have 
been missed")
-    }
-
-    val deletedPartitions = 
fromPartitionOffsets.keySet.diff(untilPartitionOffsets.keySet)
-    if (deletedPartitions.nonEmpty) {
-      val message = if 
(kafkaReader.driverKafkaParams.containsKey(ConsumerConfig.GROUP_ID_CONFIG)) {
-        s"$deletedPartitions are gone. ${CUSTOM_GROUP_ID_ERROR_MESSAGE}"
-      } else {
-        s"$deletedPartitions are gone. Some data may have been missed."
-      }
-      reportDataLoss(message)
-    }
-
-    // Use the until partitions to calculate offset ranges to ignore 
partitions that have
-    // been deleted
-    val topicPartitions = untilPartitionOffsets.keySet.filter { tp =>
-      // Ignore partitions that we don't know the from offsets.
-      newPartitionOffsets.contains(tp) || fromPartitionOffsets.contains(tp)
-    }.toSeq
-    logDebug("TopicPartitions: " + topicPartitions.mkString(", "))
-
-    val sortedExecutors = getSortedExecutorList(sc)
-    val numExecutors = sortedExecutors.length
-    logDebug("Sorted executors: " + sortedExecutors.mkString(", "))
-
-    // Calculate offset ranges
-    val offsetRanges = topicPartitions.map { tp =>
-      val fromOffset = fromPartitionOffsets.getOrElse(tp, 
newPartitionOffsets.getOrElse(tp, {
-        // This should not happen since newPartitionOffsets contains all 
partitions not in
-        // fromPartitionOffsets
-        throw new IllegalStateException(s"$tp doesn't have a from offset")
-      }))
-      val untilOffset = untilPartitionOffsets(tp)
-      val preferredLoc = if (numExecutors > 0) {
-        // This allows cached KafkaConsumers in the executors to be re-used to 
read the same
-        // partition in every batch.
-        Some(sortedExecutors(Math.floorMod(tp.hashCode, numExecutors)))
-      } else None
-      KafkaSourceRDDOffsetRange(tp, fromOffset, untilOffset, preferredLoc)
-    }.filter { range =>
-      if (range.untilOffset < range.fromOffset) {
-        reportDataLoss(s"Partition ${range.topicPartition}'s offset was 
changed from " +
-          s"${range.fromOffset} to ${range.untilOffset}, some data may have 
been missed")
-        false
-      } else {
-        true
-      }
-    }.toArray
+    val offsetRanges = kafkaReader.getOffsetRangesFromResolvedOffsets(
+      fromPartitionOffsets,
+      untilPartitionOffsets,
+      reportDataLoss)
 
     // Create an RDD that reads from Kafka and get the (key, value) pair as 
byte arrays.
     val rdd = if (includeHeaders) {
diff --git 
a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceRDD.scala
 
b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceRDD.scala
index f1f3871..5475864 100644
--- 
a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceRDD.scala
+++ 
b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceRDD.scala
@@ -20,7 +20,6 @@ package org.apache.spark.sql.kafka010
 import java.{util => ju}
 
 import org.apache.kafka.clients.consumer.ConsumerRecord
-import org.apache.kafka.common.TopicPartition
 
 import org.apache.spark.{Partition, SparkContext, TaskContext}
 import org.apache.spark.rdd.RDD
@@ -28,21 +27,9 @@ import 
org.apache.spark.sql.kafka010.consumer.KafkaDataConsumer
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.util.NextIterator
 
-/** Offset range that one partition of the KafkaSourceRDD has to read */
-private[kafka010] case class KafkaSourceRDDOffsetRange(
-    topicPartition: TopicPartition,
-    fromOffset: Long,
-    untilOffset: Long,
-    preferredLoc: Option[String]) {
-  def topic: String = topicPartition.topic
-  def partition: Int = topicPartition.partition
-  def size: Long = untilOffset - fromOffset
-}
-
-
 /** Partition of the KafkaSourceRDD */
 private[kafka010] case class KafkaSourceRDDPartition(
-  index: Int, offsetRange: KafkaSourceRDDOffsetRange) extends Partition
+  index: Int, offsetRange: KafkaOffsetRange) extends Partition
 
 
 /**
@@ -58,7 +45,7 @@ private[kafka010] case class KafkaSourceRDDPartition(
 private[kafka010] class KafkaSourceRDD(
     sc: SparkContext,
     executorKafkaParams: ju.Map[String, Object],
-    offsetRanges: Seq[KafkaSourceRDDOffsetRange],
+    offsetRanges: Seq[KafkaOffsetRange],
     pollTimeoutMs: Long,
     failOnDataLoss: Boolean)
   extends RDD[ConsumerRecord[Array[Byte], Array[Byte]]](sc, Nil) {
@@ -130,7 +117,7 @@ private[kafka010] class KafkaSourceRDD(
     }
   }
 
-  private def resolveRange(consumer: KafkaDataConsumer, range: 
KafkaSourceRDDOffsetRange) = {
+  private def resolveRange(consumer: KafkaDataConsumer, range: 
KafkaOffsetRange) = {
     if (range.fromOffset < 0 || range.untilOffset < 0) {
       // Late bind the offset range
       val availableOffsetRange = consumer.getAvailableOffsetRange()
@@ -148,7 +135,7 @@ private[kafka010] class KafkaSourceRDD(
       } else {
         range.untilOffset
       }
-      KafkaSourceRDDOffsetRange(range.topicPartition,
+      KafkaOffsetRange(range.topicPartition,
         fromOffset, untilOffset, range.preferredLoc)
     } else {
       range
diff --git 
a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
 
b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
index 3ee59e5..468b21c 100644
--- 
a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
+++ 
b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
@@ -1063,6 +1063,35 @@ abstract class KafkaMicroBatchSourceSuiteBase extends 
KafkaSourceSuiteBase {
   test("SPARK-27494: read kafka record containing null key/values.") {
     testNullableKeyValue(Trigger.ProcessingTime(100))
   }
+
+  test("SPARK-30656: minPartitions") {
+    val topic = newTopic()
+    testUtils.createTopic(topic, partitions = 3)
+    testUtils.sendMessages(topic, (0 to 9).map(_.toString).toArray, Some(0))
+    testUtils.sendMessages(topic, (10 to 19).map(_.toString).toArray, Some(1))
+    testUtils.sendMessages(topic, Array("20"), Some(2))
+
+    val ds = spark
+      .readStream
+      .format("kafka")
+      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+      .option("kafka.metadata.max.age.ms", "1")
+      .option("subscribe", topic)
+      .option("startingOffsets", "earliest")
+      .option("minPartitions", "6")
+      .load()
+      .select($"value".as[String])
+    val q = ds.writeStream.foreachBatch { (batch: Dataset[String], _: Long) =>
+      val partitions = batch.rdd.collectPartitions()
+      assert(partitions.length >= 6)
+      assert(partitions.flatten.toSet === (0 to 20).map(_.toString).toSet): 
Unit
+    }.start()
+    try {
+      q.processAllAvailable()
+    } finally {
+      q.stop()
+    }
+  }
 }
 
 
diff --git 
a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculatorSuite.scala
 
b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculatorSuite.scala
index 2374a81..5d010cd 100644
--- 
a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculatorSuite.scala
+++ 
b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculatorSuite.scala
@@ -34,31 +34,16 @@ class KafkaOffsetRangeCalculatorSuite extends SparkFunSuite 
{
     }
   }
 
-
   test("with no minPartition: N TopicPartitions to N offset ranges") {
     val calc = KafkaOffsetRangeCalculator(CaseInsensitiveStringMap.empty())
     assert(
       calc.getRanges(
-        fromOffsets = Map(tp1 -> 1),
-        untilOffsets = Map(tp1 -> 2)) ==
-      Seq(KafkaOffsetRange(tp1, 1, 2, None)))
-
-    assert(
-      calc.getRanges(
-        fromOffsets = Map(tp1 -> 1),
-        untilOffsets = Map(tp1 -> 2, tp2 -> 1), Seq.empty) ==
+        Seq(KafkaOffsetRange(tp1, 1, 2))) ==
       Seq(KafkaOffsetRange(tp1, 1, 2, None)))
 
     assert(
       calc.getRanges(
-        fromOffsets = Map(tp1 -> 1, tp2 -> 1),
-        untilOffsets = Map(tp1 -> 2)) ==
-      Seq(KafkaOffsetRange(tp1, 1, 2, None)))
-
-    assert(
-      calc.getRanges(
-        fromOffsets = Map(tp1 -> 1, tp2 -> 1),
-        untilOffsets = Map(tp1 -> 2),
+        Seq(KafkaOffsetRange(tp1, 1, 2)),
         executorLocations = Seq("location")) ==
       Seq(KafkaOffsetRange(tp1, 1, 2, Some("location"))))
   }
@@ -67,16 +52,19 @@ class KafkaOffsetRangeCalculatorSuite extends SparkFunSuite 
{
     val calc = KafkaOffsetRangeCalculator(CaseInsensitiveStringMap.empty())
     assert(
       calc.getRanges(
-        fromOffsets = Map(tp1 -> 1, tp2 -> 1),
-        untilOffsets = Map(tp1 -> 2, tp2 -> 1)) ==
+        Seq(
+          KafkaOffsetRange(tp1, 1, 2),
+          KafkaOffsetRange(tp2, 1, 1))) ===
       Seq(KafkaOffsetRange(tp1, 1, 2, None)))
   }
 
   testWithMinPartitions("N TopicPartitions to N offset ranges", 3) { calc =>
     assert(
       calc.getRanges(
-        fromOffsets = Map(tp1 -> 1, tp2 -> 1, tp3 -> 1),
-        untilOffsets = Map(tp1 -> 2, tp2 -> 2, tp3 -> 2)) ==
+        Seq(
+          KafkaOffsetRange(tp1, 1, 2),
+          KafkaOffsetRange(tp2, 1, 2),
+          KafkaOffsetRange(tp3, 1, 2))) ===
       Seq(
         KafkaOffsetRange(tp1, 1, 2, None),
         KafkaOffsetRange(tp2, 1, 2, None),
@@ -86,18 +74,16 @@ class KafkaOffsetRangeCalculatorSuite extends SparkFunSuite 
{
   testWithMinPartitions("1 TopicPartition to N offset ranges", 4) { calc =>
     assert(
       calc.getRanges(
-        fromOffsets = Map(tp1 -> 1),
-        untilOffsets = Map(tp1 -> 5)) ==
-      Seq(
-        KafkaOffsetRange(tp1, 1, 2, None),
-        KafkaOffsetRange(tp1, 2, 3, None),
-        KafkaOffsetRange(tp1, 3, 4, None),
-        KafkaOffsetRange(tp1, 4, 5, None)))
+        Seq(KafkaOffsetRange(tp1, 1, 5))) ==
+        Seq(
+          KafkaOffsetRange(tp1, 1, 2, None),
+          KafkaOffsetRange(tp1, 2, 3, None),
+          KafkaOffsetRange(tp1, 3, 4, None),
+          KafkaOffsetRange(tp1, 4, 5, None)))
 
     assert(
       calc.getRanges(
-        fromOffsets = Map(tp1 -> 1),
-        untilOffsets = Map(tp1 -> 5),
+        Seq(KafkaOffsetRange(tp1, 1, 5)),
         executorLocations = Seq("location")) ==
         Seq(
           KafkaOffsetRange(tp1, 1, 2, None),
@@ -109,8 +95,9 @@ class KafkaOffsetRangeCalculatorSuite extends SparkFunSuite {
   testWithMinPartitions("N skewed TopicPartitions to M offset ranges", 3) { 
calc =>
     assert(
       calc.getRanges(
-        fromOffsets = Map(tp1 -> 1, tp2 -> 1),
-        untilOffsets = Map(tp1 -> 5, tp2 -> 21)) ==
+        Seq(
+          KafkaOffsetRange(tp1, 1, 5),
+          KafkaOffsetRange(tp2, 1, 21))) ===
         Seq(
           KafkaOffsetRange(tp1, 1, 5, None),
           KafkaOffsetRange(tp2, 1, 7, None),
@@ -118,11 +105,51 @@ class KafkaOffsetRangeCalculatorSuite extends 
SparkFunSuite {
           KafkaOffsetRange(tp2, 14, 21, None)))
   }
 
+  testWithMinPartitions("SPARK-30656: ignore empty ranges and split the rest", 
4) { calc =>
+    assert(
+      calc.getRanges(
+        Seq(
+          KafkaOffsetRange(tp1, 1, 1),
+          KafkaOffsetRange(tp2, 1, 21))) ===
+        Seq(
+          KafkaOffsetRange(tp2, 1, 6, None),
+          KafkaOffsetRange(tp2, 6, 11, None),
+          KafkaOffsetRange(tp2, 11, 16, None),
+          KafkaOffsetRange(tp2, 16, 21, None)))
+  }
+
+  testWithMinPartitions(
+      "SPARK-30656: N very skewed TopicPartitions to M offset ranges",
+      3) { calc =>
+    assert(
+      calc.getRanges(
+        Seq(
+          KafkaOffsetRange(tp1, 1, 2),
+          KafkaOffsetRange(tp2, 1, 1001))) ===
+        Seq(
+          KafkaOffsetRange(tp1, 1, 2, None),
+          KafkaOffsetRange(tp2, 1, 334, None),
+          KafkaOffsetRange(tp2, 334, 667, None),
+          KafkaOffsetRange(tp2, 667, 1001, None)))
+  }
+
+  testWithMinPartitions(
+      "SPARK-30656: minPartitions less than the length of topic partitions",
+      1) { calc =>
+    assert(
+      calc.getRanges(
+        Seq(
+          KafkaOffsetRange(tp1, 1, 5),
+          KafkaOffsetRange(tp2, 1, 21))) ===
+        Seq(
+          KafkaOffsetRange(tp1, 1, 5, None),
+          KafkaOffsetRange(tp2, 1, 21, None)))
+  }
+
   testWithMinPartitions("range inexact multiple of minPartitions", 3) { calc =>
     assert(
       calc.getRanges(
-        fromOffsets = Map(tp1 -> 1),
-        untilOffsets = Map(tp1 -> 11)) ==
+        Seq(KafkaOffsetRange(tp1, 1, 11))) ==
         Seq(
           KafkaOffsetRange(tp1, 1, 4, None),
           KafkaOffsetRange(tp1, 4, 7, None),
@@ -132,8 +159,10 @@ class KafkaOffsetRangeCalculatorSuite extends 
SparkFunSuite {
   testWithMinPartitions("empty ranges ignored", 3) { calc =>
     assert(
       calc.getRanges(
-        fromOffsets = Map(tp1 -> 1, tp2 -> 1, tp3 -> 1),
-        untilOffsets = Map(tp1 -> 5, tp2 -> 21, tp3 -> 1)) ==
+        Seq(
+          KafkaOffsetRange(tp1, 1, 5),
+          KafkaOffsetRange(tp2, 1, 21),
+          KafkaOffsetRange(tp3, 1, 1))) ==
         Seq(
           KafkaOffsetRange(tp1, 1, 5, None),
           KafkaOffsetRange(tp2, 1, 7, None),
@@ -144,8 +173,10 @@ class KafkaOffsetRangeCalculatorSuite extends 
SparkFunSuite {
   testWithMinPartitions("SPARK-28489: never drop offsets", 6) { calc =>
     assert(
       calc.getRanges(
-        fromOffsets = Map(tp1 -> 0, tp2 -> 0, tp3 -> 0),
-        untilOffsets = Map(tp1 -> 10, tp2 -> 10, tp3 -> 1)) ==
+        Seq(
+          KafkaOffsetRange(tp1, 0, 10),
+          KafkaOffsetRange(tp2, 0, 10),
+          KafkaOffsetRange(tp3, 0, 1))) ==
         Seq(
           KafkaOffsetRange(tp1, 0, 3, None),
           KafkaOffsetRange(tp1, 3, 6, None),
diff --git 
a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderSuite.scala
 
b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderSuite.scala
new file mode 100644
index 0000000..ad22a56
--- /dev/null
+++ 
b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderSuite.scala
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.util.UUID
+import java.util.concurrent.atomic.AtomicInteger
+
+import org.apache.kafka.common.TopicPartition
+
+import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
+import org.apache.spark.sql.kafka010.KafkaOffsetRangeLimit.{EARLIEST, LATEST}
+import org.apache.spark.sql.test.SharedSparkSession
+
+class KafkaOffsetReaderSuite extends QueryTest with SharedSparkSession with 
KafkaTest {
+
+  protected var testUtils: KafkaTestUtils = _
+
+  private val topicId = new AtomicInteger(0)
+
+  private def newTopic(): String = s"topic-${topicId.getAndIncrement()}"
+
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+    testUtils = new KafkaTestUtils
+    testUtils.setup()
+  }
+
+  override def afterAll(): Unit = {
+    try {
+      if (testUtils != null) {
+        testUtils.teardown()
+        testUtils = null
+      }
+    } finally {
+      super.afterAll()
+    }
+  }
+
+  private def createKafkaReader(topic: String, minPartitions: Option[Int]): 
KafkaOffsetReader = {
+    new KafkaOffsetReader(
+      SubscribeStrategy(Seq(topic)),
+      org.apache.spark.sql.kafka010.KafkaSourceProvider.kafkaParamsForDriver(
+        Map(
+        "bootstrap.servers" ->
+         testUtils.brokerAddress
+      )),
+      CaseInsensitiveMap(
+        minPartitions.map(m => Map("minPartitions" -> 
m.toString)).getOrElse(Map.empty)),
+      UUID.randomUUID().toString
+    )
+  }
+
+  test("SPARK-30656: getOffsetRangesFromUnresolvedOffsets - using specific 
offsets") {
+    val topic = newTopic()
+    testUtils.createTopic(topic, partitions = 1)
+    testUtils.sendMessages(topic, (0 until 10).map(_.toString).toArray, 
Some(0))
+    val tp = new TopicPartition(topic, 0)
+    val reader = createKafkaReader(topic, minPartitions = Some(3))
+    val startingOffsets = SpecificOffsetRangeLimit(Map(tp -> 1))
+    val endingOffsets = SpecificOffsetRangeLimit(Map(tp -> 4))
+    val offsetRanges = 
reader.getOffsetRangesFromUnresolvedOffsets(startingOffsets, endingOffsets)
+    assert(offsetRanges === Seq(
+      KafkaOffsetRange(tp, 1, 2, None),
+      KafkaOffsetRange(tp, 2, 3, None),
+      KafkaOffsetRange(tp, 3, 4, None)))
+  }
+
+  test("SPARK-30656: getOffsetRangesFromUnresolvedOffsets - using special 
offsets") {
+    val topic = newTopic()
+    testUtils.createTopic(topic, partitions = 1)
+    testUtils.sendMessages(topic, (0 until 4).map(_.toString).toArray, Some(0))
+    val tp = new TopicPartition(topic, 0)
+    val reader = createKafkaReader(topic, minPartitions = Some(3))
+    val startingOffsets = EarliestOffsetRangeLimit
+    val endingOffsets = LatestOffsetRangeLimit
+    val offsetRanges = 
reader.getOffsetRangesFromUnresolvedOffsets(startingOffsets, endingOffsets)
+    assert(offsetRanges === Seq(
+      KafkaOffsetRange(tp, EARLIEST, 1, None),
+      KafkaOffsetRange(tp, 1, 2, None),
+      KafkaOffsetRange(tp, 2, LATEST, None)))
+  }
+
+  test("SPARK-30656: getOffsetRangesFromUnresolvedOffsets - multiple topic 
partitions") {
+    val topic = newTopic()
+    testUtils.createTopic(topic, partitions = 2)
+    testUtils.sendMessages(topic, (0 until 100).map(_.toString).toArray, 
Some(0))
+    testUtils.sendMessages(topic, (0 until 4).map(_.toString).toArray, Some(1))
+    val tp1 = new TopicPartition(topic, 0)
+    val tp2 = new TopicPartition(topic, 1)
+    val reader = createKafkaReader(topic, minPartitions = Some(3))
+
+    val startingOffsets = SpecificOffsetRangeLimit(Map(tp1 -> EARLIEST, tp2 -> 
EARLIEST))
+    val endingOffsets = SpecificOffsetRangeLimit(Map(tp1 -> LATEST, tp2 -> 3))
+    val offsetRanges = 
reader.getOffsetRangesFromUnresolvedOffsets(startingOffsets, endingOffsets)
+    assert(offsetRanges === Seq(
+      KafkaOffsetRange(tp2, EARLIEST, 3, None),
+      KafkaOffsetRange(tp1, EARLIEST, 33, None),
+      KafkaOffsetRange(tp1, 33, 66, None),
+      KafkaOffsetRange(tp1, 66, LATEST, None)))
+  }
+
+  test("SPARK-30656: getOffsetRangesFromResolvedOffsets") {
+    val topic = newTopic()
+    testUtils.createTopic(topic, partitions = 2)
+    testUtils.sendMessages(topic, (0 until 100).map(_.toString).toArray, 
Some(0))
+    testUtils.sendMessages(topic, (0 until 4).map(_.toString).toArray, Some(1))
+    val tp1 = new TopicPartition(topic, 0)
+    val tp2 = new TopicPartition(topic, 1)
+    val reader = createKafkaReader(topic, minPartitions = Some(3))
+
+    val fromPartitionOffsets = Map(tp1 -> 0L, tp2 -> 0L)
+    val untilPartitionOffsets = Map(tp1 -> 100L, tp2 -> 3L)
+    val offsetRanges = reader.getOffsetRangesFromResolvedOffsets(
+      fromPartitionOffsets,
+      untilPartitionOffsets,
+      _ => {})
+    assert(offsetRanges === Seq(
+      KafkaOffsetRange(tp1, 0, 33, None),
+      KafkaOffsetRange(tp1, 33, 66, None),
+      KafkaOffsetRange(tp1, 66, 100, None),
+      KafkaOffsetRange(tp2, 0, 3, None)))
+  }
+}
diff --git 
a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaRelationSuite.scala
 
b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaRelationSuite.scala
index 2c022c1..32d0561 100644
--- 
a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaRelationSuite.scala
+++ 
b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaRelationSuite.scala
@@ -597,6 +597,28 @@ abstract class KafkaRelationSuiteBase extends QueryTest 
with SharedSparkSession
       checkAnswer(df, (1 to 15).map(_.toString).toDF)
     }
   }
+
+  test("SPARK-30656: minPartitions") {
+    val topic = newTopic()
+    testUtils.createTopic(topic, partitions = 3)
+    testUtils.sendMessages(topic, (0 to 9).map(_.toString).toArray, Some(0))
+    testUtils.sendMessages(topic, (10 to 19).map(_.toString).toArray, Some(1))
+    testUtils.sendMessages(topic, Array("20"), Some(2))
+
+    // Implicit offset values, should default to earliest and latest
+    val df = createDF(topic, Map("minPartitions" -> "6"))
+    val rdd = df.rdd
+    val partitions = rdd.collectPartitions()
+    assert(partitions.length >= 6)
+    assert(partitions.flatMap(_.map(_.getString(0))).toSet === (0 to 
20).map(_.toString).toSet)
+
+    // Because of late binding, reused `rdd` and `df` should see the new data.
+    testUtils.sendMessages(topic, (21 to 30).map(_.toString).toArray)
+    assert(rdd.collectPartitions().flatMap(_.map(_.getString(0))).toSet
+      === (0 to 30).map(_.toString).toSet)
+    assert(df.rdd.collectPartitions().flatMap(_.map(_.getString(0))).toSet
+      === (0 to 30).map(_.toString).toSet)
+  }
 }
 
 class KafkaRelationSuiteV1 extends KafkaRelationSuiteBase {

