andygrove commented on code in PR #4234: URL: https://github.com/apache/datafusion-comet/pull/4234#discussion_r3506361536
########## spark/src/main/spark-4.x/org/apache/spark/sql/execution/python/CometArrowPythonRunnerBase.scala: ########## @@ -0,0 +1,378 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.spark.sql.execution.python + +import java.io.{DataInputStream, DataOutputStream} +import java.nio.channels.Channels +import java.util.concurrent.atomic.AtomicBoolean + +import scala.jdk.CollectionConverters._ + +import org.apache.arrow.vector.{BaseFixedWidthVector, BaseLargeVariableWidthVector, BaseVariableWidthVector, FieldVector, VectorSchemaRoot} +import org.apache.arrow.vector.complex.{LargeListVector, ListVector, StructVector} +import org.apache.arrow.vector.ipc.{ArrowStreamReader, ArrowStreamWriter} +import org.apache.arrow.vector.types.pojo.{ArrowType, Field, FieldType} +import org.apache.spark.{SparkEnv, TaskContext} +import org.apache.spark.api.python.{BasePythonRunner, PythonRDD, PythonWorker, SpecialLengths} +import org.apache.spark.sql.comet.util.Utils +import org.apache.spark.sql.execution.metric.SQLMetric +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector} +import 
org.apache.spark.unsafe.Platform + +import org.apache.comet.CometArrowAllocator +import org.apache.comet.vector.{CometDecodedVector, CometVector} + +/** + * Shared base for Comet's Arrow Python runners (Spark 4.0 / 4.1 / 4.2). + * + * Unlike a stock `ArrowPythonRunner`, this does not extend Spark's `PythonArrowInput` / + * `BasicPythonArrowOutput` traits. Those traits expose Spark's Arrow types (`VectorSchemaRoot`, + * `Schema`) in their members, and the packaged `comet-spark` jar relocates `org.apache.arrow` to + * `org.apache.comet.shaded.arrow`, so mixing them in produces a class whose synthetic Arrow + * members no longer match Spark's unshaded trait contract (an `AbstractMethodError` at runtime). + * + * Instead it extends only the Arrow-agnostic `BasePythonRunner` and performs the Arrow IPC + * exchange itself using Comet's (shaded) Arrow. The Python worker only ever sees a standard Arrow + * IPC byte stream, which is version-neutral, so nothing crosses the shaded/unshaded boundary: + * - Input: each Comet `ColumnarBatch` is copied into a shaded struct root and written to the + * worker with a shaded `ArrowStreamWriter`. + * - Output: the worker's Arrow IPC is read with a shaded `ArrowStreamReader` straight into + * `CometVector`s, which is exactly what `CometMapInBatchExec` and downstream native operators + * consume. + * + * `BasePythonRunner` has the same shape across Spark 4.0/4.1/4.2; only the subclass constructor + * arguments and `writeUDF` differ, so those stay in the per-version subclasses. + */ +private[python] trait CometArrowPythonRunnerBase + extends BasePythonRunner[Iterator[ColumnarBatch], ColumnarBatch] { + + /** Worker configuration written to the Python worker before execution. */ + protected def workerConf: Map[String, String] + + /** Comet's Python SQL metrics (data sent/received, rows). */ + protected def pythonMetrics: Map[String, SQLMetric] + + /** Version-specific UDF command serialization. 
*/ + protected def writeUDF(dataOut: DataOutputStream): Unit + + /** + * Input schema as Comet hands it to the runner: a single non-nullable struct named "struct" + * whose children are the user's input columns. Comet's FFI-imported vectors carry Arrow + * `Field`s with null names (Comet uses positional schema), so these names are the source of + * truth for the field names written into the IPC stream that the Python worker reads by name. + */ + protected def schema: StructType + + override val pythonExec: String = + SQLConf.get.pysparkWorkerPythonExecutable.getOrElse(funcs.head.funcs.head.pythonExec) + + override val faultHandlerEnabled: Boolean = SQLConf.get.pythonUDFWorkerFaulthandlerEnabled + override val idleTimeoutSeconds: Long = SQLConf.get.pythonUDFWorkerIdleTimeoutSeconds + override val hideTraceback: Boolean = SQLConf.get.pysparkHideTraceback + override val simplifiedTraceback: Boolean = SQLConf.get.pysparkSimplifiedTraceback + + override val bufferSize: Int = SQLConf.get.pandasUDFBufferSize + require( + bufferSize >= 4, + "Pandas execution requires more than 4 bytes. Please set higher buffer. 
" + + s"Please change '${SQLConf.PANDAS_UDF_BUFFER_SIZE.key}'.") + + override protected def newWriter( + env: SparkEnv, + worker: PythonWorker, + inputIterator: Iterator[Iterator[ColumnarBatch]], + partitionIndex: Int, + context: TaskContext): Writer = { + new Writer(env, worker, inputIterator, partitionIndex, context) { + + private val allocator = + CometArrowAllocator.newChildAllocator(s"stdout writer for $pythonExec", 0, Long.MaxValue) + private var currentGroup: Iterator[ColumnarBatch] = _ + private var arrowWriter: ArrowStreamWriter = _ + private var writeRoot: VectorSchemaRoot = _ + private var structVec: StructVector = _ + + context.addTaskCompletionListener[Unit] { _ => + if (writeRoot != null) { + writeRoot.close() + } + allocator.close() + } + + protected override def writeCommand(dataOut: DataOutputStream): Unit = { + // handleMetadataBeforeExec: write the worker config as key/value string pairs. + dataOut.writeInt(workerConf.size) + for ((k, v) <- workerConf) { + PythonRDD.writeUTF(k, dataOut) + PythonRDD.writeUTF(v, dataOut) + } + writeUDF(dataOut) + } + + /** Build the destination struct root and start the writer from the given child fields. */ + private def startWriter(childFields: Seq[Field], dataOut: DataOutputStream): Unit = { + val structField = + new Field( + "struct", + new FieldType(false, ArrowType.Struct.INSTANCE, null), + childFields.asJava) + structVec = structField.createVector(allocator).asInstanceOf[StructVector] + writeRoot = new VectorSchemaRoot(Seq[FieldVector](structVec).asJava) + arrowWriter = new ArrowStreamWriter(writeRoot, null, Channels.newChannel(dataOut)) + arrowWriter.start() + } + + override def writeNextInputToStream(dataOut: DataOutputStream): Boolean = { + while (currentGroup == null || !currentGroup.hasNext) { + if (!inputIterator.hasNext) { + if (arrowWriter == null) { + // No input batch was ever produced (e.g. an upstream filter removed every row). 
+ // Still emit a valid, empty Arrow IPC stream so the Python worker's + // ArrowStreamReader reads a schema and then sees zero batches, instead of failing + // on an absent stream ("Invalid IPC stream: negative continuation token"). There is + // no sample batch, so derive the schema from the Spark input schema. The timezone is + // irrelevant here because no rows are exchanged. + val inner = schema.head.dataType.asInstanceOf[StructType] + val childFields = inner.fields.toSeq.map(f => + Utils.toArrowField(f.name, f.dataType, nullable = true, "UTC")) + startWriter(childFields, dataOut) + } + arrowWriter.end() + return false + } + currentGroup = inputIterator.next() + } + + val cometBatch = currentGroup.next() + val startData = dataOut.size() + + if (arrowWriter == null) { + // Build the destination struct root once, sized to the first batch's child fields. + // mapInArrow/mapInPandas exchange the columns under a single non-nullable struct. + // Comet's FFI-imported vectors leave the Arrow Field name null, so restore the real + // column names from the input schema (the worker reads columns by name, and shaded + // Arrow rejects a null field name). The field types and child structure are kept as-is + // so copyVector still walks the source and destination trees in lockstep. 
+ val childNames = schema.head.dataType.asInstanceOf[StructType].fieldNames + val childFields = (0 until cometBatch.numCols()).map { i => + val vecField = + cometBatch.column(i).asInstanceOf[CometDecodedVector].getValueVector.getField + renamed(vecField, childNames(i), forceNullable = true) + } + startWriter(childFields, dataOut) + } + + var i = 0 + while (i < cometBatch.numCols()) { + val src = cometBatch + .column(i) + .asInstanceOf[CometDecodedVector] + .getValueVector + .asInstanceOf[FieldVector] + val dst = structVec.getChildByOrdinal(i).asInstanceOf[FieldVector] + copyVector(src, dst) + i += 1 + } + val numRows = cometBatch.numRows() + structVec.setValueCount(numRows) + // Mark every row of the struct non-null (all-1 validity). The validity buffer is freshly + // allocated and zero-initialised, so without this Python would see an all-null struct. + val validityBytes = (numRows + 7) / 8 + Platform.setMemory( + structVec.getValidityBuffer.memoryAddress(), + 0xff.toByte, + validityBytes) + writeRoot.setRowCount(numRows) + arrowWriter.writeBatch() + + pythonMetrics("pythonDataSent") += dataOut.size() - startData + true + } + } + } + + override protected def newReaderIterator( + stream: DataInputStream, + writer: Writer, + startTime: Long, + env: SparkEnv, + worker: PythonWorker, + pid: Option[Int], + releasedOrClosed: AtomicBoolean, + context: TaskContext): Iterator[ColumnarBatch] = { + new ReaderIterator(stream, writer, startTime, env, worker, pid, releasedOrClosed, context) { + + private val allocator = + CometArrowAllocator.newChildAllocator(s"stdin reader for $pythonExec", 0, Long.MaxValue) + private var reader: ArrowStreamReader = _ + private var root: VectorSchemaRoot = _ + private var batchLoaded = true + + context.addTaskCompletionListener[Unit] { _ => + if (reader != null) { + reader.close(false) + } + allocator.close() + } + + protected override def read(): ColumnarBatch = { + if (writer.exception.isDefined) { + throw writer.exception.get + } + try { + 
if (reader != null && batchLoaded) { + batchLoaded = reader.loadNextBatch() + if (batchLoaded) { + // Re-wrap the (reloaded) field vectors fresh each batch, mirroring Comet's + // StreamReader, so each ColumnarBatch reflects the current buffers. + val vectors: Array[ColumnVector] = root.getFieldVectors.asScala.map { vector => + CometVector.getVector(vector, null).asInstanceOf[ColumnVector] + }.toArray + val batch = new ColumnarBatch(vectors) + batch.setNumRows(root.getRowCount) + pythonMetrics("pythonNumRowsReceived") += root.getRowCount + batch + } else { + reader.close(false) + allocator.close() + read() + } + } else { + stream.readInt() match { + case SpecialLengths.START_ARROW_STREAM => + reader = new ArrowStreamReader(stream, allocator) + root = reader.getVectorSchemaRoot() + read() + case SpecialLengths.TIMING_DATA => + handleTimingData() + read() + case SpecialLengths.PYTHON_EXCEPTION_THROWN => + throw handlePythonException() + case SpecialLengths.END_OF_DATA_SECTION => + handleEndOfDataSection() + null + } + } + } catch handleException + } + } + } + + /** + * Rebuild `field` with `name`, preserving its Arrow type and child structure. Any nested child + * whose name Comet's FFI import left null is given a positional placeholder so shaded Arrow can + * materialize the struct. Keeping the type and structure intact means the destination tree + * still mirrors the Comet source tree for [[copyVector]]. + */ + private def renamed(field: Field, name: String, forceNullable: Boolean): Field = { + // A Map's descendants must keep their original nullability: Arrow requires the entries struct + // (and its key) to be non-nullable, and `MapVector.createVector` rejects a nullable entries + // struct. Stop forcing nullable once we enter a Map subtree. 
+ val childrenForceNullable = forceNullable && !field.getType.isInstanceOf[ArrowType.Map] + val children = field.getChildren + val newChildren = + if (children.isEmpty) children + else + children.asScala.zipWithIndex.map { case (child, idx) => + renamed( + child, + if (child.getName == null) s"_$idx" else child.getName, + childrenForceNullable) + }.asJava + // Force the field nullable where allowed. Comet's FFI-imported vectors may carry a + // non-nullable Arrow `Field` even for columns that contain nulls (Comet uses positional schema + // and does not round-trip Spark's nullability), and the worker rejects a null value under a + // non-nullable field (`from_pandas(pdf, schema=batch.schema)` raises). Marking the field + // nullable is a safe superset; `copyVector` fills an all-valid validity buffer when the source + // has no nulls. + val ft = field.getFieldType + val nullable = forceNullable || ft.isNullable + val newFt = new FieldType(nullable, ft.getType, ft.getDictionary, ft.getMetadata) + new Field(name, newFt, newChildren) + } + + /** + * Copy a Comet column into the destination FieldVector. Walks both trees in lockstep: sizes + * each destination node from the source, copies every buffer with `ArrowBuf.setBytes`, then + * sets value counts bottom-up so `setValueCount` does not rewrite the offset bytes we just + * copied. Both source and destination are Comet's (shaded) Arrow vectors, so no shaded / + * unshaded type crosses. + */ + private def copyVector(src: FieldVector, dst: FieldVector): Unit = { Review Comment: Deferred the buffer reuse to #4383, which removes the `copyVector` bulk-copy path entirely (the per-batch `allocateNew` goes away with it), so a `reset()` + grow-on-demand rework here would be throwaway against code that is being deleted. I did correct the `test_map_in_arrow_multi_batch_per_partition` docstring in 5802e7ca6: it no longer claims leaf-buffer persistence (only the struct container is reused today) and now points at #4383. 
########## spark/src/main/spark-4.x/org/apache/spark/sql/execution/python/CometArrowPythonRunnerBase.scala: ########## @@ -0,0 +1,378 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.spark.sql.execution.python + +import java.io.{DataInputStream, DataOutputStream} +import java.nio.channels.Channels +import java.util.concurrent.atomic.AtomicBoolean + +import scala.jdk.CollectionConverters._ + +import org.apache.arrow.vector.{BaseFixedWidthVector, BaseLargeVariableWidthVector, BaseVariableWidthVector, FieldVector, VectorSchemaRoot} +import org.apache.arrow.vector.complex.{LargeListVector, ListVector, StructVector} +import org.apache.arrow.vector.ipc.{ArrowStreamReader, ArrowStreamWriter} +import org.apache.arrow.vector.types.pojo.{ArrowType, Field, FieldType} +import org.apache.spark.{SparkEnv, TaskContext} +import org.apache.spark.api.python.{BasePythonRunner, PythonRDD, PythonWorker, SpecialLengths} +import org.apache.spark.sql.comet.util.Utils +import org.apache.spark.sql.execution.metric.SQLMetric +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector} +import 
org.apache.spark.unsafe.Platform + +import org.apache.comet.CometArrowAllocator +import org.apache.comet.vector.{CometDecodedVector, CometVector} + +/** + * Shared base for Comet's Arrow Python runners (Spark 4.0 / 4.1 / 4.2). + * + * Unlike a stock `ArrowPythonRunner`, this does not extend Spark's `PythonArrowInput` / + * `BasicPythonArrowOutput` traits. Those traits expose Spark's Arrow types (`VectorSchemaRoot`, + * `Schema`) in their members, and the packaged `comet-spark` jar relocates `org.apache.arrow` to + * `org.apache.comet.shaded.arrow`, so mixing them in produces a class whose synthetic Arrow + * members no longer match Spark's unshaded trait contract (an `AbstractMethodError` at runtime). + * + * Instead it extends only the Arrow-agnostic `BasePythonRunner` and performs the Arrow IPC + * exchange itself using Comet's (shaded) Arrow. The Python worker only ever sees a standard Arrow + * IPC byte stream, which is version-neutral, so nothing crosses the shaded/unshaded boundary: + * - Input: each Comet `ColumnarBatch` is copied into a shaded struct root and written to the + * worker with a shaded `ArrowStreamWriter`. + * - Output: the worker's Arrow IPC is read with a shaded `ArrowStreamReader` straight into + * `CometVector`s, which is exactly what `CometMapInBatchExec` and downstream native operators + * consume. + * + * `BasePythonRunner` has the same shape across Spark 4.0/4.1/4.2; only the subclass constructor + * arguments and `writeUDF` differ, so those stay in the per-version subclasses. + */ +private[python] trait CometArrowPythonRunnerBase + extends BasePythonRunner[Iterator[ColumnarBatch], ColumnarBatch] { + + /** Worker configuration written to the Python worker before execution. */ + protected def workerConf: Map[String, String] + + /** Comet's Python SQL metrics (data sent/received, rows). */ + protected def pythonMetrics: Map[String, SQLMetric] + + /** Version-specific UDF command serialization. 
*/ + protected def writeUDF(dataOut: DataOutputStream): Unit + + /** + * Input schema as Comet hands it to the runner: a single non-nullable struct named "struct" + * whose children are the user's input columns. Comet's FFI-imported vectors carry Arrow + * `Field`s with null names (Comet uses positional schema), so these names are the source of + * truth for the field names written into the IPC stream that the Python worker reads by name. + */ + protected def schema: StructType + + override val pythonExec: String = + SQLConf.get.pysparkWorkerPythonExecutable.getOrElse(funcs.head.funcs.head.pythonExec) + + override val faultHandlerEnabled: Boolean = SQLConf.get.pythonUDFWorkerFaulthandlerEnabled + override val idleTimeoutSeconds: Long = SQLConf.get.pythonUDFWorkerIdleTimeoutSeconds + override val hideTraceback: Boolean = SQLConf.get.pysparkHideTraceback + override val simplifiedTraceback: Boolean = SQLConf.get.pysparkSimplifiedTraceback + + override val bufferSize: Int = SQLConf.get.pandasUDFBufferSize + require( + bufferSize >= 4, + "Pandas execution requires more than 4 bytes. Please set higher buffer. 
" + + s"Please change '${SQLConf.PANDAS_UDF_BUFFER_SIZE.key}'.") + + override protected def newWriter( + env: SparkEnv, + worker: PythonWorker, + inputIterator: Iterator[Iterator[ColumnarBatch]], + partitionIndex: Int, + context: TaskContext): Writer = { + new Writer(env, worker, inputIterator, partitionIndex, context) { + + private val allocator = + CometArrowAllocator.newChildAllocator(s"stdout writer for $pythonExec", 0, Long.MaxValue) + private var currentGroup: Iterator[ColumnarBatch] = _ + private var arrowWriter: ArrowStreamWriter = _ + private var writeRoot: VectorSchemaRoot = _ + private var structVec: StructVector = _ + + context.addTaskCompletionListener[Unit] { _ => + if (writeRoot != null) { + writeRoot.close() + } + allocator.close() + } + + protected override def writeCommand(dataOut: DataOutputStream): Unit = { + // handleMetadataBeforeExec: write the worker config as key/value string pairs. + dataOut.writeInt(workerConf.size) + for ((k, v) <- workerConf) { + PythonRDD.writeUTF(k, dataOut) + PythonRDD.writeUTF(v, dataOut) + } + writeUDF(dataOut) + } + + /** Build the destination struct root and start the writer from the given child fields. */ + private def startWriter(childFields: Seq[Field], dataOut: DataOutputStream): Unit = { + val structField = + new Field( + "struct", + new FieldType(false, ArrowType.Struct.INSTANCE, null), + childFields.asJava) + structVec = structField.createVector(allocator).asInstanceOf[StructVector] + writeRoot = new VectorSchemaRoot(Seq[FieldVector](structVec).asJava) + arrowWriter = new ArrowStreamWriter(writeRoot, null, Channels.newChannel(dataOut)) Review Comment: Deferred to #4383. One correctness note on the "fallback honors it" framing: on Spark 4.0 the fallback does *not* compress input to Python either. `PythonArrowInput` there builds `new ArrowStreamWriter(root, null, dataOut)` and writes via `arrowWriter.writeBatch()`, with no `VectorUnloader`/codec. 
The codec-on-input path only exists on 4.1+, where it is applied through `VectorUnloader.getRecordBatch()` + `MessageSerializer`. #4383 moves this operator onto that same unloader assembly, which is exactly where the codec drops in and where per-version behavior can be matched cleanly. Applying it in the current `copyVector`/`ArrowStreamWriter` writer would be throwaway against that change. ########## spark/src/main/scala/org/apache/comet/rules/EliminateRedundantTransitions.scala: ########## @@ -130,6 +154,48 @@ case class EliminateRedundantTransitions(session: SparkSession) extends Rule[Spa } } + /** + * If the given plan is a Comet ColumnarToRow transition, returns the columnar child the Python + * UDF operator can consume directly. By the time this rule runs the earlier + * `hasCometNativeChild` arm has already rewritten any `ColumnarToRowExec` over a Comet columnar + * source to one of the Comet variants, so vanilla `ColumnarToRowExec` cannot reach here on a + * Comet-driven plan and is intentionally not handled. + */ + private def extractColumnarChild(plan: SparkPlan): Option[SparkPlan] = plan match { + case CometColumnarToRowExec(child) => Some(child) + case CometNativeColumnarToRowExec(child) => Some(child) + // Chained `mapInArrow(udf1).mapInArrow(udf2)`: by the time the outer operator is visited + // (transformUp is bottom-up) the inner one has already become a `CometMapInBatchExec`, which + // is itself columnar. There is no row transition between them to strip, so consume its + // columnar output directly. Its flattened output vectors are `CometVector`s, exactly what + // `CometMapInBatchExec`'s input path expects. + case child: CometMapInBatchExec => Some(child) + case _ => None + } + + /** + * Matches the plans this rule should rewrite to `CometMapInBatchExec`. Single extractor used in + * the `transformUp` arm above so the matchers and conf reads run once per visited plan. 
Returns + * `(info, columnarChild)` where `columnarChild` is the Comet columnar producer that + * `CometMapInBatchExec` will consume directly. Returns `None` (and the arm misses) when the + * conf is off, when `useLargeVarTypes` forces the fallback, when the plan is not one of the + * version-shimmed MapInArrow / MapInPandas operators, or when the child is not a Comet + * columnar-to-row transition we can strip. + */ + private object EligibleMapInBatch { + def unapply(plan: SparkPlan): Option[(MapInBatchInfo, SparkPlan)] = { + if (!CometConf.COMET_PYARROW_UDF_ENABLED.get()) { + None + } else if (arrowUseLargeVarTypes(plan.conf)) { + None Review Comment: Deferred to #4383. Agreed that the `useLargeVarTypes` gate becomes dead code once the copy understands 8-byte offsets, but the offset widening belongs in the reworked copy path (#4383 assembles the `ArrowRecordBatch` directly and is where the offset handling gets revisited) rather than in the `copyVector` version that is being removed. The current gate is a safe fallback to vanilla Spark, not a correctness gap, so I left it in place for this PR. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
