This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push:
new 7d46ff245 chore: Add `scanImpl` attribute to `CometScanExec` (#1746)
7d46ff245 is described below
commit 7d46ff245c3d95fc9460db56e52c499588689123
Author: Andy Grove <[email protected]>
AuthorDate: Tue May 20 11:58:54 2025 -0600
chore: Add `scanImpl` attribute to `CometScanExec` (#1746)
---
.github/workflows/miri.yml | 2 +-
.../main/scala/org/apache/comet/CometConf.scala | 5 --
dev/diffs/3.4.3.diff | 2 +-
dev/diffs/3.5.4.diff | 2 +-
dev/diffs/3.5.5.diff | 2 +-
dev/diffs/4.0.0-preview1.diff | 2 +-
.../comet/parquet/CometParquetFileFormat.scala | 9 ++--
.../CometParquetPartitionReaderFactory.scala | 12 +----
.../apache/comet/parquet/CometParquetScan.scala | 1 +
.../org/apache/comet/rules/CometExecRule.scala | 5 +-
.../org/apache/comet/rules/CometScanRule.scala | 44 +++++++++++-----
.../org/apache/comet/serde/QueryPlanSerde.scala | 23 +++++++--
.../spark/sql/comet/CometNativeScanExec.scala | 24 ++-------
.../org/apache/spark/sql/comet/CometScanExec.scala | 58 +++++++++-------------
.../apache/comet/CometArrayExpressionSuite.scala | 2 +-
.../scala/org/apache/comet/CometCastSuite.scala | 2 +-
.../org/apache/comet/CometExpressionSuite.scala | 6 +--
.../org/apache/comet/CometFuzzTestSuite.scala | 18 +++----
.../org/apache/comet/exec/CometJoinSuite.scala | 2 +-
.../apache/comet/parquet/ParquetReadSuite.scala | 32 ++++++------
.../scala/org/apache/spark/sql/CometTestBase.scala | 2 +
21 files changed, 125 insertions(+), 130 deletions(-)
diff --git a/.github/workflows/miri.yml b/.github/workflows/miri.yml
index 2f67c2866..923598ee3 100644
--- a/.github/workflows/miri.yml
+++ b/.github/workflows/miri.yml
@@ -52,4 +52,4 @@ jobs:
- name: Test with Miri
run: |
cd native
- MIRIFLAGS="-Zmiri-disable-isolation" cargo miri test
+ MIRIFLAGS="-Zmiri-disable-isolation" cargo miri test --lib --bins --tests --examples
diff --git a/common/src/main/scala/org/apache/comet/CometConf.scala b/common/src/main/scala/org/apache/comet/CometConf.scala
index 3f35133a5..7b01d0fa1 100644
--- a/common/src/main/scala/org/apache/comet/CometConf.scala
+++ b/common/src/main/scala/org/apache/comet/CometConf.scala
@@ -104,11 +104,6 @@ object CometConf extends ShimCometConf {
.getOrElse("COMET_PARQUET_SCAN_IMPL", SCAN_NATIVE_COMET)
.toLowerCase(Locale.ROOT))
- def isExperimentalNativeScan: Boolean = COMET_NATIVE_SCAN_IMPL.get() match {
- case SCAN_NATIVE_DATAFUSION | SCAN_NATIVE_ICEBERG_COMPAT => true
- case SCAN_NATIVE_COMET => false
- }
-
val COMET_PARQUET_PARALLEL_IO_ENABLED: ConfigEntry[Boolean] =
conf("spark.comet.parquet.read.parallel.io.enabled")
.doc(
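
For context: the deleted `isExperimentalNativeScan` helper was a process-wide
switch keyed off `COMET_NATIVE_SCAN_IMPL`. Its role moves to the per-scan
`scanImpl` attribute introduced below (and, in tests, to the
`usingDataSourceExec` helper added to CometTestBase at the end of this diff).
A minimal sketch of the equivalent per-scan check, assuming only the existing
CometConf scan constants (the helper name is illustrative):

    import org.apache.comet.CometConf

    // Sketch: decide per scan whether a DataFusion-based reader is in play,
    // instead of consulting a global config as the removed helper did.
    def usesDataFusionReader(scanImpl: String): Boolean = scanImpl match {
      case CometConf.SCAN_NATIVE_DATAFUSION | CometConf.SCAN_NATIVE_ICEBERG_COMPAT => true
      case _ => false // includes SCAN_NATIVE_COMET
    }
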
diff --git a/dev/diffs/3.4.3.diff b/dev/diffs/3.4.3.diff
index eecd8cd02..03353ed2a 100644
--- a/dev/diffs/3.4.3.diff
+++ b/dev/diffs/3.4.3.diff
@@ -961,7 +961,7 @@ index 75eabcb96f2..36e3318ad7e 100644
_.asInstanceOf[FileScanRDD].filePartitions.forall(
_.files.forall(_.urlEncodedPath.contains("p=0"))))
+ case WholeStageCodegenExec(ColumnarToRowExec(InputAdapter(
-+ fs @ CometScanExec(_, _, _, partitionFilters, _, _, _, _, _, _)))) =>
++ fs @ CometScanExec(_, _, _, _, partitionFilters, _, _, _, _, _, _)))) =>
+ partitionFilters.exists(ExecSubqueryExpression.hasSubquery) &&
+ fs.inputRDDs().forall(
+ _.asInstanceOf[FileScanRDD].filePartitions.forall(
diff --git a/dev/diffs/3.5.4.diff b/dev/diffs/3.5.4.diff
index d9804b887..d4f8cdd37 100644
--- a/dev/diffs/3.5.4.diff
+++ b/dev/diffs/3.5.4.diff
@@ -1092,7 +1092,7 @@ index 260c992f1ae..b9d8e22337c 100644
_.asInstanceOf[FileScanRDD].filePartitions.forall(
_.files.forall(_.urlEncodedPath.contains("p=0"))))
+ case WholeStageCodegenExec(ColumnarToRowExec(InputAdapter(
-+ fs @ CometScanExec(_, _, _, partitionFilters, _, _, _, _, _, _)))) =>
++ fs @ CometScanExec(_, _, _, _, partitionFilters, _, _, _, _, _, _)))) =>
+ partitionFilters.exists(ExecSubqueryExpression.hasSubquery) &&
+ fs.inputRDDs().forall(
+ _.asInstanceOf[FileScanRDD].filePartitions.forall(
diff --git a/dev/diffs/3.5.5.diff b/dev/diffs/3.5.5.diff
index 4270037b9..1d5198f38 100644
--- a/dev/diffs/3.5.5.diff
+++ b/dev/diffs/3.5.5.diff
@@ -963,7 +963,7 @@ index 04702201f82..6cc2b01b7f3 100644
_.asInstanceOf[FileScanRDD].filePartitions.forall(
_.files.forall(_.urlEncodedPath.contains("p=0"))))
+ case WholeStageCodegenExec(ColumnarToRowExec(InputAdapter(
-+ fs @ CometScanExec(_, _, _, partitionFilters, _, _, _, _, _, _)))) =>
++ fs @ CometScanExec(_, _, _, _, partitionFilters, _, _, _, _, _, _)))) =>
+ partitionFilters.exists(ExecSubqueryExpression.hasSubquery) &&
+ fs.inputRDDs().forall(
+ _.asInstanceOf[FileScanRDD].filePartitions.forall(
diff --git a/dev/diffs/4.0.0-preview1.diff b/dev/diffs/4.0.0-preview1.diff
index 3efb4b2b7..a1c6a1422 100644
--- a/dev/diffs/4.0.0-preview1.diff
+++ b/dev/diffs/4.0.0-preview1.diff
@@ -1035,7 +1035,7 @@ index 68f14f13bbd..174636cefb5 100644
_.asInstanceOf[FileScanRDD].filePartitions.forall(
_.files.forall(_.urlEncodedPath.contains("p=0"))))
+ case WholeStageCodegenExec(ColumnarToRowExec(InputAdapter(
-+ fs @ CometScanExec(_, _, _, partitionFilters, _, _, _, _, _, _)))) =>
++ fs @ CometScanExec(_, _, _, _, partitionFilters, _, _, _, _, _, _)))) =>
+ partitionFilters.exists(ExecSubqueryExpression.hasSubquery) &&
+ fs.inputRDDs().forall(
+ _.asInstanceOf[FileScanRDD].filePartitions.forall(
diff --git a/spark/src/main/scala/org/apache/comet/parquet/CometParquetFileFormat.scala b/spark/src/main/scala/org/apache/comet/parquet/CometParquetFileFormat.scala
index 668865e08..871ac2704 100644
--- a/spark/src/main/scala/org/apache/comet/parquet/CometParquetFileFormat.scala
+++ b/spark/src/main/scala/org/apache/comet/parquet/CometParquetFileFormat.scala
@@ -56,7 +56,10 @@ import org.apache.comet.vector.CometVector
* in [[org.apache.comet.CometSparkSessionExtensions]]
* - `buildReaderWithPartitionValues`, so Spark calls Comet's Parquet reader to read values.
*/
-class CometParquetFileFormat extends ParquetFileFormat with MetricsSupport with ShimSQLConf {
+class CometParquetFileFormat(scanImpl: String)
+ extends ParquetFileFormat
+ with MetricsSupport
+ with ShimSQLConf {
override def shortName(): String = "parquet"
override def toString: String = "CometParquet"
override def hashCode(): Int = getClass.hashCode()
@@ -100,8 +103,8 @@ class CometParquetFileFormat extends ParquetFileFormat with MetricsSupport with
// Comet specific configurations
val capacity = CometConf.COMET_BATCH_SIZE.get(sqlConf)
- val nativeIcebergCompat =
- CometConf.COMET_NATIVE_SCAN_IMPL.get(sqlConf).equals(CometConf.SCAN_NATIVE_ICEBERG_COMPAT)
+
+ val nativeIcebergCompat = scanImpl == CometConf.SCAN_NATIVE_ICEBERG_COMPAT
(file: PartitionedFile) => {
val sharedConf = broadcastedHadoopConf.value.value
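
With this change the scan implementation is fixed when the plan node is built
rather than re-read from SQLConf inside the reader closure. A hedged sketch of
the new construction path (constants are the existing CometConf ones):

    import org.apache.comet.CometConf
    import org.apache.comet.parquet.CometParquetFileFormat

    // The format is told up front which implementation it serves; inside
    // buildReaderWithPartitionValues the check then reduces to the string
    // comparison shown in the hunk above:
    //   val nativeIcebergCompat = scanImpl == CometConf.SCAN_NATIVE_ICEBERG_COMPAT
    val fmt = new CometParquetFileFormat(CometConf.SCAN_NATIVE_ICEBERG_COMPAT)
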
diff --git a/spark/src/main/scala/org/apache/comet/parquet/CometParquetPartitionReaderFactory.scala b/spark/src/main/scala/org/apache/comet/parquet/CometParquetPartitionReaderFactory.scala
index 4dc099735..69cffdd15 100644
--- a/spark/src/main/scala/org/apache/comet/parquet/CometParquetPartitionReaderFactory.scala
+++ b/spark/src/main/scala/org/apache/comet/parquet/CometParquetPartitionReaderFactory.scala
@@ -46,6 +46,7 @@ import org.apache.comet.{CometConf, CometRuntimeException}
import org.apache.comet.shims.ShimSQLConf
case class CometParquetPartitionReaderFactory(
+ usingDataFusionReader: Boolean,
@transient sqlConf: SQLConf,
broadcastedConf: Broadcast[SerializableConfiguration],
readDataSchema: StructType,
@@ -71,17 +72,6 @@ case class CometParquetPartitionReaderFactory(
// Comet specific configurations
private val batchSize = CometConf.COMET_BATCH_SIZE.get(sqlConf)
- @transient private lazy val usingDataFusionReader: Boolean = {
- val conf = broadcastedConf.value.value
- conf.getBoolean(
- CometConf.COMET_NATIVE_SCAN_ENABLED.key,
- CometConf.COMET_NATIVE_SCAN_ENABLED.defaultValue.get) &&
- conf
- .get(
- CometConf.COMET_NATIVE_SCAN_IMPL.key,
- CometConf.COMET_NATIVE_SCAN_IMPL.defaultValueString)
- .equalsIgnoreCase(CometConf.SCAN_NATIVE_ICEBERG_COMPAT)
- }
// This is only called at executor on a Broadcast variable, so we don't want it to be
// materialized at driver.
@transient private lazy val preFetchEnabled = {
diff --git a/spark/src/main/scala/org/apache/comet/parquet/CometParquetScan.scala b/spark/src/main/scala/org/apache/comet/parquet/CometParquetScan.scala
index e3cd33b41..b04d6ebc1 100644
--- a/spark/src/main/scala/org/apache/comet/parquet/CometParquetScan.scala
+++ b/spark/src/main/scala/org/apache/comet/parquet/CometParquetScan.scala
@@ -58,6 +58,7 @@ trait CometParquetScan extends FileScan with MetricsSupport {
val broadcastedConf =
sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
CometParquetPartitionReaderFactory(
+ usingDataFusionReader = false, // this value is not used since this is a v2 scan
sqlConf,
broadcastedConf,
readDataSchema,
diff --git a/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala b/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala
index da4795fe8..d325043d8 100644
--- a/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala
+++ b/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala
@@ -35,7 +35,7 @@ import org.apache.spark.sql.execution.window.WindowExec
import org.apache.spark.sql.types.{DoubleType, FloatType}
import org.apache.comet.{CometConf, ExtendedExplainInfo}
-import org.apache.comet.CometConf.{COMET_ANSI_MODE_ENABLED, COMET_NATIVE_SCAN_IMPL, COMET_SHUFFLE_FALLBACK_TO_COLUMNAR}
+import org.apache.comet.CometConf.{COMET_ANSI_MODE_ENABLED, COMET_SHUFFLE_FALLBACK_TO_COLUMNAR}
import org.apache.comet.CometSparkSessionExtensions.{createMessage, getCometBroadcastNotEnabledReason, getCometShuffleNotEnabledReason, isANSIEnabled, isCometBroadCastForceEnabled, isCometExecEnabled, isCometJVMShuffleMode, isCometLoaded, isCometNativeShuffleMode, isCometScan, isCometShuffleEnabled, isSpark40Plus, shouldApplySparkToColumnar, withInfo}
import org.apache.comet.serde.OperatorOuterClass.Operator
import org.apache.comet.serde.QueryPlanSerde
@@ -154,8 +154,7 @@ case class CometExecRule(session: SparkSession) extends Rule[SparkPlan] {
plan.transformUp {
// Fully native scan for V1
- case scan: CometScanExec
- if COMET_NATIVE_SCAN_IMPL.get() == CometConf.SCAN_NATIVE_DATAFUSION =>
+ case scan: CometScanExec if scan.scanImpl == CometConf.SCAN_NATIVE_DATAFUSION =>
val nativeOp = QueryPlanSerde.operator2Proto(scan).get
CometNativeScanExec(nativeOp, scan.wrapped, scan.session)
diff --git a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
index 429a00291..eed1dfdba 100644
--- a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
+++ b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
@@ -25,14 +25,15 @@ import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, PlanExpression}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.util.MetadataColumnHelper
-import org.apache.spark.sql.comet.{CometBatchScanExec, CometNativeScanExec, CometScanExec}
+import org.apache.spark.sql.comet.{CometBatchScanExec, CometScanExec}
import org.apache.spark.sql.execution.{FileSourceScanExec, SparkPlan}
import org.apache.spark.sql.execution.datasources.HadoopFsRelation
import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan
import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.{ArrayType, ByteType, DataType, MapType, ShortType, StructType}
-import org.apache.comet.CometConf
+import org.apache.comet.{CometConf, DataTypeSupport}
import org.apache.comet.CometConf._
import org.apache.comet.CometSparkSessionExtensions.{isCometLoaded, isCometScanEnabled, withInfo, withInfos}
import org.apache.comet.parquet.{CometParquetScan, SupportsComet}
@@ -106,16 +107,11 @@ case class CometScanRule(session: SparkSession) extends Rule[SparkPlan] {
return withInfos(scanExec, fallbackReasons.toSet)
}
- val (schemaSupported, partitionSchemaSupported) = scanImpl match {
- case CometConf.SCAN_NATIVE_DATAFUSION =>
- (
- CometNativeScanExec.isSchemaSupported(scanExec.requiredSchema, fallbackReasons),
- CometNativeScanExec.isSchemaSupported(r.partitionSchema, fallbackReasons))
- case CometConf.SCAN_NATIVE_COMET | SCAN_NATIVE_ICEBERG_COMPAT =>
- (
- CometScanExec.isSchemaSupported(scanExec.requiredSchema, fallbackReasons),
- CometScanExec.isSchemaSupported(r.partitionSchema, fallbackReasons))
- }
+ val typeChecker = new CometScanTypeChecker(scanImpl)
+ val schemaSupported =
+ typeChecker.isSchemaSupported(scanExec.requiredSchema, fallbackReasons)
+ val partitionSchemaSupported =
+ typeChecker.isSchemaSupported(r.partitionSchema, fallbackReasons)
if (!schemaSupported) {
fallbackReasons += s"Unsupported schema ${scanExec.requiredSchema}
for $scanImpl"
@@ -125,7 +121,9 @@ case class CometScanRule(session: SparkSession) extends Rule[SparkPlan] {
}
if (schemaSupported && partitionSchemaSupported) {
- CometScanExec(scanExec, session)
+ // this is confusing, but we always insert a CometScanExec here, which may be replaced
+ // with a CometNativeExec when CometExecRule runs, depending on the scanImpl value.
+ CometScanExec(scanExec, session, scanImpl)
} else {
withInfos(scanExec, fallbackReasons.toSet)
}
@@ -201,3 +199,23 @@ case class CometScanRule(session: SparkSession) extends Rule[SparkPlan] {
}
}
+
+case class CometScanTypeChecker(scanImpl: String) extends DataTypeSupport {
+ override def isTypeSupported(
+ dt: DataType,
+ name: String,
+ fallbackReasons: ListBuffer[String]): Boolean = {
+ dt match {
+ case ByteType | ShortType
+ if scanImpl != CometConf.SCAN_NATIVE_COMET &&
+ !CometConf.COMET_SCAN_ALLOW_INCOMPATIBLE.get() =>
+ fallbackReasons += s"$scanImpl scan cannot read $dt when " +
+ s"${CometConf.COMET_SCAN_ALLOW_INCOMPATIBLE.key} is false.
${CometConf.COMPAT_GUIDE}."
+ false
+ case _: StructType | _: ArrayType | _: MapType if scanImpl == CometConf.SCAN_NATIVE_COMET =>
+ false
+ case _ =>
+ super.isTypeSupported(dt, name, fallbackReasons)
+ }
+ }
+}
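
The new CometScanTypeChecker centralizes the per-implementation type rules that
were previously duplicated on the CometScanExec and CometNativeScanExec
companions (both removed further down). A hedged usage sketch, consistent with
how ParquetReadSuite exercises it later in this diff:

    import scala.collection.mutable.ListBuffer
    import org.apache.spark.sql.types.{ArrayType, ByteType, LongType}
    import org.apache.comet.CometConf
    import org.apache.comet.rules.CometScanTypeChecker

    val reasons = new ListBuffer[String]()
    // ByteType/ShortType fall back for the native scans unless
    // COMET_SCAN_ALLOW_INCOMPATIBLE is enabled; a reason is appended:
    CometScanTypeChecker(CometConf.SCAN_NATIVE_ICEBERG_COMPAT)
      .isTypeSupported(ByteType, "c0", reasons) // false by default
    // Nested types are rejected outright for native_comet:
    CometScanTypeChecker(CometConf.SCAN_NATIVE_COMET)
      .isTypeSupported(ArrayType(LongType), "c1", reasons) // false
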
diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
index 79a6c16c5..ad9683918 100644
--- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
@@ -2282,8 +2282,7 @@ object QueryPlanSerde extends Logging with CometExprShim {
op match {
// Fully native scan for V1
- case scan: CometScanExec
- if CometConf.COMET_NATIVE_SCAN_IMPL.get(conf) == CometConf.SCAN_NATIVE_DATAFUSION =>
+ case scan: CometScanExec if scan.scanImpl == CometConf.SCAN_NATIVE_DATAFUSION =>
val nativeScanBuilder = OperatorOuterClass.NativeScan.newBuilder()
nativeScanBuilder.setSource(op.simpleStringWithNodeId())
@@ -2376,12 +2375,26 @@ object QueryPlanSerde extends Logging with CometExprShim {
val cond = exprToProto(condition, child.output)
if (cond.isDefined && childOp.nonEmpty) {
+ // We need to determine whether to use DataFusion's FilterExec or Comet's
+ // FilterExec. The difference is that DataFusion's implementation will sometimes pass
+ // batches through whereas the Comet implementation guarantees that a copy is always
+ // made, which is critical when using `native_comet` scans due to buffer re-use
+
+ // TODO this could be optimized more to stop walking the tree on hitting
+ // certain operators such as join or aggregate which will copy batches
+ def containsNativeCometScan(plan: SparkPlan): Boolean = {
+ plan match {
+ case w: CometScanWrapper => containsNativeCometScan(w.originalPlan)
+ case scan: CometScanExec => scan.scanImpl == CometConf.SCAN_NATIVE_COMET
+ case _: CometNativeScanExec => false
+ case _ => plan.children.exists(containsNativeCometScan)
+ }
+ }
+
val filterBuilder = OperatorOuterClass.Filter
.newBuilder()
.setPredicate(cond.get)
- .setUseDatafusionFilter(
- CometConf.COMET_NATIVE_SCAN_IMPL.get() == CometConf.SCAN_NATIVE_DATAFUSION ||
- CometConf.COMET_NATIVE_SCAN_IMPL.get() == CometConf.SCAN_NATIVE_ICEBERG_COMPAT)
+ .setUseDatafusionFilter(!containsNativeCometScan(op))
Some(result.setFilter(filterBuilder).build())
} else {
withInfo(op, condition, child)
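
The upshot is that the filter implementation is now chosen per query subtree
instead of from the global scan config. A rough summary of the decision, in
comment form (assuming a single scan feeding the filter):

    // scan feeding the filter                  -> setUseDatafusionFilter(...)
    // CometScanExec with native_comet          -> false (Comet filter; always copies)
    // CometScanExec with native_iceberg_compat -> true  (DataFusion filter)
    // CometNativeScanExec (native_datafusion)  -> true
    // Until the TODO above is addressed, the walk does not stop at joins or
    // aggregates, so any native_comet scan anywhere below forces the Comet filter.
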
diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometNativeScanExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometNativeScanExec.scala
index 922b75e78..92b2e6a88 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/CometNativeScanExec.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometNativeScanExec.scala
@@ -19,7 +19,6 @@
package org.apache.spark.sql.comet
-import scala.collection.mutable.ListBuffer
import scala.reflect.ClassTag
import org.apache.spark.rdd.RDD
@@ -36,12 +35,12 @@ import org.apache.spark.util.collection._
import com.google.common.base.Objects
-import org.apache.comet.{CometConf, DataTypeSupport}
+import org.apache.comet.CometConf
import org.apache.comet.parquet.CometParquetFileFormat
import org.apache.comet.serde.OperatorOuterClass.Operator
/**
- * Comet fully native scan node for DataSource V1.
+ * Comet fully native scan node for DataSource V1 that delegates to DataFusion's DataSourceExec.
*/
case class CometNativeScanExec(
override val nativeOp: Operator,
@@ -184,7 +183,7 @@ case class CometNativeScanExec(
override def inputRDDs(): Seq[RDD[InternalRow]] = originalPlan.inputRDDs()
}
-object CometNativeScanExec extends DataTypeSupport {
+object CometNativeScanExec {
def apply(
nativeOp: Operator,
scanExec: FileSourceScanExec,
@@ -206,7 +205,8 @@ object CometNativeScanExec extends DataTypeSupport {
// https://github.com/apache/arrow-datafusion-comet/issues/190
def transform(arg: Any): AnyRef = arg match {
case _: HadoopFsRelation =>
- scanExec.relation.copy(fileFormat = new CometParquetFileFormat)(session)
+ scanExec.relation.copy(fileFormat =
+ new CometParquetFileFormat(CometConf.SCAN_NATIVE_DATAFUSION))(session)
case other: AnyRef => other
case null => null
}
@@ -229,18 +229,4 @@ object CometNativeScanExec extends DataTypeSupport {
scanExec.logicalLink.foreach(batchScanExec.setLogicalLink)
batchScanExec
}
-
- override def isTypeSupported(
- dt: DataType,
- name: String,
- fallbackReasons: ListBuffer[String]): Boolean = {
- dt match {
- case ByteType | ShortType if !CometConf.COMET_SCAN_ALLOW_INCOMPATIBLE.get() =>
- fallbackReasons += s"${CometConf.SCAN_NATIVE_DATAFUSION} scan cannot read $dt when " +
- s"${CometConf.COMET_SCAN_ALLOW_INCOMPATIBLE.key} is false. ${CometConf.COMPAT_GUIDE}."
- false
- case _ =>
- super.isTypeSupported(dt, name, fallbackReasons)
- }
- }
}
diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometScanExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometScanExec.scala
index a4d7f4ec4..8bba2d863 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/CometScanExec.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometScanExec.scala
@@ -19,7 +19,7 @@
package org.apache.spark.sql.comet
-import scala.collection.mutable.{HashMap, ListBuffer}
+import scala.collection.mutable.HashMap
import scala.concurrent.duration.NANOSECONDS
import scala.reflect.ClassTag
@@ -44,14 +44,22 @@ import org.apache.spark.sql.vectorized.ColumnarBatch
import org.apache.spark.util.SerializableConfiguration
import org.apache.spark.util.collection._
-import org.apache.comet.{CometConf, DataTypeSupport, MetricsSupport}
+import org.apache.comet.{CometConf, MetricsSupport}
import org.apache.comet.parquet.{CometParquetFileFormat, CometParquetPartitionReaderFactory}
/**
* Comet physical scan node for DataSource V1. Most of the code here follow Spark's
- * [[FileSourceScanExec]],
+ * [[FileSourceScanExec]].
+ *
+ * This is a hybrid scan where the native plan will contain a `ScanExec` that reads batches of
+ * data from the JVM via JNI. The ultimate source of data may be a JVM implementation such as
+ * Spark readers, or could be the `native_comet` or `native_iceberg_compat` native scans.
+ *
+ * Note that scanImpl can only be `native_datafusion` after CometScanRule runs and before
+ * CometExecRule runs. It will never be set to `native_datafusion` at execution time.
*/
case class CometScanExec(
+ scanImpl: String,
@transient relation: HadoopFsRelation,
output: Seq[Attribute],
requiredSchema: StructType,
@@ -414,16 +422,8 @@ case class CometScanExec(
readFile: (PartitionedFile) => Iterator[InternalRow],
partitions: Seq[FilePartition]): RDD[InternalRow] = {
val hadoopConf =
relation.sparkSession.sessionState.newHadoopConfWithOptions(relation.options)
- val usingDataFusionReader: Boolean = {
- hadoopConf.getBoolean(
- CometConf.COMET_NATIVE_SCAN_ENABLED.key,
- CometConf.COMET_NATIVE_SCAN_ENABLED.defaultValue.get) &&
- hadoopConf
- .get(
- CometConf.COMET_NATIVE_SCAN_IMPL.key,
- CometConf.COMET_NATIVE_SCAN_IMPL.defaultValueString)
- .equalsIgnoreCase(CometConf.SCAN_NATIVE_ICEBERG_COMPAT)
- }
+ val usingDataFusionReader: Boolean = scanImpl == CometConf.SCAN_NATIVE_ICEBERG_COMPAT
+
val prefetchEnabled = hadoopConf.getBoolean(
CometConf.COMET_SCAN_PREFETCH_ENABLED.key,
CometConf.COMET_SCAN_PREFETCH_ENABLED.defaultValue.get) &&
@@ -435,6 +435,7 @@ case class CometScanExec(
val broadcastedConf =
fsRelation.sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
val partitionReaderFactory = CometParquetPartitionReaderFactory(
+ scanImpl == CometConf.SCAN_NATIVE_ICEBERG_COMPAT,
sqlConf,
broadcastedConf,
requiredSchema,
@@ -461,6 +462,7 @@ case class CometScanExec(
override def doCanonicalize(): CometScanExec = {
CometScanExec(
+ scanImpl,
relation,
output.map(QueryPlan.normalizeExpressions(_, output)),
requiredSchema,
@@ -476,9 +478,12 @@ case class CometScanExec(
}
}
-object CometScanExec extends DataTypeSupport {
+object CometScanExec {
- def apply(scanExec: FileSourceScanExec, session: SparkSession): CometScanExec = {
+ def apply(
+ scanExec: FileSourceScanExec,
+ session: SparkSession,
+ scanImpl: String): CometScanExec = {
// TreeNode.mapProductIterator is protected method.
def mapProductIterator[B: ClassTag](product: Product, f: Any => B): Array[B] = {
val arr = Array.ofDim[B](product.productArity)
@@ -496,13 +501,14 @@ object CometScanExec extends DataTypeSupport {
// https://github.com/apache/arrow-datafusion-comet/issues/190
def transform(arg: Any): AnyRef = arg match {
case _: HadoopFsRelation =>
- scanExec.relation.copy(fileFormat = new CometParquetFileFormat)(session)
+ scanExec.relation.copy(fileFormat = new CometParquetFileFormat(scanImpl))(session)
case other: AnyRef => other
case null => null
}
- val newArgs = mapProductIterator(scanExec, transform(_))
+ val newArgs = mapProductIterator(scanExec, transform)
val wrapped = scanExec.makeCopy(newArgs).asInstanceOf[FileSourceScanExec]
val batchScanExec = CometScanExec(
+ scanImpl,
wrapped.relation,
wrapped.output,
wrapped.requiredSchema,
@@ -523,22 +529,4 @@ object CometScanExec extends DataTypeSupport {
fileFormat.getClass().equals(classOf[ParquetFileFormat])
}
- override def isTypeSupported(
- dt: DataType,
- name: String,
- fallbackReasons: ListBuffer[String]): Boolean = {
- dt match {
- case ByteType | ShortType
- if CometConf.COMET_NATIVE_SCAN_IMPL.get() == CometConf.SCAN_NATIVE_ICEBERG_COMPAT &&
- !CometConf.COMET_SCAN_ALLOW_INCOMPATIBLE.get() =>
- fallbackReasons += s"${CometConf.SCAN_NATIVE_ICEBERG_COMPAT} scan cannot read $dt when " +
- s"${CometConf.COMET_SCAN_ALLOW_INCOMPATIBLE.key} is false. ${CometConf.COMPAT_GUIDE}."
- false
- case _: StructType | _: ArrayType | _: MapType
- if CometConf.COMET_NATIVE_SCAN_IMPL.get() != CometConf.SCAN_NATIVE_ICEBERG_COMPAT =>
- false
- case _ =>
- super.isTypeSupported(dt, name, fallbackReasons)
- }
- }
}
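
Because scanImpl is now both the first constructor argument and a required
parameter of the companion's apply, rule code must state the implementation
explicitly. A hedged sketch of the call site, mirroring the CometScanRule hunk
above (scanExec and session assumed in scope):

    // The chosen implementation is captured on the node, so later rules and
    // the reader factory no longer consult COMET_NATIVE_SCAN_IMPL at use time.
    val cometScan = CometScanExec(scanExec, session, CometConf.SCAN_NATIVE_ICEBERG_COMPAT)
    assert(cometScan.scanImpl == CometConf.SCAN_NATIVE_ICEBERG_COMPAT)
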
diff --git a/spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala
index c58f1a14f..4803f4922 100644
--- a/spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala
@@ -235,7 +235,7 @@ class CometArrayExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelp
test("array_intersect") {
// https://github.com/apache/datafusion-comet/issues/1441
- assume(!CometConf.isExperimentalNativeScan)
+ assume(!usingDataSourceExec)
withSQLConf(CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.key -> "true") {
Seq(true, false).foreach { dictionaryEnabled =>
diff --git a/spark/src/test/scala/org/apache/comet/CometCastSuite.scala b/spark/src/test/scala/org/apache/comet/CometCastSuite.scala
index ec01c16fc..11e44251b 100644
--- a/spark/src/test/scala/org/apache/comet/CometCastSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/CometCastSuite.scala
@@ -932,7 +932,7 @@ class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper {
test("cast StructType to StringType") {
// https://github.com/apache/datafusion-comet/issues/1441
- assume(!CometConf.isExperimentalNativeScan)
+ assume(!usingDataSourceExec)
Seq(true, false).foreach { dictionaryEnabled =>
withTempDir { dir =>
val path = new Path(dir.toURI.toString, "test.parquet")
diff --git a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
index 35f8d8fc0..43fbe57c5 100644
--- a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
@@ -1266,7 +1266,7 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
test("round") {
// https://github.com/apache/datafusion-comet/issues/1441
- assume(!CometConf.isExperimentalNativeScan)
+ assume(!usingDataSourceExec)
Seq(true, false).foreach { dictionaryEnabled =>
withTempDir { dir =>
val path = new Path(dir.toURI.toString, "test.parquet")
@@ -1496,7 +1496,7 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
test("hex") {
// https://github.com/apache/datafusion-comet/issues/1441
- assume(!CometConf.isExperimentalNativeScan)
+ assume(!usingDataSourceExec)
Seq(true, false).foreach { dictionaryEnabled =>
withTempDir { dir =>
val path = new Path(dir.toURI.toString, "hex.parquet")
@@ -2725,7 +2725,7 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
test("test integral divide") {
// https://github.com/apache/datafusion-comet/issues/1441
- assume(!CometConf.isExperimentalNativeScan)
+ assume(!usingDataSourceExec)
Seq(true, false).foreach { dictionaryEnabled =>
withTempDir { dir =>
val path1 = new Path(dir.toURI.toString, "test1.parquet")
diff --git a/spark/src/test/scala/org/apache/comet/CometFuzzTestSuite.scala b/spark/src/test/scala/org/apache/comet/CometFuzzTestSuite.scala
index 4af73ce01..05901339b 100644
--- a/spark/src/test/scala/org/apache/comet/CometFuzzTestSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/CometFuzzTestSuite.scala
@@ -81,7 +81,7 @@ class CometFuzzTestSuite extends CometTestBase with AdaptiveSparkPlanHelper {
val df = spark.read.parquet(filename)
df.createOrReplaceTempView("t1")
val sql = "SELECT * FROM t1"
- if (CometConf.isExperimentalNativeScan) {
+ if (usingDataSourceExec) {
checkSparkAnswerAndOperator(sql)
} else {
checkSparkAnswer(sql)
@@ -92,7 +92,7 @@ class CometFuzzTestSuite extends CometTestBase with AdaptiveSparkPlanHelper {
val df = spark.read.parquet(filename)
df.createOrReplaceTempView("t1")
val sql = "SELECT * FROM t1 LIMIT 500"
- if (CometConf.isExperimentalNativeScan) {
+ if (usingDataSourceExec) {
checkSparkAnswerAndOperator(sql)
} else {
checkSparkAnswer(sql)
@@ -106,7 +106,7 @@ class CometFuzzTestSuite extends CometTestBase with AdaptiveSparkPlanHelper {
val sql = s"SELECT $col FROM t1 ORDER BY $col"
// cannot run fully natively due to range partitioning and sort
val (_, cometPlan) = checkSparkAnswer(sql)
- if (CometConf.isExperimentalNativeScan) {
+ if (usingDataSourceExec) {
assert(1 == collectNativeScans(cometPlan).length)
}
}
@@ -118,7 +118,7 @@ class CometFuzzTestSuite extends CometTestBase with AdaptiveSparkPlanHelper {
for (col <- df.columns) {
val sql = s"SELECT count(distinct $col) FROM t1"
val (_, cometPlan) = checkSparkAnswer(sql)
- if (CometConf.isExperimentalNativeScan) {
+ if (usingDataSourceExec) {
assert(1 == collectNativeScans(cometPlan).length)
}
}
@@ -131,7 +131,7 @@ class CometFuzzTestSuite extends CometTestBase with AdaptiveSparkPlanHelper {
val sql = s"SELECT $allCols FROM t1 ORDER BY $allCols"
// cannot run fully natively due to range partitioning and sort
val (_, cometPlan) = checkSparkAnswer(sql)
- if (CometConf.isExperimentalNativeScan) {
+ if (usingDataSourceExec) {
assert(1 == collectNativeScans(cometPlan).length)
}
}
@@ -143,7 +143,7 @@ class CometFuzzTestSuite extends CometTestBase with AdaptiveSparkPlanHelper {
// cannot run fully natively due to range partitioning and sort
val sql = s"SELECT $col, count(*) FROM t1 GROUP BY $col ORDER BY $col"
val (_, cometPlan) = checkSparkAnswer(sql)
- if (CometConf.isExperimentalNativeScan) {
+ if (usingDataSourceExec) {
assert(1 == collectNativeScans(cometPlan).length)
}
}
@@ -156,7 +156,7 @@ class CometFuzzTestSuite extends CometTestBase with AdaptiveSparkPlanHelper {
// cannot run fully native due to HashAggregate
val sql = s"SELECT min($col), max($col) FROM t1"
val (_, cometPlan) = checkSparkAnswer(sql)
- if (CometConf.isExperimentalNativeScan) {
+ if (usingDataSourceExec) {
assert(1 == collectNativeScans(cometPlan).length)
}
}
@@ -195,7 +195,7 @@ class CometFuzzTestSuite extends CometTestBase with AdaptiveSparkPlanHelper {
val df = spark.read.parquet(filename)
val df2 = df.repartition(8, df.col("c0")).sort("c1")
df2.collect()
- if (CometConf.isExperimentalNativeScan) {
+ if (usingDataSourceExec) {
val cometShuffles = collectCometShuffleExchanges(df2.queryExecution.executedPlan)
assert(1 == cometShuffles.length)
}
@@ -209,7 +209,7 @@ class CometFuzzTestSuite extends CometTestBase with AdaptiveSparkPlanHelper {
// cannot run fully native due to HashAggregate
val sql = s"SELECT count(*) FROM t1 JOIN t2 ON t1.$col = t2.$col"
val (_, cometPlan) = checkSparkAnswer(sql)
- if (CometConf.isExperimentalNativeScan) {
+ if (usingDataSourceExec) {
assert(2 == collectNativeScans(cometPlan).length)
}
}
diff --git a/spark/src/test/scala/org/apache/comet/exec/CometJoinSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometJoinSuite.scala
index e0a873dd1..d47b4e0c1 100644
--- a/spark/src/test/scala/org/apache/comet/exec/CometJoinSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/exec/CometJoinSuite.scala
@@ -199,7 +199,7 @@ class CometJoinSuite extends CometTestBase {
test("HashJoin struct key") {
// https://github.com/apache/datafusion-comet/issues/1441
- assume(!CometConf.isExperimentalNativeScan)
+ assume(!usingDataSourceExec)
withSQLConf(
"spark.sql.join.forceApplyShuffledHashJoin" -> "true",
SQLConf.PREFER_SORTMERGEJOIN.key -> "false",
diff --git a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala
index 2fbad102d..334c468c5 100644
--- a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala
@@ -48,8 +48,8 @@ import org.apache.spark.unsafe.types.UTF8String
import com.google.common.primitives.UnsignedLong
import org.apache.comet.CometConf
-import org.apache.comet.CometConf.SCAN_NATIVE_ICEBERG_COMPAT
import org.apache.comet.CometSparkSessionExtensions.isSpark40Plus
+import org.apache.comet.rules.CometScanTypeChecker
abstract class ParquetReadSuite extends CometTestBase {
import testImplicits._
@@ -88,8 +88,7 @@ abstract class ParquetReadSuite extends CometTestBase {
// for native iceberg compat, CometScanExec supports some types that native_comet does not.
// note that native_datafusion does not use CometScanExec so we need not include that in
// the check
- val usingNativeIcebergCompat =
- CometConf.COMET_NATIVE_SCAN_IMPL.get() == CometConf.SCAN_NATIVE_ICEBERG_COMPAT
+ val isDataFusionScan = usingDataSourceExec(conf)
Seq(
NullType -> false,
BooleanType -> true,
@@ -103,22 +102,22 @@ abstract class ParquetReadSuite extends CometTestBase {
StringType -> true,
// Timestamp here arbitrary for picking a concrete data type to from ArrayType
// Any other type works
- ArrayType(TimestampType) -> usingNativeIcebergCompat,
+ ArrayType(TimestampType) -> isDataFusionScan,
StructType(
Seq(
StructField("f1", DecimalType.SYSTEM_DEFAULT),
- StructField("f2", StringType))) -> usingNativeIcebergCompat,
- MapType(keyType = LongType, valueType = DateType) -> usingNativeIcebergCompat,
+ StructField("f2", StringType))) -> isDataFusionScan,
+ MapType(keyType = LongType, valueType = DateType) -> isDataFusionScan,
StructType(
- Seq(
- StructField("f1", ByteType),
- StructField("f2", StringType))) -> usingNativeIcebergCompat,
- MapType(keyType = IntegerType, valueType = BinaryType) -> usingNativeIcebergCompat)
+ Seq(StructField("f1", ByteType), StructField("f2", StringType))) -> isDataFusionScan,
+ MapType(keyType = IntegerType, valueType = BinaryType) -> isDataFusionScan)
.foreach { case (dt, expected) =>
val fallbackReasons = new ListBuffer[String]()
- assert(CometScanExec.isTypeSupported(dt, "", fallbackReasons) == expected)
+ assert(
+ CometScanTypeChecker(CometConf.COMET_NATIVE_SCAN_IMPL.get())
+ .isTypeSupported(dt, "", fallbackReasons) == expected)
// usingDataFusionParquetExec does not support CometBatchScanExec yet
- if (!usingDataSourceExec(conf)) {
+ if (!isDataFusionScan) {
assert(CometBatchScanExec.isTypeSupported(dt, "", fallbackReasons) == expected)
}
}
@@ -131,8 +130,7 @@ abstract class ParquetReadSuite extends CometTestBase {
// Arrays support for iceberg compat native and for Parquet V1
val cometScanExecSupported =
- if (sys.env.get("COMET_PARQUET_SCAN_IMPL").contains(SCAN_NATIVE_ICEBERG_COMPAT) && this
- .isInstanceOf[ParquetReadV1Suite])
+ if (usingDataSourceExec(conf) && this.isInstanceOf[ParquetReadV1Suite])
Seq(true, true, true)
else Seq(true, false, false)
@@ -140,7 +138,9 @@ abstract class ParquetReadSuite extends CometTestBase {
val fallbackReasons = new ListBuffer[String]()
schemaDDLs.zip(cometScanExecSupported).foreach { case (schema, expected) =>
- assert(CometScanExec.isSchemaSupported(StructType(schema), fallbackReasons) == expected)
+ assert(
+ CometScanTypeChecker(CometConf.COMET_NATIVE_SCAN_IMPL.get())
+ .isSchemaSupported(StructType(schema), fallbackReasons) == expected)
}
schemaDDLs.zip(cometBatchScanExecSupported).foreach { case (schema, expected) =>
@@ -354,7 +354,7 @@ abstract class ParquetReadSuite extends CometTestBase {
test("test multiple pages with different sizes and nulls") {
// https://github.com/apache/datafusion-comet/issues/1441
- assume(!CometConf.isExperimentalNativeScan)
+ assume(!usingDataSourceExec)
def makeRawParquetFile(
path: Path,
dictionaryEnabled: Boolean,
diff --git a/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala b/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala
index f69b94459..fbfdbcda6 100644
--- a/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala
@@ -979,6 +979,8 @@ abstract class CometTestBase
writer.close()
}
+ def usingDataSourceExec: Boolean = usingDataSourceExec(SQLConf.get)
+
def usingDataSourceExec(conf: SQLConf): Boolean =
Seq(CometConf.SCAN_NATIVE_ICEBERG_COMPAT, CometConf.SCAN_NATIVE_DATAFUSION).contains(
CometConf.COMET_NATIVE_SCAN_IMPL.get(conf))
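
The new zero-argument overload lets suites gate tests without passing a
SQLConf, matching the assume(!usingDataSourceExec) calls introduced throughout
the test changes above. An illustrative sketch (the test name is hypothetical):

    // Inside a suite extending CometTestBase:
    test("behavior specific to native_comet") {
      // Skip when native_iceberg_compat or native_datafusion is active,
      // i.e. when a DataFusion-based DataSourceExec performs the scan.
      assume(!usingDataSourceExec)
      // ... test body ...
    }
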
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]