This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch comet-parquet-exec
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git


The following commit(s) were added to refs/heads/comet-parquet-exec by this 
push:
     new 8a0df9d3 [comet-parquet-exec] Handle CometNativeScan RDD when 
DataSourceRDD instead of FileScanRDD (#1088)
8a0df9d3 is described below

commit 8a0df9d3cd60beeb2e6038793b1c1c7e3a947f98
Author: Matt Butrovich <[email protected]>
AuthorDate: Fri Nov 15 10:52:25 2024 -0500

    [comet-parquet-exec] Handle CometNativeScan RDD when DataSourceRDD instead 
of FileScanRDD (#1088)
    
    * DataSourceRDD handling (seems to be related to prefetching, so maybe not 
relevant for our ParquetExec).
    
    * Refactor to reduce duplicate code.
---
 .../org/apache/comet/serde/QueryPlanSerde.scala    | 51 ++++++++++++++--------
 1 file changed, 34 insertions(+), 17 deletions(-)

diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala 
b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
index cce48204..f9a25466 100644
--- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
@@ -35,8 +35,9 @@ import org.apache.spark.sql.execution
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.adaptive.{BroadcastQueryStageExec, 
ShuffleQueryStageExec}
 import org.apache.spark.sql.execution.aggregate.{BaseAggregateExec, 
HashAggregateExec, ObjectHashAggregateExec}
-import org.apache.spark.sql.execution.datasources.FileScanRDD
+import org.apache.spark.sql.execution.datasources.{FilePartition, FileScanRDD}
 import 
org.apache.spark.sql.execution.datasources.parquet.SparkToParquetSchemaConverter
+import org.apache.spark.sql.execution.datasources.v2.{DataSourceRDD, 
DataSourceRDDPartition}
 import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, 
ReusedExchangeExec, ShuffleExchangeExec}
 import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, HashJoin, 
ShuffledHashJoinExec, SortMergeJoinExec}
 import org.apache.spark.sql.execution.window.WindowExec
@@ -2497,22 +2498,22 @@ object QueryPlanSerde extends Logging with 
ShimQueryPlanSerde with CometExprShim
           val dataFilters = scan.dataFilters.map(exprToProto(_, scan.output))
           nativeScanBuilder.addAllDataFilters(dataFilters.map(_.get).asJava)
 
-          // Eventually we'll want to modify CometNativeScan to generate the 
file partitions
-          // for us without instantiating the RDD.
-          val file_partitions = 
scan.inputRDD.asInstanceOf[FileScanRDD].filePartitions;
-          file_partitions.foreach(partition => {
-            val partitionBuilder = 
OperatorOuterClass.SparkFilePartition.newBuilder()
-            partition.files.foreach(file => {
-              val fileBuilder = 
OperatorOuterClass.SparkPartitionedFile.newBuilder()
-              fileBuilder
-                .setFilePath(file.pathUri.toString)
-                .setStart(file.start)
-                .setLength(file.length)
-                .setFileSize(file.fileSize)
-              partitionBuilder.addPartitionedFile(fileBuilder.build())
-            })
-            nativeScanBuilder.addFilePartitions(partitionBuilder.build())
-          })
+          // TODO: modify CometNativeScan to generate the file partitions 
without instantiating RDD.
+          scan.inputRDD match {
+            case rdd: DataSourceRDD =>
+              val partitions = rdd.partitions
+              partitions.foreach(p => {
+                val inputPartitions = 
p.asInstanceOf[DataSourceRDDPartition].inputPartitions
+                inputPartitions.foreach(partition => {
+                  partition2Proto(partition.asInstanceOf[FilePartition], 
nativeScanBuilder)
+                })
+              })
+            case rdd: FileScanRDD =>
+              rdd.filePartitions.foreach(partition => {
+                partition2Proto(partition, nativeScanBuilder)
+              })
+            case _ =>
+          }
 
           val requiredSchemaParquet =
             new 
SparkToParquetSchemaConverter(conf).convert(scan.requiredSchema)
@@ -3185,4 +3186,20 @@ object QueryPlanSerde extends Logging with 
ShimQueryPlanSerde with CometExprShim
 
     true
   }
+
+  private def partition2Proto(
+      partition: FilePartition,
+      nativeScanBuilder: OperatorOuterClass.NativeScan.Builder): Unit = {
+    val partitionBuilder = OperatorOuterClass.SparkFilePartition.newBuilder()
+    partition.files.foreach(file => {
+      val fileBuilder = OperatorOuterClass.SparkPartitionedFile.newBuilder()
+      fileBuilder
+        .setFilePath(file.pathUri.toString)
+        .setStart(file.start)
+        .setLength(file.length)
+        .setFileSize(file.fileSize)
+      partitionBuilder.addPartitionedFile(fileBuilder.build())
+    })
+    nativeScanBuilder.addFilePartitions(partitionBuilder.build())
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to