zsxwing commented on a change in pull request #25237: [SPARK-28489][SS]Fix a bug that KafkaOffsetRangeCalculator.getRanges may drop offsets URL: https://github.com/apache/spark/pull/25237#discussion_r306583646
##########
File path: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala
##########
@@ -61,19 +61,23 @@ private[kafka010] class KafkaOffsetRangeCalculator(val minPartitions: Option[Int
       // Splits offset ranges with relatively large amount of data to smaller ones.
       val totalSize = offsetRanges.map(_.size).sum
-      val idealRangeSize = totalSize.toDouble / minPartitions.get
-      offsetRanges.flatMap { range =>
-        // Split the current range into subranges as close to the ideal range size
-        val numSplitsInRange = math.round(range.size.toDouble / idealRangeSize).toInt
-
-        (0 until numSplitsInRange).map { i =>
-          val splitStart = range.fromOffset + range.size * (i.toDouble / numSplitsInRange)
-          val splitEnd = range.fromOffset + range.size * ((i.toDouble + 1) / numSplitsInRange)
-          KafkaOffsetRange(
-            range.topicPartition, splitStart.toLong, splitEnd.toLong, preferredLoc = None)
+        val tp = range.topicPartition
+        val size = range.size
+        // number of partitions to divvy up this topic partition to
+        val parts = math.max(math.round(size.toDouble / totalSize * minPartitions.get), 1).toInt
+        var remaining = size
+        var startOffset = range.fromOffset
+        (0 until parts).map { part =>
+          // Fine to do integer division. Last partition will consume all the round off errors
+          val thisPartition = remaining / (parts - part)

Review comment:
`thisPartition` will be the same as `remaining` for the last part, because `parts - part` is 1 there. This ensures we always get a KafkaOffsetRange ending at `range.untilOffset`.
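The reviewer's point can be checked with a minimal standalone sketch. This is not Spark's code: `OffsetRange`, `oldNumSplits`, `numParts`, and `split` are hypothetical names introduced here, and `OffsetRange` is a simplified stand-in for `KafkaOffsetRange` with no topic partition or preferred location. The sketch mirrors the diff's integer-division loop, where the last step divides by 1 so the final sub-range ends exactly at the range's end. It also contrasts the removed `numSplitsInRange` expression, which can round to 0 for a small range (leaving that range with no sub-ranges), with the new `parts` expression, whose `math.max(..., 1)` guard keeps at least one.

```scala
// Hypothetical, simplified stand-in for KafkaOffsetRange: a half-open range [from, until)
// with no topic partition or preferred location.
case class OffsetRange(from: Long, until: Long) {
  def size: Long = until - from
}

// Pre-fix split count, mirroring the removed numSplitsInRange expression.
// It rounds to 0 when size is under half of idealRangeSize, so that range yields no sub-ranges.
def oldNumSplits(size: Long, totalSize: Long, minPartitions: Int): Int = {
  val idealRangeSize = totalSize.toDouble / minPartitions
  math.round(size.toDouble / idealRangeSize).toInt
}

// Post-fix split count, mirroring the new `parts` expression: never less than 1.
def numParts(size: Long, totalSize: Long, minPartitions: Int): Int =
  math.max(math.round(size.toDouble / totalSize * minPartitions), 1).toInt

// Splits `range` into `parts` contiguous sub-ranges using Long integer division.
// At step `part` the divisor is (parts - part); on the last step it is 1, so
// thisPartition == remaining and the last sub-range ends exactly at range.until.
def split(range: OffsetRange, parts: Int): Seq[OffsetRange] = {
  var remaining = range.size
  var startOffset = range.from
  (0 until parts).map { part =>
    val thisPartition = remaining / (parts - part)
    remaining -= thisPartition
    val endOffset = startOffset + thisPartition
    val sub = OffsetRange(startOffset, endOffset)
    startOffset = endOffset
    sub
  }
}

// Usage: a 1-offset range next to a 100-offset range, with minPartitions = 2 (totalSize = 101).
val small = OffsetRange(500L, 501L)
println(oldNumSplits(small.size, 101L, 2)) // 0: the old expression gives this range no sub-ranges
println(numParts(small.size, 101L, 2))     // 1: the max(..., 1) guard keeps it
println(split(OffsetRange(0L, 10L), 3).map(r => s"[${r.from},${r.until})").mkString(", "))
// prints [0,3), [3,6), [6,10)
```

Keeping the boundary arithmetic in `Long` avoids `Double` truncation: each sub-range starts where the previous one ended, and the sub-range sizes sum exactly to the original range size.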