SubhamSinghal commented on code in PR #47927:
URL: https://github.com/apache/spark/pull/47927#discussion_r1778179309
##########
connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala:
##########
@@ -47,13 +48,15 @@ private[kafka010] class KafkaOffsetRangeCalculator(val minPartitions: Option[Int
     val offsetRanges = ranges.filter(_.size > 0)

     // If minPartitions not set or there are enough partitions to satisfy minPartitions
-    if (minPartitions.isEmpty || offsetRanges.size >= minPartitions.get) {
+    // and maxBytesPerPartition is empty
+    if ((minPartitions.isEmpty || offsetRanges.size >= minPartitions.get)
+      && maxRecordsPerPartition.isEmpty) {
       // Assign preferred executor locations to each range such that the same topic-partition is
       // preferentially read from the same executor and the KafkaConsumer can be reused.
       offsetRanges.map { range =>
         range.copy(preferredLoc = getLocation(range.topicPartition, executorLocations))
       }
-    } else {
+    } else if (minPartitions.isDefined && minPartitions.get > offsetRanges.size) {

Review Comment:
   Thanks for highlighting this issue. I first divide the partitions based on **maxRecordsPerPartition** so that each partition holds a uniform amount of data, and then pass those partitions on for a further split based on the **minPartitions** config.

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
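The two-stage split described in the review comment can be sketched as follows. This is a hypothetical simplification, not the actual `KafkaOffsetRangeCalculator` implementation: `Range`, `capByMaxRecords`, and `ensureMinPartitions` are invented names, the real code tracks topic-partitions and preferred executor locations, and its splitting strategy may differ. The sketch only illustrates the order of the two stages: cap range sizes first, then top up the partition count.

```scala
object OffsetRangeSplitSketch {
  // Simplified stand-in for KafkaOffsetRange (topic-partition omitted).
  case class Range(from: Long, until: Long) { def size: Long = until - from }

  // Split one range into `parts` contiguous pieces of near-equal size.
  def splitEvenly(r: Range, parts: Int): Seq[Range] = {
    val bounds = (0 to parts).map(i => r.from + r.size * i / parts)
    bounds.sliding(2).map { case Seq(a, b) => Range(a, b) }.toSeq
  }

  // Stage 1: ensure no range exceeds maxRecords records, so the
  // resulting partitions carry a roughly uniform amount of data.
  def capByMaxRecords(ranges: Seq[Range], maxRecords: Long): Seq[Range] =
    ranges.flatMap { r =>
      val parts = math.max(1L, (r.size + maxRecords - 1) / maxRecords).toInt
      splitEvenly(r, parts)
    }

  // Stage 2: keep halving the largest range until minPartitions is reached
  // (or no range can be split further).
  def ensureMinPartitions(ranges: Seq[Range], minPartitions: Int): Seq[Range] = {
    var current = ranges
    while (current.size < minPartitions && current.exists(_.size > 1)) {
      val largest = current.maxBy(_.size)
      val idx = current.indexOf(largest)
      current = current.patch(idx, splitEvenly(largest, 2), 1)
    }
    current
  }

  // maxRecordsPerPartition is applied first, then minPartitions, mirroring
  // the order described in the review comment.
  def calculate(ranges: Seq[Range],
                maxRecords: Option[Long],
                minPartitions: Option[Int]): Seq[Range] = {
    val sized = maxRecords.map(capByMaxRecords(ranges, _)).getOrElse(ranges)
    minPartitions.filter(_ > sized.size)
      .map(ensureMinPartitions(sized, _))
      .getOrElse(sized)
  }
}
```

For example, a single range of 100 records with `maxRecords = Some(30)` is cut into 4 ranges of 25 records each; with `minPartitions = Some(3)` and no record cap, a 10-record range is halved repeatedly until 3 ranges exist.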