zsxwing commented on a change in pull request #25237: [SPARK-28489][SS]Fix a bug that KafkaOffsetRangeCalculator.getRanges may drop offsets
URL: https://github.com/apache/spark/pull/25237#discussion_r306583646
 
 

 ##########
 File path: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala
 ##########
 @@ -61,19 +61,23 @@ private[kafka010] class KafkaOffsetRangeCalculator(val minPartitions: Option[Int
 
       // Splits offset ranges with relatively large amount of data to smaller ones.
       val totalSize = offsetRanges.map(_.size).sum
-      val idealRangeSize = totalSize.toDouble / minPartitions.get
-
       offsetRanges.flatMap { range =>
-        // Split the current range into subranges as close to the ideal range size
-        val numSplitsInRange = math.round(range.size.toDouble / idealRangeSize).toInt
-
-        (0 until numSplitsInRange).map { i =>
-          val splitStart = range.fromOffset + range.size * (i.toDouble / numSplitsInRange)
-          val splitEnd = range.fromOffset + range.size * ((i.toDouble + 1) / numSplitsInRange)
-          KafkaOffsetRange(
-            range.topicPartition, splitStart.toLong, splitEnd.toLong, preferredLoc = None)
+        val tp = range.topicPartition
+        val size = range.size
+        // number of partitions to divvy up this topic partition to
+        val parts = math.max(math.round(size.toDouble / totalSize * minPartitions.get), 1).toInt
+        var remaining = size
+        var startOffset = range.fromOffset
+        (0 until parts).map { part =>
+          // Fine to do integer division. Last partition will consume all the round off errors
+          val thisPartition = remaining / (parts - part)
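The following comparison shows one way the removed code could lose offsets. It is not necessarily the only failure mode behind SPARK-28489. In the old formula, `math.round(range.size.toDouble / idealRangeSize)` rounds to 0 for a range much smaller than the ideal size. `(0 until 0).map` then yields nothing, so that topic partition's offsets disappear. The new formula clamps the count to at least 1. This is a standalone sketch: `SplitCountComparison`, `oldSplits`, and `newSplits` are hypothetical names, and plain `Long` sizes stand in for `KafkaOffsetRange`.

```scala
// Hypothetical standalone comparison of the two split-count formulas in the diff.
// Range sizes are plain Longs; Kafka types and preferred locations are omitted.
object SplitCountComparison {
  // Removed formula: round(size / idealRangeSize), which can be 0 for a small range.
  def oldSplits(sizes: Seq[Long], minPartitions: Int): Seq[Int] = {
    val totalSize = sizes.sum
    val idealRangeSize = totalSize.toDouble / minPartitions
    sizes.map(size => math.round(size.toDouble / idealRangeSize).toInt)
  }

  // Added formula: proportional share of minPartitions, but never fewer than 1 part.
  def newSplits(sizes: Seq[Long], minPartitions: Int): Seq[Int] = {
    val totalSize = sizes.sum
    sizes.map(size => math.max(math.round(size.toDouble / totalSize * minPartitions), 1).toInt)
  }

  def main(args: Array[String]): Unit = {
    val sizes = Seq(1L, 1000L) // one tiny topic partition, one large one
    println(oldSplits(sizes, 2)) // List(0, 2): the 1-offset range gets 0 splits and is dropped
    println(newSplits(sizes, 2)) // List(1, 2): every range keeps at least one split
  }
}
```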
 
 Review comment:
   `thisPartition` will be the same as `remaining` for the last part, because `parts - part` is 1 there. This ensures that the last KafkaOffsetRange always ends at `range.untilOffset`.
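The review point can be checked with a small standalone sketch. The quoted hunk stops at the `thisPartition` line, so the rest of the loop body here is an assumption. Each part is assumed to cover `thisPartition` offsets starting at `startOffset`, after which `startOffset` advances and `remaining` shrinks. `SimpleRange`, `EvenSplitSketch`, and `split` are hypothetical stand-ins, not Spark's API.

```scala
// Hypothetical stand-in for KafkaOffsetRange (topic partition and preferredLoc omitted).
final case class SimpleRange(fromOffset: Long, untilOffset: Long) {
  def size: Long = untilOffset - fromOffset
}

object EvenSplitSketch {
  // Splits one range into `parts` contiguous pieces using integer division.
  // ASSUMPTION: the loop continuation below is not shown in the quoted hunk.
  def split(range: SimpleRange, parts: Int): Seq[SimpleRange] = {
    var remaining = range.size
    var startOffset = range.fromOffset
    (0 until parts).map { part =>
      // For the last part, parts - part == 1, so thisPartition == remaining.
      val thisPartition = remaining / (parts - part)
      remaining -= thisPartition
      val endOffset = startOffset + thisPartition
      val piece = SimpleRange(startOffset, endOffset)
      startOffset = endOffset
      piece
    }
  }

  def main(args: Array[String]): Unit = {
    val pieces = split(SimpleRange(100L, 110L), 3)
    // Prints "[100, 103) [103, 106) [106, 110)": the last piece absorbs the round-off.
    println(pieces.map(r => s"[${r.fromOffset}, ${r.untilOffset})").mkString(" "))
  }
}
```

Because each step divides what is left rather than the original size, the pieces always sum to `range.size` and the last one ends exactly at `untilOffset`. In this sketch, if `parts` exceeds the range size, the leading pieces come out empty, but the final piece still ends at `untilOffset`.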

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services
