weiting-chen commented on code in PR #12077:
URL: https://github.com/apache/gluten/pull/12077#discussion_r3226413051


##########
backends-velox/src/main/scala/org/apache/gluten/execution/VeloxRDDScanTransformer.scala:
##########
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.execution
+
+import org.apache.gluten.config.{GlutenConfig, VeloxConfig}
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Attribute, SortOrder}
+import org.apache.spark.sql.catalyst.plans.physical.Partitioning
+import org.apache.spark.sql.execution.{RDDScanTransformer, SparkPlan}
+import org.apache.spark.sql.vectorized.ColumnarBatch
+
+/**
+ * Velox-backend implementation of RDDScanTransformer.
+ *
+ * Converts an RDD[InternalRow] into columnar batches using Velox's native 
row-to-columnar
+ * conversion (same JNI path as RowToVeloxColumnarExec).
+ */
+case class VeloxRDDScanTransformer(
+    outputAttributes: Seq[Attribute],
+    rdd: RDD[InternalRow],
+    name: String,
+    override val outputPartitioning: Partitioning,
+    override val outputOrdering: Seq[SortOrder]
+) extends RDDScanTransformer(outputAttributes, outputPartitioning, 
outputOrdering) {
+

Review Comment:
   **PR description contradicts validation logic for complex types**
   
   **Problem:** The PR description states *"rejects complex types (ARRAY, MAP, 
STRUCT)"* but `doValidateInternal()` explicitly **accepts** these types. The 
code is correct — Velox does support complex types via 
`UnsafeRowFast::deserialize`. The PR description should be updated to avoid 
misleading reviewers.
   
   **Evidence:**
   ```scala
   case _: org.apache.spark.sql.types.ArrayType =>
   case _: org.apache.spark.sql.types.MapType =>
   case _: org.apache.spark.sql.types.StructType =>
   ```
   These cases fall through to `ValidationResult.succeeded`, meaning complex 
types are accepted.
   
   **Suggested Fix:** Update the PR description to remove the claim that 
complex types are rejected, e.g.:
   > *Supports all Velox-compatible types including complex types (Array, Map, 
Struct). Rejects only truly unsupported types (e.g., CalendarIntervalType) with 
clean fallback to vanilla Spark.*



##########
backends-velox/src/main/scala/org/apache/gluten/execution/VeloxRDDScanTransformer.scala:
##########
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.execution
+
+import org.apache.gluten.config.{GlutenConfig, VeloxConfig}
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Attribute, SortOrder}
+import org.apache.spark.sql.catalyst.plans.physical.Partitioning
+import org.apache.spark.sql.execution.{RDDScanTransformer, SparkPlan}
+import org.apache.spark.sql.vectorized.ColumnarBatch
+
+/**
+ * Velox-backend implementation of RDDScanTransformer.
+ *
+ * Converts an RDD[InternalRow] into columnar batches using Velox's native 
row-to-columnar
+ * conversion (same JNI path as RowToVeloxColumnarExec).
+ */
+case class VeloxRDDScanTransformer(
+    outputAttributes: Seq[Attribute],
+    rdd: RDD[InternalRow],
+    name: String,
+    override val outputPartitioning: Partitioning,
+    override val outputOrdering: Seq[SortOrder]

Review Comment:
   **Validation does not recurse into complex type element types**
   
   **Problem:** The type allowlist checks top-level types only. An 
`ArrayType(UnsupportedType)` or `MapType(StringType, UnsupportedType)` would 
pass validation but could fail at native execution time. The CH backend avoids 
this by delegating to `ConverterUtils.getTypeNode()` which recursively 
validates.
   
   **Evidence:**
   ```scala
   case _: org.apache.spark.sql.types.ArrayType =>   // passes any ArrayType, 
no element check
   case _: org.apache.spark.sql.types.MapType =>      // passes any MapType, no 
key/value check
   case _: org.apache.spark.sql.types.StructType =>   // passes any StructType, 
no field check
   ```
   
   **Suggested Fix:**
   ```scala
   case a: org.apache.spark.sql.types.ArrayType =>
     validateType(a.elementType)
   case m: org.apache.spark.sql.types.MapType =>
     validateType(m.keyType)
     validateType(m.valueType)
   case s: org.apache.spark.sql.types.StructType =>
     s.fields.foreach(f => validateType(f.dataType))
   ```
   Alternatively, delegate to `VeloxValidatorApi` for centralized type 
validation.



##########
backends-velox/src/main/scala/org/apache/gluten/execution/VeloxRDDScanTransformer.scala:
##########
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.execution
+
+import org.apache.gluten.config.{GlutenConfig, VeloxConfig}
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Attribute, SortOrder}
+import org.apache.spark.sql.catalyst.plans.physical.Partitioning
+import org.apache.spark.sql.execution.{RDDScanTransformer, SparkPlan}
+import org.apache.spark.sql.vectorized.ColumnarBatch
+
+/**
+ * Velox-backend implementation of RDDScanTransformer.
+ *
+ * Converts an RDD[InternalRow] into columnar batches using Velox's native 
row-to-columnar
+ * conversion (same JNI path as RowToVeloxColumnarExec).
+ */
+case class VeloxRDDScanTransformer(
+    outputAttributes: Seq[Attribute],
+    rdd: RDD[InternalRow],
+    name: String,
+    override val outputPartitioning: Partitioning,
+    override val outputOrdering: Seq[SortOrder]
+) extends RDDScanTransformer(outputAttributes, outputPartitioning, 
outputOrdering) {
+
+  override protected def doValidateInternal(): ValidationResult = {
+    // Validate that all output types are supported by Velox's row-to-columnar
+    // converter. This mirrors the type allowlist in VeloxColumnarToRowExec.
+    for (field <- schema.fields) {
+      field.dataType match {
+        case _: org.apache.spark.sql.types.BooleanType =>
+        case _: org.apache.spark.sql.types.ByteType =>
+        case _: org.apache.spark.sql.types.ShortType =>
+        case _: org.apache.spark.sql.types.IntegerType =>
+        case _: org.apache.spark.sql.types.LongType =>
+        case _: org.apache.spark.sql.types.FloatType =>
+        case _: org.apache.spark.sql.types.DoubleType =>
+        case _: org.apache.spark.sql.types.StringType =>
+        case _: org.apache.spark.sql.types.TimestampType =>
+        case _: org.apache.spark.sql.types.DateType =>
+        case _: org.apache.spark.sql.types.BinaryType =>

Review Comment:
   **No SQLMetrics propagation — Spark UI won't show conversion metrics**
   
   **Problem:** The 4-param overload of `toColumnarBatchIterator` creates 
throwaway `SQLMetric` instances not attached to this plan's metrics map. As a 
result, `numInputRows`, `numOutputBatches`, and `convertTime` won't appear in 
the Spark UI for this operator, making production debugging harder.
   
   **Evidence:**
   ```scala
   // Current (4-param overload creates throwaway metrics):
   RowToVeloxColumnarExec.toColumnarBatchIterator(iter, localSchema, batchSize, 
batchBytes)
   ```
   
   **Suggested Fix:** Define plan-level metrics and use the 7-param overload:
   ```scala
   override lazy val metrics = Map(
     "numInputRows" -> SQLMetrics.createMetric(sparkContext, "number of input 
rows"),
     "numOutputBatches" -> SQLMetrics.createMetric(sparkContext, "number of 
output batches"),
     "convertTime" -> SQLMetrics.createTimingMetric(sparkContext, "time to 
convert"))
   
   override def doExecuteColumnar(): RDD[ColumnarBatch] = {
     val numInputRows = longMetric("numInputRows")
     val numOutputBatches = longMetric("numOutputBatches")
     val convertTime = longMetric("convertTime")
     val localSchema = this.schema
     val batchSize = GlutenConfig.get.maxBatchSize
     val batchBytes = VeloxConfig.get.veloxPreferredBatchBytes
     rdd.mapPartitions { iter =>
       RowToVeloxColumnarExec.toColumnarBatchIterator(
         iter, localSchema, numInputRows, numOutputBatches, convertTime, 
batchSize, batchBytes)
     }
   }
   ```



##########
backends-velox/src/test/scala/org/apache/gluten/execution/VeloxRDDScanSuite.scala:
##########
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution
+
+import org.apache.gluten.execution._
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.classic.ClassicDataset
+import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
+import org.apache.spark.sql.types._
+
+class VeloxRDDScanSuite extends VeloxWholeStageTransformerSuite with 
AdaptiveSparkPlanHelper {
+
+  override protected val resourcePath: String = "/tpch-data-parquet"
+  override protected val fileFormat: String = "parquet"
+
+  override protected def sparkConf: SparkConf = {
+    super.sparkConf
+      .set("spark.sql.ansi.enabled", "false")
+  }
+
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+    createTPCHNotNullTables()
+  }
+
+  test("basic RDDScanExec is replaced by VeloxRDDScanTransformer") {
+    val data = spark.sql("SELECT l_orderkey, l_partkey FROM lineitem LIMIT 10")
+    val expectedAnswer = data.collect()
+
+    val node = LogicalRDD.fromDataset(
+      rdd = data.queryExecution.toRdd,
+      originDataset = data,
+      isStreaming = false)
+    val df = ClassicDataset.ofRows(spark, node).toDF()
+
+    checkAnswer(df, expectedAnswer)
+    val cnt = collect(df.queryExecution.executedPlan) { case _: 
VeloxRDDScanTransformer => true }
+    assert(cnt.nonEmpty, "Expected VeloxRDDScanTransformer in plan")
+  }
+
+  test("RDDScan with string and numeric types") {
+    val data = spark.sql("""SELECT l_returnflag, l_linestatus, l_quantity, 
l_extendedprice
+                           |FROM lineitem LIMIT 20""".stripMargin)
+    val expectedAnswer = data.collect()
+
+    val node = LogicalRDD.fromDataset(
+      rdd = data.queryExecution.toRdd,
+      originDataset = data,
+      isStreaming = false)
+    val df = ClassicDataset.ofRows(spark, node).toDF()
+
+    checkAnswer(df, expectedAnswer)
+    val cnt = collect(df.queryExecution.executedPlan) { case _: 
VeloxRDDScanTransformer => true }
+    assert(cnt.nonEmpty, "Expected VeloxRDDScanTransformer in plan")
+  }
+
+  test("RDDScan with aggregation downstream") {
+    val query =
+      """SELECT l_returnflag, sum(l_quantity) AS sum_qty
+        |FROM lineitem
+        |WHERE l_shipdate <= date'1998-09-02'
+        |GROUP BY l_returnflag""".stripMargin
+    val data = spark.sql(query)
+    val expectedAnswer = data.collect()
+
+    val node = LogicalRDD.fromDataset(
+      rdd = data.queryExecution.toRdd,
+      originDataset = data,
+      isStreaming = false)
+    val df = ClassicDataset.ofRows(spark, node).toDF()
+
+    checkAnswer(df, expectedAnswer)
+  }
+
+  test("RDDScan with empty RDD") {
+    val data = spark.sql("SELECT l_orderkey FROM lineitem WHERE 1 = 0")
+    val expectedAnswer = data.collect()
+
+    val node = LogicalRDD.fromDataset(
+      rdd = data.queryExecution.toRdd,
+      originDataset = data,
+      isStreaming = false)
+    val df = ClassicDataset.ofRows(spark, node).toDF()
+
+    checkAnswer(df, expectedAnswer)
+    assert(df.count() == 0)
+  }
+
+  test("RDDScan preserves data correctness with multiple re-reads") {
+    val data = spark.sql("SELECT l_orderkey, l_partkey FROM lineitem LIMIT 50")
+    val expectedAnswer = data.collect()
+
+    val node = LogicalRDD.fromDataset(
+      rdd = data.queryExecution.toRdd,
+      originDataset = data,
+      isStreaming = false)
+    val df = ClassicDataset.ofRows(spark, node).toDF()
+
+    // Read twice to verify idempotency
+    checkAnswer(df, expectedAnswer)
+    checkAnswer(df, expectedAnswer)
+  }
+
+  test("RDDScan with null values") {
+    val rdd = spark.sparkContext.parallelize(
+      Seq(
+        Row(1, "a", null),
+        Row(null, "b", 2.0),
+        Row(3, null, 3.0)
+      ))
+    val schema = StructType(
+      Seq(
+        StructField("id", IntegerType, nullable = true),
+        StructField("name", StringType, nullable = true),
+        StructField("value", DoubleType, nullable = true)
+      ))
+    val data = spark.createDataFrame(rdd, schema)
+    val expectedAnswer = data.collect()
+
+    val node = LogicalRDD.fromDataset(
+      rdd = data.queryExecution.toRdd,
+      originDataset = data,
+      isStreaming = false)
+    val df = ClassicDataset.ofRows(spark, node).toDF()
+
+    checkAnswer(df, expectedAnswer)
+  }
+
+  test("RDDScan with all supported primitive types") {
+    val rdd = spark.sparkContext.parallelize(
+      Seq(
+        Row(
+          true,
+          1.toByte,
+          2.toShort,
+          3,
+          4L,
+          5.0f,
+          6.0,
+          "hello",
+          java.sql.Date.valueOf("2024-01-01"),
+          java.sql.Timestamp.valueOf("2024-01-01 12:00:00"),
+          Array[Byte](1, 2, 3),
+          BigDecimal("123.45").underlying()
+        )
+      ))
+    val schema = StructType(
+      Seq(
+        StructField("bool", BooleanType),
+        StructField("byte", ByteType),
+        StructField("short", ShortType),
+        StructField("int", IntegerType),
+        StructField("long", LongType),
+        StructField("float", FloatType),
+        StructField("double", DoubleType),
+        StructField("string", StringType),
+        StructField("date", DateType),
+        StructField("timestamp", TimestampType),
+        StructField("binary", BinaryType),
+        StructField("decimal", DecimalType(10, 2))
+      ))
+    val data = spark.createDataFrame(rdd, schema)
+    val expectedAnswer = data.collect()
+
+    val node = LogicalRDD.fromDataset(
+      rdd = data.queryExecution.toRdd,
+      originDataset = data,
+      isStreaming = false)
+    val df = ClassicDataset.ofRows(spark, node).toDF()
+
+    checkAnswer(df, expectedAnswer)
+  }
+}

Review Comment:
   **Missing test coverage for complex types and unsupported-type fallback**
   
   **Problem:** The 7 tests cover primitives, nulls, empty RDD, and aggregation 
— but two important scenarios are untested:
   1. **Complex types** (ArrayType, MapType, StructType) — validation 
explicitly accepts them, but no test exercises the full row-to-columnar JNI 
path with nested data.
   2. **Unsupported type fallback** — no test verifies that a truly unsupported 
type (e.g., `CalendarIntervalType`) triggers graceful fallback to vanilla Spark 
instead of a runtime crash.
   
   **Suggested Fix:** Add at least these two tests:
   ```scala
   test("RDDScan with array type") {
     val rdd = spark.sparkContext.parallelize(Seq(Row(Seq(1, 2, 3)), Row(Seq(4, 
5))))
     val schema = StructType(Seq(StructField("arr", ArrayType(IntegerType))))
     val data = spark.createDataFrame(rdd, schema)
     val expectedAnswer = data.collect()
     val node = LogicalRDD.fromDataset(
       rdd = data.queryExecution.toRdd, originDataset = data, isStreaming = 
false)
     val df = ClassicDataset.ofRows(spark, node).toDF()
     checkAnswer(df, expectedAnswer)
   }
   
   test("RDDScan falls back for unsupported types") {
     // Create RDD with CalendarIntervalType or another unsupported type
     // Verify plan does NOT contain VeloxRDDScanTransformer (i.e., fallback 
occurred)
   }
   ```



##########
backends-velox/src/main/scala/org/apache/gluten/execution/VeloxRDDScanTransformer.scala:
##########
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.execution
+
+import org.apache.gluten.config.{GlutenConfig, VeloxConfig}
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Attribute, SortOrder}
+import org.apache.spark.sql.catalyst.plans.physical.Partitioning
+import org.apache.spark.sql.execution.{RDDScanTransformer, SparkPlan}
+import org.apache.spark.sql.vectorized.ColumnarBatch
+
+/**
+ * Velox-backend implementation of RDDScanTransformer.
+ *
+ * Converts an RDD[InternalRow] into columnar batches using Velox's native 
row-to-columnar
+ * conversion (same JNI path as RowToVeloxColumnarExec).
+ */
+case class VeloxRDDScanTransformer(
+    outputAttributes: Seq[Attribute],
+    rdd: RDD[InternalRow],
+    name: String,
+    override val outputPartitioning: Partitioning,
+    override val outputOrdering: Seq[SortOrder]
+) extends RDDScanTransformer(outputAttributes, outputPartitioning, 
outputOrdering) {
+
+  override protected def doValidateInternal(): ValidationResult = {
+    // Validate that all output types are supported by Velox's row-to-columnar
+    // converter. This mirrors the type allowlist in VeloxColumnarToRowExec.
+    for (field <- schema.fields) {
+      field.dataType match {
+        case _: org.apache.spark.sql.types.BooleanType =>
+        case _: org.apache.spark.sql.types.ByteType =>
+        case _: org.apache.spark.sql.types.ShortType =>
+        case _: org.apache.spark.sql.types.IntegerType =>
+        case _: org.apache.spark.sql.types.LongType =>
+        case _: org.apache.spark.sql.types.FloatType =>
+        case _: org.apache.spark.sql.types.DoubleType =>
+        case _: org.apache.spark.sql.types.StringType =>
+        case _: org.apache.spark.sql.types.TimestampType =>
+        case _: org.apache.spark.sql.types.DateType =>
+        case _: org.apache.spark.sql.types.BinaryType =>
+        case _: org.apache.spark.sql.types.DecimalType =>
+        case _: org.apache.spark.sql.types.ArrayType =>
+        case _: org.apache.spark.sql.types.MapType =>
+        case _: org.apache.spark.sql.types.StructType =>
+        case org.apache.spark.sql.types.YearMonthIntervalType.DEFAULT =>
+        case _: org.apache.spark.sql.types.NullType =>
+        case dt
+            if !VeloxConfig.get.enableTimestampNtzValidation &&

Review Comment:
   **`withNewChildrenInternal` returns `this` instead of `copy()`**
   
   **Problem:** Returning `this` from a case class's `withNewChildrenInternal` 
breaks Spark's convention that tree transformations produce structurally new 
nodes. The CH backend returns `copy(...)` for the equivalent transformer. While 
this is functionally safe for a leaf node today, it's inconsistent with the 
project pattern.
   
   **Evidence:**
   ```scala
   // Velox (this PR):
   override protected def withNewChildrenInternal(newChildren: 
IndexedSeq[SparkPlan]): SparkPlan =
     this
   
   // CH backend (CHRDDScanTransformer):
   override protected def withNewChildrenInternal(newChildren: 
IndexedSeq[SparkPlan]): SparkPlan =
     copy(outputAttributes, rdd, name, outputPartitioning, outputOrdering)
   ```
   
   **Suggested Fix:**
   ```scala
   override protected def withNewChildrenInternal(newChildren: 
IndexedSeq[SparkPlan]): SparkPlan =
     copy()
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to