SubhamSinghal commented on code in PR #47927:
URL: https://github.com/apache/spark/pull/47927#discussion_r1778179309


##########
connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala:
##########
@@ -47,13 +48,15 @@ private[kafka010] class KafkaOffsetRangeCalculator(val minPartitions: Option[Int
     val offsetRanges = ranges.filter(_.size > 0)
 
     // If minPartitions not set or there are enough partitions to satisfy minPartitions
-    if (minPartitions.isEmpty || offsetRanges.size >= minPartitions.get) {
+    // and maxRecordsPerPartition is empty
+    if ((minPartitions.isEmpty || offsetRanges.size >= minPartitions.get)
+        && maxRecordsPerPartition.isEmpty) {
       // Assign preferred executor locations to each range such that the same topic-partition is
       // preferentially read from the same executor and the KafkaConsumer can be reused.
       offsetRanges.map { range =>
         range.copy(preferredLoc = getLocation(range.topicPartition, executorLocations))
       }
-    } else {
+    } else if (minPartitions.isDefined && minPartitions.get > offsetRanges.size) {

Review Comment:
   Thanks for highlighting this issue. I first divide the partitions based on **maxRecordsPerPartition**, so that each partition holds a uniform amount of data, and then pass those partitions on for a further split based on the **minPartitions** config.
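   To make the two-stage ordering concrete, here is a minimal standalone sketch of the first stage, the max-records split. It is not the actual Spark implementation: `OffsetRange` and `splitByMaxRecords` are simplified stand-ins for illustration, and the second stage (splitting further to reach `minPartitions`) would consume the output of this one.

   ```scala
   // Simplified stand-in for Kafka's offset range (hypothetical, for illustration).
   case class OffsetRange(fromOffset: Long, untilOffset: Long) {
     def size: Long = untilOffset - fromOffset
   }

   // Stage 1: split a range so that no piece exceeds maxRecords offsets.
   // Stage 2 (not shown) would split the resulting pieces again to satisfy minPartitions.
   def splitByMaxRecords(range: OffsetRange, maxRecords: Long): Seq[OffsetRange] = {
     // Number of pieces needed so each holds at most maxRecords offsets.
     val parts = math.ceil(range.size.toDouble / maxRecords).toInt
     (0 until parts).map { i =>
       val from = range.fromOffset + i * maxRecords
       OffsetRange(from, math.min(from + maxRecords, range.untilOffset))
     }
   }
   ```

   For example, a range covering offsets 0 to 10 with `maxRecords = 4` splits into `[0, 4)`, `[4, 8)`, and `[8, 10)`, so every piece is bounded and roughly uniform before the `minPartitions` pass.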



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

