weiting-chen commented on code in PR #12077: URL: https://github.com/apache/gluten/pull/12077#discussion_r3226413051
########## backends-velox/src/main/scala/org/apache/gluten/execution/VeloxRDDScanTransformer.scala: ########## @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gluten.execution + +import org.apache.gluten.config.{GlutenConfig, VeloxConfig} + +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{Attribute, SortOrder} +import org.apache.spark.sql.catalyst.plans.physical.Partitioning +import org.apache.spark.sql.execution.{RDDScanTransformer, SparkPlan} +import org.apache.spark.sql.vectorized.ColumnarBatch + +/** + * Velox-backend implementation of RDDScanTransformer. + * + * Converts an RDD[InternalRow] into columnar batches using Velox's native row-to-columnar + * conversion (same JNI path as RowToVeloxColumnarExec). 
+ */ +case class VeloxRDDScanTransformer( + outputAttributes: Seq[Attribute], + rdd: RDD[InternalRow], + name: String, + override val outputPartitioning: Partitioning, + override val outputOrdering: Seq[SortOrder] +) extends RDDScanTransformer(outputAttributes, outputPartitioning, outputOrdering) { + Review Comment: **PR description contradicts validation logic for complex types** **Problem:** The PR description states *"rejects complex types (ARRAY, MAP, STRUCT)"* but `doValidateInternal()` explicitly **accepts** these types. The code is correct — Velox does support complex types via `UnsafeRowFast::deserialize`. The PR description should be updated to avoid misleading reviewers. **Evidence:** ```scala case _: org.apache.spark.sql.types.ArrayType => case _: org.apache.spark.sql.types.MapType => case _: org.apache.spark.sql.types.StructType => ``` These cases fall through to `ValidationResult.succeeded`, meaning complex types are accepted. **Suggested Fix:** Update the PR description to remove the claim that complex types are rejected, e.g.: > *Supports all Velox-compatible types including complex types (Array, Map, Struct). Rejects only truly unsupported types (e.g., CalendarIntervalType) with clean fallback to vanilla Spark.* ########## backends-velox/src/main/scala/org/apache/gluten/execution/VeloxRDDScanTransformer.scala: ########## @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gluten.execution + +import org.apache.gluten.config.{GlutenConfig, VeloxConfig} + +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{Attribute, SortOrder} +import org.apache.spark.sql.catalyst.plans.physical.Partitioning +import org.apache.spark.sql.execution.{RDDScanTransformer, SparkPlan} +import org.apache.spark.sql.vectorized.ColumnarBatch + +/** + * Velox-backend implementation of RDDScanTransformer. + * + * Converts an RDD[InternalRow] into columnar batches using Velox's native row-to-columnar + * conversion (same JNI path as RowToVeloxColumnarExec). + */ +case class VeloxRDDScanTransformer( + outputAttributes: Seq[Attribute], + rdd: RDD[InternalRow], + name: String, + override val outputPartitioning: Partitioning, + override val outputOrdering: Seq[SortOrder] Review Comment: **Validation does not recurse into complex type element types** **Problem:** The type allowlist checks top-level types only. An `ArrayType(UnsupportedType)` or `MapType(StringType, UnsupportedType)` would pass validation but could fail at native execution time. The CH backend avoids this by delegating to `ConverterUtils.getTypeNode()` which recursively validates. 
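The recursion concern described above can be sketched in isolation. This is a minimal illustration only, not the PR's code: `SketchType` and its subclasses are hypothetical stand-ins for Spark's `DataType`, `ArrayType`, `MapType`, and `StructType`, and `firstUnsupported` is an invented helper name. The point it shows is that a nested check has to return its result to the caller, so that an unsupported element type inside an array, map, or struct is not silently accepted.

```scala
// Hypothetical stand-ins for Spark's DataType hierarchy, so the sketch runs without Spark.
sealed trait SketchType
case object SketchInt extends SketchType
case object SketchString extends SketchType
case object SketchInterval extends SketchType // plays the role of an unsupported type
final case class SketchArray(element: SketchType) extends SketchType
final case class SketchMap(key: SketchType, value: SketchType) extends SketchType
final case class SketchStruct(fields: Seq[(String, SketchType)]) extends SketchType

object RecursiveTypeCheck {
  // Returns the first unsupported type found anywhere in the tree,
  // or None when the type and all of its nested types are accepted.
  def firstUnsupported(t: SketchType): Option[SketchType] = t match {
    case SketchInt | SketchString => None
    case SketchArray(element) => firstUnsupported(element)
    case SketchMap(key, value) =>
      // Check the key first, then the value; the result is propagated, not discarded.
      firstUnsupported(key).orElse(firstUnsupported(value))
    case SketchStruct(fields) =>
      // The iterator is lazy, so the search stops at the first failing field.
      fields.iterator
        .map(f => firstUnsupported(f._2))
        .collectFirst { case Some(unsupported) => unsupported }
    case other => Some(other)
  }
}
```

In the real transformer, the equivalent check would presumably map a `Some(...)` result to a failed `ValidationResult` and `None` to `ValidationResult.succeeded`; that wiring is an assumption and is not shown here.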
**Evidence:** ```scala case _: org.apache.spark.sql.types.ArrayType => // passes any ArrayType, no element check case _: org.apache.spark.sql.types.MapType => // passes any MapType, no key/value check case _: org.apache.spark.sql.types.StructType => // passes any StructType, no field check ``` **Suggested Fix:** ```scala case a: org.apache.spark.sql.types.ArrayType => validateType(a.elementType) case m: org.apache.spark.sql.types.MapType => validateType(m.keyType) validateType(m.valueType) case s: org.apache.spark.sql.types.StructType => s.fields.foreach(f => validateType(f.dataType)) ``` Alternatively, delegate to `VeloxValidatorApi` for centralized type validation. ########## backends-velox/src/main/scala/org/apache/gluten/execution/VeloxRDDScanTransformer.scala: ########## @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.gluten.execution + +import org.apache.gluten.config.{GlutenConfig, VeloxConfig} + +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{Attribute, SortOrder} +import org.apache.spark.sql.catalyst.plans.physical.Partitioning +import org.apache.spark.sql.execution.{RDDScanTransformer, SparkPlan} +import org.apache.spark.sql.vectorized.ColumnarBatch + +/** + * Velox-backend implementation of RDDScanTransformer. + * + * Converts an RDD[InternalRow] into columnar batches using Velox's native row-to-columnar + * conversion (same JNI path as RowToVeloxColumnarExec). + */ +case class VeloxRDDScanTransformer( + outputAttributes: Seq[Attribute], + rdd: RDD[InternalRow], + name: String, + override val outputPartitioning: Partitioning, + override val outputOrdering: Seq[SortOrder] +) extends RDDScanTransformer(outputAttributes, outputPartitioning, outputOrdering) { + + override protected def doValidateInternal(): ValidationResult = { + // Validate that all output types are supported by Velox's row-to-columnar + // converter. This mirrors the type allowlist in VeloxColumnarToRowExec. 
+ for (field <- schema.fields) { + field.dataType match { + case _: org.apache.spark.sql.types.BooleanType => + case _: org.apache.spark.sql.types.ByteType => + case _: org.apache.spark.sql.types.ShortType => + case _: org.apache.spark.sql.types.IntegerType => + case _: org.apache.spark.sql.types.LongType => + case _: org.apache.spark.sql.types.FloatType => + case _: org.apache.spark.sql.types.DoubleType => + case _: org.apache.spark.sql.types.StringType => + case _: org.apache.spark.sql.types.TimestampType => + case _: org.apache.spark.sql.types.DateType => + case _: org.apache.spark.sql.types.BinaryType => Review Comment: **No SQLMetrics propagation — Spark UI won't show conversion metrics** **Problem:** The 4-param overload of `toColumnarBatchIterator` creates throwaway `SQLMetric` instances not attached to this plan's metrics map. As a result, `numInputRows`, `numOutputBatches`, and `convertTime` won't appear in the Spark UI for this operator, making production debugging harder. **Evidence:** ```scala // Current (4-param overload creates throwaway metrics): RowToVeloxColumnarExec.toColumnarBatchIterator(iter, localSchema, batchSize, batchBytes) ``` **Suggested Fix:** Define plan-level metrics and use the 7-param overload: ```scala override lazy val metrics = Map( "numInputRows" -> SQLMetrics.createMetric(sparkContext, "number of input rows"), "numOutputBatches" -> SQLMetrics.createMetric(sparkContext, "number of output batches"), "convertTime" -> SQLMetrics.createTimingMetric(sparkContext, "time to convert")) override def doExecuteColumnar(): RDD[ColumnarBatch] = { val numInputRows = longMetric("numInputRows") val numOutputBatches = longMetric("numOutputBatches") val convertTime = longMetric("convertTime") val localSchema = this.schema val batchSize = GlutenConfig.get.maxBatchSize val batchBytes = VeloxConfig.get.veloxPreferredBatchBytes rdd.mapPartitions { iter => RowToVeloxColumnarExec.toColumnarBatchIterator( iter, localSchema, numInputRows, 
numOutputBatches, convertTime, batchSize, batchBytes) } } ``` ########## backends-velox/src/test/scala/org/apache/gluten/execution/VeloxRDDScanSuite.scala: ########## @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.execution + +import org.apache.gluten.execution._ + +import org.apache.spark.SparkConf +import org.apache.spark.sql.Row +import org.apache.spark.sql.classic.ClassicDataset +import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper +import org.apache.spark.sql.types._ + +class VeloxRDDScanSuite extends VeloxWholeStageTransformerSuite with AdaptiveSparkPlanHelper { + + override protected val resourcePath: String = "/tpch-data-parquet" + override protected val fileFormat: String = "parquet" + + override protected def sparkConf: SparkConf = { + super.sparkConf + .set("spark.sql.ansi.enabled", "false") + } + + override def beforeAll(): Unit = { + super.beforeAll() + createTPCHNotNullTables() + } + + test("basic RDDScanExec is replaced by VeloxRDDScanTransformer") { + val data = spark.sql("SELECT l_orderkey, l_partkey FROM lineitem LIMIT 10") + val expectedAnswer = data.collect() + + val node = LogicalRDD.fromDataset( + rdd = 
data.queryExecution.toRdd, + originDataset = data, + isStreaming = false) + val df = ClassicDataset.ofRows(spark, node).toDF() + + checkAnswer(df, expectedAnswer) + val cnt = collect(df.queryExecution.executedPlan) { case _: VeloxRDDScanTransformer => true } + assert(cnt.nonEmpty, "Expected VeloxRDDScanTransformer in plan") + } + + test("RDDScan with string and numeric types") { + val data = spark.sql("""SELECT l_returnflag, l_linestatus, l_quantity, l_extendedprice + |FROM lineitem LIMIT 20""".stripMargin) + val expectedAnswer = data.collect() + + val node = LogicalRDD.fromDataset( + rdd = data.queryExecution.toRdd, + originDataset = data, + isStreaming = false) + val df = ClassicDataset.ofRows(spark, node).toDF() + + checkAnswer(df, expectedAnswer) + val cnt = collect(df.queryExecution.executedPlan) { case _: VeloxRDDScanTransformer => true } + assert(cnt.nonEmpty, "Expected VeloxRDDScanTransformer in plan") + } + + test("RDDScan with aggregation downstream") { + val query = + """SELECT l_returnflag, sum(l_quantity) AS sum_qty + |FROM lineitem + |WHERE l_shipdate <= date'1998-09-02' + |GROUP BY l_returnflag""".stripMargin + val data = spark.sql(query) + val expectedAnswer = data.collect() + + val node = LogicalRDD.fromDataset( + rdd = data.queryExecution.toRdd, + originDataset = data, + isStreaming = false) + val df = ClassicDataset.ofRows(spark, node).toDF() + + checkAnswer(df, expectedAnswer) + } + + test("RDDScan with empty RDD") { + val data = spark.sql("SELECT l_orderkey FROM lineitem WHERE 1 = 0") + val expectedAnswer = data.collect() + + val node = LogicalRDD.fromDataset( + rdd = data.queryExecution.toRdd, + originDataset = data, + isStreaming = false) + val df = ClassicDataset.ofRows(spark, node).toDF() + + checkAnswer(df, expectedAnswer) + assert(df.count() == 0) + } + + test("RDDScan preserves data correctness with multiple re-reads") { + val data = spark.sql("SELECT l_orderkey, l_partkey FROM lineitem LIMIT 50") + val expectedAnswer = data.collect() + 
+ val node = LogicalRDD.fromDataset( + rdd = data.queryExecution.toRdd, + originDataset = data, + isStreaming = false) + val df = ClassicDataset.ofRows(spark, node).toDF() + + // Read twice to verify idempotency + checkAnswer(df, expectedAnswer) + checkAnswer(df, expectedAnswer) + } + + test("RDDScan with null values") { + val rdd = spark.sparkContext.parallelize( + Seq( + Row(1, "a", null), + Row(null, "b", 2.0), + Row(3, null, 3.0) + )) + val schema = StructType( + Seq( + StructField("id", IntegerType, nullable = true), + StructField("name", StringType, nullable = true), + StructField("value", DoubleType, nullable = true) + )) + val data = spark.createDataFrame(rdd, schema) + val expectedAnswer = data.collect() + + val node = LogicalRDD.fromDataset( + rdd = data.queryExecution.toRdd, + originDataset = data, + isStreaming = false) + val df = ClassicDataset.ofRows(spark, node).toDF() + + checkAnswer(df, expectedAnswer) + } + + test("RDDScan with all supported primitive types") { + val rdd = spark.sparkContext.parallelize( + Seq( + Row( + true, + 1.toByte, + 2.toShort, + 3, + 4L, + 5.0f, + 6.0, + "hello", + java.sql.Date.valueOf("2024-01-01"), + java.sql.Timestamp.valueOf("2024-01-01 12:00:00"), + Array[Byte](1, 2, 3), + BigDecimal("123.45").underlying() + ) + )) + val schema = StructType( + Seq( + StructField("bool", BooleanType), + StructField("byte", ByteType), + StructField("short", ShortType), + StructField("int", IntegerType), + StructField("long", LongType), + StructField("float", FloatType), + StructField("double", DoubleType), + StructField("string", StringType), + StructField("date", DateType), + StructField("timestamp", TimestampType), + StructField("binary", BinaryType), + StructField("decimal", DecimalType(10, 2)) + )) + val data = spark.createDataFrame(rdd, schema) + val expectedAnswer = data.collect() + + val node = LogicalRDD.fromDataset( + rdd = data.queryExecution.toRdd, + originDataset = data, + isStreaming = false) + val df = 
ClassicDataset.ofRows(spark, node).toDF() + + checkAnswer(df, expectedAnswer) + } +} Review Comment: **Missing test coverage for complex types and unsupported-type fallback** **Problem:** The 7 tests cover primitives, nulls, empty RDD, and aggregation — but two important scenarios are untested: 1. **Complex types** (ArrayType, MapType, StructType) — validation explicitly accepts them, but no test exercises the full row-to-columnar JNI path with nested data. 2. **Unsupported type fallback** — no test verifies that a truly unsupported type (e.g., `CalendarIntervalType`) triggers graceful fallback to vanilla Spark instead of a runtime crash. **Suggested Fix:** Add at least these two tests: ```scala test("RDDScan with array type") { val rdd = spark.sparkContext.parallelize(Seq(Row(Seq(1, 2, 3)), Row(Seq(4, 5)))) val schema = StructType(Seq(StructField("arr", ArrayType(IntegerType)))) val data = spark.createDataFrame(rdd, schema) val expectedAnswer = data.collect() val node = LogicalRDD.fromDataset( rdd = data.queryExecution.toRdd, originDataset = data, isStreaming = false) val df = ClassicDataset.ofRows(spark, node).toDF() checkAnswer(df, expectedAnswer) } test("RDDScan falls back for unsupported types") { // Create RDD with CalendarIntervalType or another unsupported type // Verify plan does NOT contain VeloxRDDScanTransformer (i.e., fallback occurred) } ``` ########## backends-velox/src/main/scala/org/apache/gluten/execution/VeloxRDDScanTransformer.scala: ########## @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gluten.execution + +import org.apache.gluten.config.{GlutenConfig, VeloxConfig} + +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{Attribute, SortOrder} +import org.apache.spark.sql.catalyst.plans.physical.Partitioning +import org.apache.spark.sql.execution.{RDDScanTransformer, SparkPlan} +import org.apache.spark.sql.vectorized.ColumnarBatch + +/** + * Velox-backend implementation of RDDScanTransformer. + * + * Converts an RDD[InternalRow] into columnar batches using Velox's native row-to-columnar + * conversion (same JNI path as RowToVeloxColumnarExec). + */ +case class VeloxRDDScanTransformer( + outputAttributes: Seq[Attribute], + rdd: RDD[InternalRow], + name: String, + override val outputPartitioning: Partitioning, + override val outputOrdering: Seq[SortOrder] +) extends RDDScanTransformer(outputAttributes, outputPartitioning, outputOrdering) { + + override protected def doValidateInternal(): ValidationResult = { + // Validate that all output types are supported by Velox's row-to-columnar + // converter. This mirrors the type allowlist in VeloxColumnarToRowExec. 
+ for (field <- schema.fields) { + field.dataType match { + case _: org.apache.spark.sql.types.BooleanType => + case _: org.apache.spark.sql.types.ByteType => + case _: org.apache.spark.sql.types.ShortType => + case _: org.apache.spark.sql.types.IntegerType => + case _: org.apache.spark.sql.types.LongType => + case _: org.apache.spark.sql.types.FloatType => + case _: org.apache.spark.sql.types.DoubleType => + case _: org.apache.spark.sql.types.StringType => + case _: org.apache.spark.sql.types.TimestampType => + case _: org.apache.spark.sql.types.DateType => + case _: org.apache.spark.sql.types.BinaryType => + case _: org.apache.spark.sql.types.DecimalType => + case _: org.apache.spark.sql.types.ArrayType => + case _: org.apache.spark.sql.types.MapType => + case _: org.apache.spark.sql.types.StructType => + case org.apache.spark.sql.types.YearMonthIntervalType.DEFAULT => + case _: org.apache.spark.sql.types.NullType => + case dt + if !VeloxConfig.get.enableTimestampNtzValidation && Review Comment: **`withNewChildrenInternal` returns `this` instead of `copy()`** **Problem:** Returning `this` from a case class's `withNewChildrenInternal` breaks Spark's convention that tree transformations produce structurally new nodes. The CH backend returns `copy(...)` for the equivalent transformer. While this is functionally safe for a leaf node today, it's inconsistent with the project pattern. **Evidence:** ```scala // Velox (this PR): override protected def withNewChildrenInternal(newChildren: IndexedSeq[SparkPlan]): SparkPlan = this // CH backend (CHRDDScanTransformer): override protected def withNewChildrenInternal(newChildren: IndexedSeq[SparkPlan]): SparkPlan = copy(outputAttributes, rdd, name, outputPartitioning, outputOrdering) ``` **Suggested Fix:** ```scala override protected def withNewChildrenInternal(newChildren: IndexedSeq[SparkPlan]): SparkPlan = copy() ``` -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
