This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git


The following commit(s) were added to refs/heads/main by this push:
     new 9b91c25e8 Feat: Add CometLocalTableScanExec operator (#2735)
9b91c25e8 is described below

commit 9b91c25e8d3391ab673054d305cb97d063ac270b
Author: Kazantsev Maksim <[email protected]>
AuthorDate: Sun Nov 9 07:55:21 2025 -0800

    Feat: Add CometLocalTableScanExec operator (#2735)
---
 .../main/scala/org/apache/comet/CometConf.scala    |   2 +
 docs/source/user-guide/latest/configs.md           |   1 +
 .../org/apache/comet/rules/CometExecRule.scala     |  13 +++
 .../org/apache/comet/serde/QueryPlanSerde.scala    |   6 +-
 .../CometLocalTableScan.scala}                     |  36 +++----
 .../comet/serde/{ => operator}/CometProject.scala  |   3 +-
 .../comet/serde/{ => operator}/CometSort.scala     |   3 +-
 .../spark/sql/comet/CometLocalTableScanExec.scala  | 119 +++++++++++++++++++++
 .../org/apache/spark/sql/comet/operators.scala     |   2 +-
 .../org/apache/comet/exec/CometExecSuite.scala     |  24 +++++
 10 files changed, 185 insertions(+), 24 deletions(-)

diff --git a/common/src/main/scala/org/apache/comet/CometConf.scala b/common/src/main/scala/org/apache/comet/CometConf.scala
index 60fd1940b..4f868f496 100644
--- a/common/src/main/scala/org/apache/comet/CometConf.scala
+++ b/common/src/main/scala/org/apache/comet/CometConf.scala
@@ -251,6 +251,8 @@ object CometConf extends ShimCometConf {
     createExecEnabledConfig("window", defaultValue = true)
   val COMET_EXEC_TAKE_ORDERED_AND_PROJECT_ENABLED: ConfigEntry[Boolean] =
     createExecEnabledConfig("takeOrderedAndProject", defaultValue = true)
+  val COMET_EXEC_LOCAL_TABLE_SCAN_ENABLED: ConfigEntry[Boolean] =
+    createExecEnabledConfig("localTableScan", defaultValue = false)
 
   val COMET_EXEC_SORT_MERGE_JOIN_WITH_JOIN_FILTER_ENABLED: ConfigEntry[Boolean] =
     conf("spark.comet.exec.sortMergeJoinWithJoinFilter.enabled")
diff --git a/docs/source/user-guide/latest/configs.md b/docs/source/user-guide/latest/configs.md
index 1cc05dfc7..fd232874e 100644
--- a/docs/source/user-guide/latest/configs.md
+++ b/docs/source/user-guide/latest/configs.md
@@ -149,6 +149,7 @@ These settings can be used to determine which parts of the plan are accelerated
 | `spark.comet.exec.globalLimit.enabled` | Whether to enable globalLimit by default. | true |
 | `spark.comet.exec.hashJoin.enabled` | Whether to enable hashJoin by default. | true |
 | `spark.comet.exec.localLimit.enabled` | Whether to enable localLimit by default. | true |
+| `spark.comet.exec.localTableScan.enabled` | Whether to enable localTableScan by default. | false |
 | `spark.comet.exec.project.enabled` | Whether to enable project by default. | true |
 | `spark.comet.exec.sort.enabled` | Whether to enable sort by default. | true |
 | `spark.comet.exec.sortMergeJoin.enabled` | Whether to enable sortMergeJoin by default. | true |
diff --git a/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala b/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala
index 873a1c55c..708a48d6a 100644
--- a/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala
+++ b/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala
@@ -534,6 +534,19 @@ case class CometExecRule(session: SparkSession) extends Rule[SparkPlan] {
           s
         }
 
+      case op: LocalTableScanExec =>
+        if (CometConf.COMET_EXEC_LOCAL_TABLE_SCAN_ENABLED.get(conf)) {
+          QueryPlanSerde
+            .operator2Proto(op)
+            .map { nativeOp =>
+              val cometOp = CometLocalTableScanExec(op, op.rows, op.output)
+              CometScanWrapper(nativeOp, cometOp)
+            }
+            .getOrElse(op)
+        } else {
+          withInfo(op, "LocalTableScan is not enabled")
+        }
+
       case op =>
         op match {
           case _: CometPlan | _: AQEShuffleReadExec | _: BroadcastExchangeExec |
diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
index 3e0e837c9..909f6c175 100644
--- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
@@ -47,6 +47,7 @@ import org.apache.comet.serde.OperatorOuterClass.Operator
 import org.apache.comet.serde.Types.{DataType => ProtoDataType}
 import org.apache.comet.serde.Types.DataType._
 import org.apache.comet.serde.literals.CometLiteral
+import org.apache.comet.serde.operator.{CometLocalTableScan, CometProject, CometSort, CometSortOrder}
 import org.apache.comet.shims.CometExprShim
 
 /**
@@ -58,7 +59,10 @@ object QueryPlanSerde extends Logging with CometExprShim {
    * Mapping of Spark operator class to Comet operator handler.
    */
   private val opSerdeMap: Map[Class[_ <: SparkPlan], CometOperatorSerde[_]] =
-    Map(classOf[ProjectExec] -> CometProject, classOf[SortExec] -> CometSort)
+    Map(
+      classOf[ProjectExec] -> CometProject,
+      classOf[SortExec] -> CometSort,
+      classOf[LocalTableScanExec] -> CometLocalTableScan)
 
   private val arrayExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = Map(
     classOf[ArrayAppend] -> CometArrayAppend,
diff --git a/spark/src/main/scala/org/apache/comet/serde/CometProject.scala b/spark/src/main/scala/org/apache/comet/serde/operator/CometLocalTableScan.scala
similarity index 54%
copy from spark/src/main/scala/org/apache/comet/serde/CometProject.scala
copy to spark/src/main/scala/org/apache/comet/serde/operator/CometLocalTableScan.scala
index 651aa8fef..e3e8538cf 100644
--- a/spark/src/main/scala/org/apache/comet/serde/CometProject.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/operator/CometLocalTableScan.scala
@@ -17,36 +17,32 @@
  * under the License.
  */
 
-package org.apache.comet.serde
+package org.apache.comet.serde.operator
 
 import scala.jdk.CollectionConverters._
 
-import org.apache.spark.sql.execution.ProjectExec
+import org.apache.spark.sql.execution.LocalTableScanExec
 
 import org.apache.comet.{CometConf, ConfigEntry}
-import org.apache.comet.CometSparkSessionExtensions.withInfo
+import org.apache.comet.serde.{CometOperatorSerde, OperatorOuterClass}
 import org.apache.comet.serde.OperatorOuterClass.Operator
-import org.apache.comet.serde.QueryPlanSerde.exprToProto
+import org.apache.comet.serde.QueryPlanSerde.serializeDataType
 
-object CometProject extends CometOperatorSerde[ProjectExec] {
+object CometLocalTableScan extends CometOperatorSerde[LocalTableScanExec] {
 
-  override def enabledConfig: Option[ConfigEntry[Boolean]] =
-    Some(CometConf.COMET_EXEC_PROJECT_ENABLED)
+  override def enabledConfig: Option[ConfigEntry[Boolean]] = Some(
+    CometConf.COMET_EXEC_LOCAL_TABLE_SCAN_ENABLED)
 
   override def convert(
-      op: ProjectExec,
+      op: LocalTableScanExec,
       builder: Operator.Builder,
-      childOp: Operator*): Option[OperatorOuterClass.Operator] = {
-    val exprs = op.projectList.map(exprToProto(_, op.child.output))
-
-    if (exprs.forall(_.isDefined) && childOp.nonEmpty) {
-      val projectBuilder = OperatorOuterClass.Projection
-        .newBuilder()
-        .addAllProjectList(exprs.map(_.get).asJava)
-      Some(builder.setProjection(projectBuilder).build())
-    } else {
-      withInfo(op, op.projectList: _*)
-      None
-    }
+      childOp: Operator*): Option[Operator] = {
+    val scanTypes = op.output.flatten(attr => serializeDataType(attr.dataType))
+    val scanBuilder = OperatorOuterClass.Scan
+      .newBuilder()
+      .setSource(op.getClass.getSimpleName)
+      .addAllFields(scanTypes.asJava)
+      .setArrowFfiSafe(false)
+    Some(builder.setScan(scanBuilder).build())
   }
 }
diff --git a/spark/src/main/scala/org/apache/comet/serde/CometProject.scala b/spark/src/main/scala/org/apache/comet/serde/operator/CometProject.scala
similarity index 94%
rename from spark/src/main/scala/org/apache/comet/serde/CometProject.scala
rename to spark/src/main/scala/org/apache/comet/serde/operator/CometProject.scala
index 651aa8fef..4ba02945d 100644
--- a/spark/src/main/scala/org/apache/comet/serde/CometProject.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/operator/CometProject.scala
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.comet.serde
+package org.apache.comet.serde.operator
 
 import scala.jdk.CollectionConverters._
 
@@ -25,6 +25,7 @@ import org.apache.spark.sql.execution.ProjectExec
 
 import org.apache.comet.{CometConf, ConfigEntry}
 import org.apache.comet.CometSparkSessionExtensions.withInfo
+import org.apache.comet.serde.{CometOperatorSerde, OperatorOuterClass}
 import org.apache.comet.serde.OperatorOuterClass.Operator
 import org.apache.comet.serde.QueryPlanSerde.exprToProto
 
diff --git a/spark/src/main/scala/org/apache/comet/serde/CometSort.scala b/spark/src/main/scala/org/apache/comet/serde/operator/CometSort.scala
similarity index 95%
rename from spark/src/main/scala/org/apache/comet/serde/CometSort.scala
rename to spark/src/main/scala/org/apache/comet/serde/operator/CometSort.scala
index 4a1063458..39a1c5565 100644
--- a/spark/src/main/scala/org/apache/comet/serde/CometSort.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/operator/CometSort.scala
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.comet.serde
+package org.apache.comet.serde.operator
 
 import scala.jdk.CollectionConverters._
 
@@ -27,6 +27,7 @@ import org.apache.spark.sql.types.{ArrayType, DataType, DataTypes, MapType, Stru
 
 import org.apache.comet.{CometConf, ConfigEntry}
 import org.apache.comet.CometSparkSessionExtensions.withInfo
+import org.apache.comet.serde.{CometExpressionSerde, CometOperatorSerde, Compatible, ExprOuterClass, Incompatible, OperatorOuterClass, SupportLevel}
 import org.apache.comet.serde.OperatorOuterClass.Operator
 import org.apache.comet.serde.QueryPlanSerde.{exprToProto, exprToProtoInternal, supportedSortType}
 
diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometLocalTableScanExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometLocalTableScanExec.scala
new file mode 100644
index 000000000..611d367f3
--- /dev/null
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometLocalTableScanExec.scala
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.comet
+
+import org.apache.spark.TaskContext
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Attribute, UnsafeProjection}
+import org.apache.spark.sql.comet.CometLocalTableScanExec.createMetricsIterator
+import org.apache.spark.sql.comet.execution.arrow.CometArrowConverters
+import org.apache.spark.sql.execution.{LeafExecNode, LocalTableScanExec}
+import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
+import org.apache.spark.sql.vectorized.ColumnarBatch
+
+import com.google.common.base.Objects
+
+import org.apache.comet.CometConf
+
+case class CometLocalTableScanExec(
+    originalPlan: LocalTableScanExec,
+    @transient rows: Seq[InternalRow],
+    override val output: Seq[Attribute])
+    extends CometExec
+    with LeafExecNode {
+
+  override lazy val metrics: Map[String, SQLMetric] = Map(
+    "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output 
rows"))
+
+  @transient private lazy val unsafeRows: Array[InternalRow] = {
+    if (rows.isEmpty) {
+      Array.empty
+    } else {
+      val proj = UnsafeProjection.create(output, output)
+      rows.map(r => proj(r).copy()).toArray
+    }
+  }
+
+  @transient private lazy val rdd: RDD[InternalRow] = {
+    if (rows.isEmpty) {
+      sparkContext.emptyRDD
+    } else {
+      val numSlices = math.min(unsafeRows.length, session.leafNodeDefaultParallelism)
+      sparkContext.parallelize(unsafeRows, numSlices)
+    }
+  }
+
+  override def doExecuteColumnar(): RDD[ColumnarBatch] = {
+    val numInputRows = longMetric("numOutputRows")
+    val maxRecordsPerBatch = CometConf.COMET_BATCH_SIZE.get(conf)
+    val timeZoneId = conf.sessionLocalTimeZone
+    rdd.mapPartitionsInternal { sparkBatches =>
+      val context = TaskContext.get()
+      val batches = CometArrowConverters.rowToArrowBatchIter(
+        sparkBatches,
+        originalPlan.schema,
+        maxRecordsPerBatch,
+        timeZoneId,
+        context)
+      createMetricsIterator(batches, numInputRows)
+    }
+  }
+
+  override protected def stringArgs: Iterator[Any] = {
+    if (rows.isEmpty) {
+      Iterator("<empty>", output)
+    } else {
+      Iterator(output)
+    }
+  }
+
+  override def supportsColumnar: Boolean = true
+
+  override def equals(obj: Any): Boolean = {
+    obj match {
+      case other: CometLocalTableScanExec =>
+        this.originalPlan == other.originalPlan &&
+        this.schema == other.schema &&
+        this.output == other.output
+      case _ =>
+        false
+    }
+  }
+
+  override def hashCode(): Int = Objects.hashCode(originalPlan, originalPlan.schema, output)
+}
+
+object CometLocalTableScanExec {
+
+  private def createMetricsIterator(
+      it: Iterator[ColumnarBatch],
+      numInputRows: SQLMetric): Iterator[ColumnarBatch] = {
+    new Iterator[ColumnarBatch] {
+      override def hasNext: Boolean = it.hasNext
+
+      override def next(): ColumnarBatch = {
+        val batch = it.next()
+        numInputRows.add(batch.numRows())
+        batch
+      }
+    }
+  }
+}
diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala b/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala
index de6892638..d7a743eb2 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala
@@ -380,7 +380,7 @@ abstract class CometNativeExec extends CometExec {
           _: ShuffleQueryStageExec | _: AQEShuffleReadExec | _: CometShuffleExchangeExec |
           _: CometUnionExec | _: CometTakeOrderedAndProjectExec | _: CometCoalesceExec |
           _: ReusedExchangeExec | _: CometBroadcastExchangeExec | _: BroadcastQueryStageExec |
-          _: CometSparkToColumnarExec =>
+          _: CometSparkToColumnarExec | _: CometLocalTableScanExec =>
         func(plan)
       case _: CometPlan =>
         // Other Comet operators, continue to traverse the tree.
diff --git a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
index fb8af7efb..aadd1adfd 100644
--- a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
@@ -2105,6 +2105,30 @@ class CometExecSuite extends CometTestBase {
     }
   }
 
+  test("LocalTableScanExec spark fallback") {
+    withSQLConf(CometConf.COMET_EXEC_LOCAL_TABLE_SCAN_ENABLED.key -> "false") {
+      val df = Seq.range(0, 10).toDF("id")
+      checkSparkAnswerAndFallbackReason(df, "LocalTableScan is not enabled")
+    }
+  }
+
+  test("LocalTableScanExec with filter") {
+    withSQLConf(CometConf.COMET_EXEC_LOCAL_TABLE_SCAN_ENABLED.key -> "true") {
+      val df = Seq.range(0, 10).toDF("id").filter(col("id") > 5)
+      checkSparkAnswerAndOperator(df)
+    }
+  }
+
+  test("LocalTableScanExec with groupBy") {
+    withSQLConf(CometConf.COMET_EXEC_LOCAL_TABLE_SCAN_ENABLED.key -> "true") {
+      val df = (Seq.range(0, 10) ++ Seq.range(0, 20))
+        .toDF("id")
+        .groupBy(col("id"))
+        .agg(count("*"))
+      checkSparkAnswerAndOperator(df)
+    }
+  }
+
 }
 
 case class BucketedTableTestSpec(
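
A minimal usage sketch (not part of the patch above): the change adds the
spark.comet.exec.localTableScan.enabled flag (default false), and the new tests drive it
with small local DataFrames, so enabling the operator from an application would look
roughly like the following. The session setup, application name, and object name are
assumptions for illustration; only the config key and the DataFrame shape come from the
commit itself.

    import org.apache.spark.sql.SparkSession

    object LocalTableScanExample {
      def main(args: Array[String]): Unit = {
        // Assumes a Spark build with the Comet extension already on the classpath.
        val spark = SparkSession
          .builder()
          .appName("comet-local-table-scan-example")
          .config("spark.comet.exec.localTableScan.enabled", "true") // new flag; defaults to false
          .getOrCreate()
        import spark.implicits._

        // A local in-memory Seq plans as LocalTableScanExec; with the flag enabled,
        // CometExecRule can replace it with CometLocalTableScanExec.
        val df = Seq.range(0, 10).toDF("id").filter($"id" > 5)
        df.explain() // inspect the physical plan for the Comet operator
        df.show()

        spark.stop()
      }
    }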


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
