This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push:
new 9b91c25e8 Feat: Add CometLocalTableScanExec operator (#2735)
9b91c25e8 is described below
commit 9b91c25e8d3391ab673054d305cb97d063ac270b
Author: Kazantsev Maksim <[email protected]>
AuthorDate: Sun Nov 9 07:55:21 2025 -0800
Feat: Add CometLocalTableScanExec operator (#2735)
---
.../main/scala/org/apache/comet/CometConf.scala | 2 +
docs/source/user-guide/latest/configs.md | 1 +
.../org/apache/comet/rules/CometExecRule.scala | 13 +++
.../org/apache/comet/serde/QueryPlanSerde.scala | 6 +-
.../CometLocalTableScan.scala} | 36 +++----
.../comet/serde/{ => operator}/CometProject.scala | 3 +-
.../comet/serde/{ => operator}/CometSort.scala | 3 +-
.../spark/sql/comet/CometLocalTableScanExec.scala | 119 +++++++++++++++++++++
.../org/apache/spark/sql/comet/operators.scala | 2 +-
.../org/apache/comet/exec/CometExecSuite.scala | 24 +++++
10 files changed, 185 insertions(+), 24 deletions(-)
diff --git a/common/src/main/scala/org/apache/comet/CometConf.scala b/common/src/main/scala/org/apache/comet/CometConf.scala
index 60fd1940b..4f868f496 100644
--- a/common/src/main/scala/org/apache/comet/CometConf.scala
+++ b/common/src/main/scala/org/apache/comet/CometConf.scala
@@ -251,6 +251,8 @@ object CometConf extends ShimCometConf {
createExecEnabledConfig("window", defaultValue = true)
val COMET_EXEC_TAKE_ORDERED_AND_PROJECT_ENABLED: ConfigEntry[Boolean] =
createExecEnabledConfig("takeOrderedAndProject", defaultValue = true)
+ val COMET_EXEC_LOCAL_TABLE_SCAN_ENABLED: ConfigEntry[Boolean] =
+ createExecEnabledConfig("localTableScan", defaultValue = false)
val COMET_EXEC_SORT_MERGE_JOIN_WITH_JOIN_FILTER_ENABLED: ConfigEntry[Boolean] =
conf("spark.comet.exec.sortMergeJoinWithJoinFilter.enabled")
diff --git a/docs/source/user-guide/latest/configs.md b/docs/source/user-guide/latest/configs.md
index 1cc05dfc7..fd232874e 100644
--- a/docs/source/user-guide/latest/configs.md
+++ b/docs/source/user-guide/latest/configs.md
@@ -149,6 +149,7 @@ These settings can be used to determine which parts of the plan are accelerated
| `spark.comet.exec.globalLimit.enabled` | Whether to enable globalLimit by default. | true |
| `spark.comet.exec.hashJoin.enabled` | Whether to enable hashJoin by default. | true |
| `spark.comet.exec.localLimit.enabled` | Whether to enable localLimit by default. | true |
+| `spark.comet.exec.localTableScan.enabled` | Whether to enable localTableScan by default. | false |
| `spark.comet.exec.project.enabled` | Whether to enable project by default. | true |
| `spark.comet.exec.sort.enabled` | Whether to enable sort by default. | true |
| `spark.comet.exec.sortMergeJoin.enabled` | Whether to enable sortMergeJoin by default. | true |
diff --git a/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala b/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala
index 873a1c55c..708a48d6a 100644
--- a/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala
+++ b/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala
@@ -534,6 +534,19 @@ case class CometExecRule(session: SparkSession) extends Rule[SparkPlan] {
s
}
+ case op: LocalTableScanExec =>
+ if (CometConf.COMET_EXEC_LOCAL_TABLE_SCAN_ENABLED.get(conf)) {
+ QueryPlanSerde
+ .operator2Proto(op)
+ .map { nativeOp =>
+ val cometOp = CometLocalTableScanExec(op, op.rows, op.output)
+ CometScanWrapper(nativeOp, cometOp)
+ }
+ .getOrElse(op)
+ } else {
+ withInfo(op, "LocalTableScan is not enabled")
+ }
+
case op =>
op match {
case _: CometPlan | _: AQEShuffleReadExec | _: BroadcastExchangeExec |
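The new case mirrors the existing per-operator gates: with the flag off, withInfo records the fallback reason for EXPLAIN; with it on, operator2Proto plus CometScanWrapper swap in the Comet scan. A quick sketch of observing both paths in a Comet-enabled session (spark.implicits._ assumed):

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().getOrCreate()
    import spark.implicits._

    // Flag off: the plan keeps LocalTableScan, and the extended output
    // should carry the reason "LocalTableScan is not enabled".
    spark.conf.set("spark.comet.exec.localTableScan.enabled", "false")
    Seq(1, 2, 3).toDF("id").explain("extended")

    // Flag on: the plan should show CometLocalTableScan instead.
    spark.conf.set("spark.comet.exec.localTableScan.enabled", "true")
    Seq(1, 2, 3).toDF("id").explain()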
diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
index 3e0e837c9..909f6c175 100644
--- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
@@ -47,6 +47,7 @@ import org.apache.comet.serde.OperatorOuterClass.Operator
import org.apache.comet.serde.Types.{DataType => ProtoDataType}
import org.apache.comet.serde.Types.DataType._
import org.apache.comet.serde.literals.CometLiteral
+import org.apache.comet.serde.operator.{CometLocalTableScan, CometProject, CometSort, CometSortOrder}
import org.apache.comet.shims.CometExprShim
/**
@@ -58,7 +59,10 @@ object QueryPlanSerde extends Logging with CometExprShim {
* Mapping of Spark operator class to Comet operator handler.
*/
private val opSerdeMap: Map[Class[_ <: SparkPlan], CometOperatorSerde[_]] =
- Map(classOf[ProjectExec] -> CometProject, classOf[SortExec] -> CometSort)
+ Map(
+ classOf[ProjectExec] -> CometProject,
+ classOf[SortExec] -> CometSort,
+ classOf[LocalTableScanExec] -> CometLocalTableScan)
private val arrayExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = Map(
classOf[ArrayAppend] -> CometArrayAppend,
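For anyone adding the next operator, the pattern is now uniform: implement CometOperatorSerde for the Spark class and register it in opSerdeMap above. A skeletal serde using only the hooks visible in this diff; FooExec and COMET_EXEC_FOO_ENABLED are illustrative placeholders, not real names in the codebase:

    import org.apache.comet.{CometConf, ConfigEntry}
    import org.apache.comet.serde.CometOperatorSerde
    import org.apache.comet.serde.OperatorOuterClass.Operator

    // FooExec stands in for whichever Spark operator is being added.
    object CometFoo extends CometOperatorSerde[FooExec] {

      // Gate the conversion behind a config flag, as the other serdes do.
      override def enabledConfig: Option[ConfigEntry[Boolean]] =
        Some(CometConf.COMET_EXEC_FOO_ENABLED) // placeholder flag

      // Return Some(proto) when the operator serializes cleanly,
      // or None to fall back to Spark.
      override def convert(
          op: FooExec,
          builder: Operator.Builder,
          childOp: Operator*): Option[Operator] = None
    }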
diff --git a/spark/src/main/scala/org/apache/comet/serde/CometProject.scala b/spark/src/main/scala/org/apache/comet/serde/operator/CometLocalTableScan.scala
similarity index 54%
copy from spark/src/main/scala/org/apache/comet/serde/CometProject.scala
copy to spark/src/main/scala/org/apache/comet/serde/operator/CometLocalTableScan.scala
index 651aa8fef..e3e8538cf 100644
--- a/spark/src/main/scala/org/apache/comet/serde/CometProject.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/operator/CometLocalTableScan.scala
@@ -17,36 +17,32 @@
* under the License.
*/
-package org.apache.comet.serde
+package org.apache.comet.serde.operator
import scala.jdk.CollectionConverters._
-import org.apache.spark.sql.execution.ProjectExec
+import org.apache.spark.sql.execution.LocalTableScanExec
import org.apache.comet.{CometConf, ConfigEntry}
-import org.apache.comet.CometSparkSessionExtensions.withInfo
+import org.apache.comet.serde.{CometOperatorSerde, OperatorOuterClass}
import org.apache.comet.serde.OperatorOuterClass.Operator
-import org.apache.comet.serde.QueryPlanSerde.exprToProto
+import org.apache.comet.serde.QueryPlanSerde.serializeDataType
-object CometProject extends CometOperatorSerde[ProjectExec] {
+object CometLocalTableScan extends CometOperatorSerde[LocalTableScanExec] {
- override def enabledConfig: Option[ConfigEntry[Boolean]] =
- Some(CometConf.COMET_EXEC_PROJECT_ENABLED)
+ override def enabledConfig: Option[ConfigEntry[Boolean]] = Some(
+ CometConf.COMET_EXEC_LOCAL_TABLE_SCAN_ENABLED)
override def convert(
- op: ProjectExec,
+ op: LocalTableScanExec,
builder: Operator.Builder,
- childOp: Operator*): Option[OperatorOuterClass.Operator] = {
- val exprs = op.projectList.map(exprToProto(_, op.child.output))
-
- if (exprs.forall(_.isDefined) && childOp.nonEmpty) {
- val projectBuilder = OperatorOuterClass.Projection
- .newBuilder()
- .addAllProjectList(exprs.map(_.get).asJava)
- Some(builder.setProjection(projectBuilder).build())
- } else {
- withInfo(op, op.projectList: _*)
- None
- }
+ childOp: Operator*): Option[Operator] = {
+ val scanTypes = op.output.flatten(attr => serializeDataType(attr.dataType))
+ val scanBuilder = OperatorOuterClass.Scan
+ .newBuilder()
+ .setSource(op.getClass.getSimpleName)
+ .addAllFields(scanTypes.asJava)
+ .setArrowFfiSafe(false)
+ Some(builder.setScan(scanBuilder).build())
}
}
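Worth noting: convert never reads the rows. It serializes the output schema, tags the scan with the Spark class name ("LocalTableScanExec"), and marks it arrow_ffi_safe = false; the row data itself travels through CometLocalTableScanExec on the JVM side. A REPL-style sketch of calling it directly; the two-argument LocalTableScanExec constructor is the Spark 3.x shape and may differ in other versions:

    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.catalyst.expressions.AttributeReference
    import org.apache.spark.sql.execution.LocalTableScanExec
    import org.apache.spark.sql.types.IntegerType

    import org.apache.comet.serde.OperatorOuterClass.Operator
    import org.apache.comet.serde.operator.CometLocalTableScan

    // One INT column, two rows; only the schema reaches the proto.
    val output = Seq(AttributeReference("id", IntegerType)())
    val op = LocalTableScanExec(output, Seq(InternalRow(1), InternalRow(2)))

    val proto: Option[Operator] =
      CometLocalTableScan.convert(op, Operator.newBuilder())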
diff --git a/spark/src/main/scala/org/apache/comet/serde/CometProject.scala b/spark/src/main/scala/org/apache/comet/serde/operator/CometProject.scala
similarity index 94%
rename from spark/src/main/scala/org/apache/comet/serde/CometProject.scala
rename to spark/src/main/scala/org/apache/comet/serde/operator/CometProject.scala
index 651aa8fef..4ba02945d 100644
--- a/spark/src/main/scala/org/apache/comet/serde/CometProject.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/operator/CometProject.scala
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.comet.serde
+package org.apache.comet.serde.operator
import scala.jdk.CollectionConverters._
@@ -25,6 +25,7 @@ import org.apache.spark.sql.execution.ProjectExec
import org.apache.comet.{CometConf, ConfigEntry}
import org.apache.comet.CometSparkSessionExtensions.withInfo
+import org.apache.comet.serde.{CometOperatorSerde, OperatorOuterClass}
import org.apache.comet.serde.OperatorOuterClass.Operator
import org.apache.comet.serde.QueryPlanSerde.exprToProto
diff --git a/spark/src/main/scala/org/apache/comet/serde/CometSort.scala b/spark/src/main/scala/org/apache/comet/serde/operator/CometSort.scala
similarity index 95%
rename from spark/src/main/scala/org/apache/comet/serde/CometSort.scala
rename to spark/src/main/scala/org/apache/comet/serde/operator/CometSort.scala
index 4a1063458..39a1c5565 100644
--- a/spark/src/main/scala/org/apache/comet/serde/CometSort.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/operator/CometSort.scala
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.comet.serde
+package org.apache.comet.serde.operator
import scala.jdk.CollectionConverters._
@@ -27,6 +27,7 @@ import org.apache.spark.sql.types.{ArrayType, DataType, DataTypes, MapType, Stru
import org.apache.comet.{CometConf, ConfigEntry}
import org.apache.comet.CometSparkSessionExtensions.withInfo
+import org.apache.comet.serde.{CometExpressionSerde, CometOperatorSerde, Compatible, ExprOuterClass, Incompatible, OperatorOuterClass, SupportLevel}
import org.apache.comet.serde.OperatorOuterClass.Operator
import org.apache.comet.serde.QueryPlanSerde.{exprToProto, exprToProtoInternal, supportedSortType}
diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometLocalTableScanExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometLocalTableScanExec.scala
new file mode 100644
index 000000000..611d367f3
--- /dev/null
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometLocalTableScanExec.scala
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.comet
+
+import org.apache.spark.TaskContext
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Attribute, UnsafeProjection}
+import org.apache.spark.sql.comet.CometLocalTableScanExec.createMetricsIterator
+import org.apache.spark.sql.comet.execution.arrow.CometArrowConverters
+import org.apache.spark.sql.execution.{LeafExecNode, LocalTableScanExec}
+import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
+import org.apache.spark.sql.vectorized.ColumnarBatch
+
+import com.google.common.base.Objects
+
+import org.apache.comet.CometConf
+
+case class CometLocalTableScanExec(
+ originalPlan: LocalTableScanExec,
+ @transient rows: Seq[InternalRow],
+ override val output: Seq[Attribute])
+ extends CometExec
+ with LeafExecNode {
+
+ override lazy val metrics: Map[String, SQLMetric] = Map(
+ "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output
rows"))
+
+ @transient private lazy val unsafeRows: Array[InternalRow] = {
+ if (rows.isEmpty) {
+ Array.empty
+ } else {
+ val proj = UnsafeProjection.create(output, output)
+ rows.map(r => proj(r).copy()).toArray
+ }
+ }
+
+ @transient private lazy val rdd: RDD[InternalRow] = {
+ if (rows.isEmpty) {
+ sparkContext.emptyRDD
+ } else {
+ val numSlices = math.min(unsafeRows.length, session.leafNodeDefaultParallelism)
+ sparkContext.parallelize(unsafeRows, numSlices)
+ }
+ }
+
+ override def doExecuteColumnar(): RDD[ColumnarBatch] = {
+ val numInputRows = longMetric("numOutputRows")
+ val maxRecordsPerBatch = CometConf.COMET_BATCH_SIZE.get(conf)
+ val timeZoneId = conf.sessionLocalTimeZone
+ rdd.mapPartitionsInternal { sparkBatches =>
+ val context = TaskContext.get()
+ val batches = CometArrowConverters.rowToArrowBatchIter(
+ sparkBatches,
+ originalPlan.schema,
+ maxRecordsPerBatch,
+ timeZoneId,
+ context)
+ createMetricsIterator(batches, numInputRows)
+ }
+ }
+
+ override protected def stringArgs: Iterator[Any] = {
+ if (rows.isEmpty) {
+ Iterator("<empty>", output)
+ } else {
+ Iterator(output)
+ }
+ }
+
+ override def supportsColumnar: Boolean = true
+
+ override def equals(obj: Any): Boolean = {
+ obj match {
+ case other: CometLocalTableScanExec =>
+ this.originalPlan == other.originalPlan &&
+ this.schema == other.schema &&
+ this.output == other.output
+ case _ =>
+ false
+ }
+ }
+
+ override def hashCode(): Int = Objects.hashCode(originalPlan, originalPlan.schema, output)
+}
+
+object CometLocalTableScanExec {
+
+ private def createMetricsIterator(
+ it: Iterator[ColumnarBatch],
+ numInputRows: SQLMetric): Iterator[ColumnarBatch] = {
+ new Iterator[ColumnarBatch] {
+ override def hasNext: Boolean = it.hasNext
+
+ override def next(): ColumnarBatch = {
+ val batch = it.next()
+ numInputRows.add(batch.numRows())
+ batch
+ }
+ }
+ }
+}
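The execution path is rows in, Arrow out: unsafeRows copies each row through an UnsafeProjection, rdd parallelizes them (capped by leafNodeDefaultParallelism), and doExecuteColumnar converts each partition to Arrow batches of at most COMET_BATCH_SIZE rows while counting numOutputRows per batch. A minimal end-to-end sketch, assuming a Comet-enabled SparkSession:

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().getOrCreate()
    import spark.implicits._

    spark.conf.set("spark.comet.exec.localTableScan.enabled", "true")

    // The local relation is fed to downstream native operators as Arrow batches.
    val df = Seq.range(0, 10).toDF("id").filter($"id" > 5)
    df.show()

    // The plan should now report CometLocalTableScan instead of LocalTableScan.
    df.explain()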
diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala b/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala
index de6892638..d7a743eb2 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala
@@ -380,7 +380,7 @@ abstract class CometNativeExec extends CometExec {
_: ShuffleQueryStageExec | _: AQEShuffleReadExec | _: CometShuffleExchangeExec |
_: CometUnionExec | _: CometTakeOrderedAndProjectExec | _: CometCoalesceExec |
_: ReusedExchangeExec | _: CometBroadcastExchangeExec | _: BroadcastQueryStageExec |
- _: CometSparkToColumnarExec =>
+ _: CometSparkToColumnarExec | _: CometLocalTableScanExec =>
func(plan)
case _: CometPlan =>
// Other Comet operators, continue to traverse the tree.
diff --git a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
index fb8af7efb..aadd1adfd 100644
--- a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
@@ -2105,6 +2105,30 @@ class CometExecSuite extends CometTestBase {
}
}
+ test("LocalTableScanExec spark fallback") {
+ withSQLConf(CometConf.COMET_EXEC_LOCAL_TABLE_SCAN_ENABLED.key -> "false") {
+ val df = Seq.range(0, 10).toDF("id")
+ checkSparkAnswerAndFallbackReason(df, "LocalTableScan is not enabled")
+ }
+ }
+
+ test("LocalTableScanExec with filter") {
+ withSQLConf(CometConf.COMET_EXEC_LOCAL_TABLE_SCAN_ENABLED.key -> "true") {
+ val df = Seq.range(0, 10).toDF("id").filter(col("id") > 5)
+ checkSparkAnswerAndOperator(df)
+ }
+ }
+
+ test("LocalTableScanExec with groupBy") {
+ withSQLConf(CometConf.COMET_EXEC_LOCAL_TABLE_SCAN_ENABLED.key -> "true") {
+ val df = (Seq.range(0, 10) ++ Seq.range(0, 20))
+ .toDF("id")
+ .groupBy(col("id"))
+ .agg(count("*"))
+ checkSparkAnswerAndOperator(df)
+ }
+ }
+
}
case class BucketedTableTestSpec(
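One path the new code handles but the suite does not exercise is the empty relation, where rdd short-circuits to sparkContext.emptyRDD. A test in the same style could look like the sketch below; checkSparkAnswer is assumed to be the CometTestBase helper that compares results without asserting on operators:

    test("LocalTableScanExec with empty relation") {
      withSQLConf(CometConf.COMET_EXEC_LOCAL_TABLE_SCAN_ENABLED.key -> "true") {
        val df = Seq.empty[Int].toDF("id")
        checkSparkAnswer(df)
      }
    }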