This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 7142e04639c  [SPARK-44248][SS][SQL][KAFKA] Add preferred location in kafka source v2
7142e04639c is described below

commit 7142e04639c1481e41ad499e657b2c62120fe763
Author: Siying Dong <siying.d...@databricks.com>
AuthorDate: Fri Jun 30 06:50:24 2023 +0900

    [SPARK-44248][SS][SQL][KAFKA] Add preferred location in kafka source v2

    ### What changes were proposed in this pull request?

    In KafkaBatchInputPartition, which is used by the Kafka v2 source, preferredLocations() now returns the location that was already pre-calculated.

    ### Why are the changes needed?

    The DSv2 Kafka streaming source misses setting the preferred location, which can defeat the purpose of caching Kafka consumers (connections) and fetched data, since tasks are not steered to the executors holding those caches. For DSv1, the preferred location is set on the RDD; in DSv2 this information was not returned.

    ### Does this PR introduce _any_ user-facing change?

    No.

    ### How was this patch tested?

    Manual verification.

    Closes #41790 from siying/kafkav2loc.
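The core pattern of the change — mapping an optional pre-computed host to the Array[String] that InputPartition.preferredLocations() must return — can be sketched in isolation. This is a minimal illustration only; PreferredLocSketch and its helper method are hypothetical stand-ins, not Spark API:

```scala
// Minimal sketch of the change's core pattern: an Option[String] holding a
// pre-calculated host is converted into the Array[String] shape that a
// preferredLocations() override returns. `PreferredLocSketch` is a
// hypothetical stand-in, not part of Spark.
object PreferredLocSketch {
  // Some(host) when a location was pre-assigned during planning, None otherwise.
  def preferredLocations(preferredLoc: Option[String]): Array[String] =
    preferredLoc.map(Array(_)).getOrElse(Array.empty)

  def main(args: Array[String]): Unit = {
    // With a pre-computed location, that single host is advertised.
    println(preferredLocations(Some("exec-1.example.com")).mkString(","))
    // Without one, an empty array signals "no placement preference".
    println(preferredLocations(None).length)
  }
}
```

An empty array (rather than null) is the conventional way to express "no preference" to the scheduler, which then places the task wherever resources are available.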
Authored-by: Siying Dong <siying.d...@databricks.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
---
 .../org/apache/spark/sql/kafka010/KafkaBatchPartitionReader.scala | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaBatchPartitionReader.scala b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaBatchPartitionReader.scala
index 508f5c7036b..97c8592d1da 100644
--- a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaBatchPartitionReader.scala
+++ b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaBatchPartitionReader.scala
@@ -34,7 +34,11 @@ private[kafka010] case class KafkaBatchInputPartition(
     executorKafkaParams: ju.Map[String, Object],
     pollTimeoutMs: Long,
     failOnDataLoss: Boolean,
-    includeHeaders: Boolean) extends InputPartition
+    includeHeaders: Boolean) extends InputPartition {
+  override def preferredLocations(): Array[String] = {
+    offsetRange.preferredLoc.map(Array(_)).getOrElse(Array())
+  }
+}
 
 private[kafka010] object KafkaBatchReaderFactory extends PartitionReaderFactory with Logging {
   override def createReader(partition: InputPartition): PartitionReader[InternalRow] = {

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org