mbutrovich commented on code in PR #4234:
URL: https://github.com/apache/datafusion-comet/pull/4234#discussion_r3501879306


##########
spark/src/main/spark-4.x/org/apache/spark/sql/execution/python/CometArrowPythonRunnerBase.scala:
##########
@@ -0,0 +1,378 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.execution.python
+
+import java.io.{DataInputStream, DataOutputStream}
+import java.nio.channels.Channels
+import java.util.concurrent.atomic.AtomicBoolean
+
+import scala.jdk.CollectionConverters._
+
+import org.apache.arrow.vector.{BaseFixedWidthVector, 
BaseLargeVariableWidthVector, BaseVariableWidthVector, FieldVector, 
VectorSchemaRoot}
+import org.apache.arrow.vector.complex.{LargeListVector, ListVector, 
StructVector}
+import org.apache.arrow.vector.ipc.{ArrowStreamReader, ArrowStreamWriter}
+import org.apache.arrow.vector.types.pojo.{ArrowType, Field, FieldType}
+import org.apache.spark.{SparkEnv, TaskContext}
+import org.apache.spark.api.python.{BasePythonRunner, PythonRDD, PythonWorker, 
SpecialLengths}
+import org.apache.spark.sql.comet.util.Utils
+import org.apache.spark.sql.execution.metric.SQLMetric
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}
+import org.apache.spark.unsafe.Platform
+
+import org.apache.comet.CometArrowAllocator
+import org.apache.comet.vector.{CometDecodedVector, CometVector}
+
+/**
+ * Shared base for Comet's Arrow Python runners (Spark 4.0 / 4.1 / 4.2).
+ *
+ * Unlike a stock `ArrowPythonRunner`, this does not extend Spark's 
`PythonArrowInput` /
+ * `BasicPythonArrowOutput` traits. Those traits expose Spark's Arrow types 
(`VectorSchemaRoot`,
+ * `Schema`) in their members, and the packaged `comet-spark` jar relocates 
`org.apache.arrow` to
+ * `org.apache.comet.shaded.arrow`, so mixing them in produces a class whose 
synthetic Arrow
+ * members no longer match Spark's unshaded trait contract (an 
`AbstractMethodError` at runtime).
+ *
+ * Instead it extends only the Arrow-agnostic `BasePythonRunner` and performs 
the Arrow IPC
+ * exchange itself using Comet's (shaded) Arrow. The Python worker only ever 
sees a standard Arrow
+ * IPC byte stream, which is version-neutral, so nothing crosses the 
shaded/unshaded boundary:
+ *   - Input: each Comet `ColumnarBatch` is copied into a shaded struct root 
and written to the
+ *     worker with a shaded `ArrowStreamWriter`.
+ *   - Output: the worker's Arrow IPC is read with a shaded 
`ArrowStreamReader` straight into
+ *     `CometVector`s, which is exactly what `CometMapInBatchExec` and 
downstream native operators
+ *     consume.
+ *
+ * `BasePythonRunner` has the same shape across Spark 4.0/4.1/4.2; only the 
subclass constructor
+ * arguments and `writeUDF` differ, so those stay in the per-version 
subclasses.
+ */
+private[python] trait CometArrowPythonRunnerBase
+    extends BasePythonRunner[Iterator[ColumnarBatch], ColumnarBatch] {
+
+  /** Worker configuration written to the Python worker before execution. */
+  protected def workerConf: Map[String, String]
+
+  /** Comet's Python SQL metrics (data sent/received, rows). */
+  protected def pythonMetrics: Map[String, SQLMetric]
+
+  /** Version-specific UDF command serialization. */
+  protected def writeUDF(dataOut: DataOutputStream): Unit
+
+  /**
+   * Input schema as Comet hands it to the runner: a single non-nullable 
struct named "struct"
+   * whose children are the user's input columns. Comet's FFI-imported vectors 
carry Arrow
+   * `Field`s with null names (Comet uses positional schema), so these names 
are the source of
+   * truth for the field names written into the IPC stream that the Python 
worker reads by name.
+   */
+  protected def schema: StructType
+
+  override val pythonExec: String =
+    
SQLConf.get.pysparkWorkerPythonExecutable.getOrElse(funcs.head.funcs.head.pythonExec)
+
+  override val faultHandlerEnabled: Boolean = 
SQLConf.get.pythonUDFWorkerFaulthandlerEnabled
+  override val idleTimeoutSeconds: Long = 
SQLConf.get.pythonUDFWorkerIdleTimeoutSeconds
+  override val hideTraceback: Boolean = SQLConf.get.pysparkHideTraceback
+  override val simplifiedTraceback: Boolean = 
SQLConf.get.pysparkSimplifiedTraceback
+
+  override val bufferSize: Int = SQLConf.get.pandasUDFBufferSize
+  require(
+    bufferSize >= 4,
+    "Pandas execution requires more than 4 bytes. Please set higher buffer. " +
+      s"Please change '${SQLConf.PANDAS_UDF_BUFFER_SIZE.key}'.")
+
+  override protected def newWriter(
+      env: SparkEnv,
+      worker: PythonWorker,
+      inputIterator: Iterator[Iterator[ColumnarBatch]],
+      partitionIndex: Int,
+      context: TaskContext): Writer = {
+    new Writer(env, worker, inputIterator, partitionIndex, context) {
+
+      private val allocator =
+        CometArrowAllocator.newChildAllocator(s"stdout writer for 
$pythonExec", 0, Long.MaxValue)
+      private var currentGroup: Iterator[ColumnarBatch] = _
+      private var arrowWriter: ArrowStreamWriter = _
+      private var writeRoot: VectorSchemaRoot = _
+      private var structVec: StructVector = _
+
+      context.addTaskCompletionListener[Unit] { _ =>
+        if (writeRoot != null) {
+          writeRoot.close()
+        }
+        allocator.close()
+      }
+
+      protected override def writeCommand(dataOut: DataOutputStream): Unit = {
+        // handleMetadataBeforeExec: write the worker config as key/value 
string pairs.
+        dataOut.writeInt(workerConf.size)
+        for ((k, v) <- workerConf) {
+          PythonRDD.writeUTF(k, dataOut)
+          PythonRDD.writeUTF(v, dataOut)
+        }
+        writeUDF(dataOut)
+      }
+
+      /** Build the destination struct root and start the writer from the 
given child fields. */
+      private def startWriter(childFields: Seq[Field], dataOut: 
DataOutputStream): Unit = {
+        val structField =
+          new Field(
+            "struct",
+            new FieldType(false, ArrowType.Struct.INSTANCE, null),
+            childFields.asJava)
+        structVec = 
structField.createVector(allocator).asInstanceOf[StructVector]
+        writeRoot = new VectorSchemaRoot(Seq[FieldVector](structVec).asJava)
+        arrowWriter = new ArrowStreamWriter(writeRoot, null, 
Channels.newChannel(dataOut))
+        arrowWriter.start()
+      }
+
+      override def writeNextInputToStream(dataOut: DataOutputStream): Boolean 
= {
+        while (currentGroup == null || !currentGroup.hasNext) {
+          if (!inputIterator.hasNext) {
+            if (arrowWriter == null) {
+              // No input batch was ever produced (e.g. an upstream filter 
removed every row).
+              // Still emit a valid, empty Arrow IPC stream so the Python 
worker's
+              // ArrowStreamReader reads a schema and then sees zero batches, 
instead of failing
+              // on an absent stream ("Invalid IPC stream: negative 
continuation token"). There is
+              // no sample batch, so derive the schema from the Spark input 
schema. The timezone is
+              // irrelevant here because no rows are exchanged.
+              val inner = schema.head.dataType.asInstanceOf[StructType]
+              val childFields = inner.fields.toSeq.map(f =>
+                Utils.toArrowField(f.name, f.dataType, nullable = true, "UTC"))
+              startWriter(childFields, dataOut)
+            }
+            arrowWriter.end()
+            return false
+          }
+          currentGroup = inputIterator.next()
+        }
+
+        val cometBatch = currentGroup.next()
+        val startData = dataOut.size()
+
+        if (arrowWriter == null) {
+          // Build the destination struct root once, sized to the first 
batch's child fields.
+          // mapInArrow/mapInPandas exchange the columns under a single 
non-nullable struct.
+          // Comet's FFI-imported vectors leave the Arrow Field name null, so 
restore the real
+          // column names from the input schema (the worker reads columns by 
name, and shaded
+          // Arrow rejects a null field name). The field types and child 
structure are kept as-is
+          // so copyVector still walks the source and destination trees in 
lockstep.
+          val childNames = 
schema.head.dataType.asInstanceOf[StructType].fieldNames
+          val childFields = (0 until cometBatch.numCols()).map { i =>
+            val vecField =
+              
cometBatch.column(i).asInstanceOf[CometDecodedVector].getValueVector.getField
+            renamed(vecField, childNames(i), forceNullable = true)
+          }
+          startWriter(childFields, dataOut)
+        }
+
+        var i = 0
+        while (i < cometBatch.numCols()) {
+          val src = cometBatch
+            .column(i)
+            .asInstanceOf[CometDecodedVector]
+            .getValueVector
+            .asInstanceOf[FieldVector]
+          val dst = structVec.getChildByOrdinal(i).asInstanceOf[FieldVector]
+          copyVector(src, dst)
+          i += 1
+        }
+        val numRows = cometBatch.numRows()
+        structVec.setValueCount(numRows)
+        // Mark every row of the struct non-null (all-1 validity). The 
validity buffer is freshly
+        // allocated and zero-initialised, so without this Python would see an 
all-null struct.
+        val validityBytes = (numRows + 7) / 8
+        Platform.setMemory(
+          structVec.getValidityBuffer.memoryAddress(),
+          0xff.toByte,
+          validityBytes)
+        writeRoot.setRowCount(numRows)
+        arrowWriter.writeBatch()
+
+        pythonMetrics("pythonDataSent") += dataOut.size() - startData
+        true
+      }
+    }
+  }
+
+  override protected def newReaderIterator(
+      stream: DataInputStream,
+      writer: Writer,
+      startTime: Long,
+      env: SparkEnv,
+      worker: PythonWorker,
+      pid: Option[Int],
+      releasedOrClosed: AtomicBoolean,
+      context: TaskContext): Iterator[ColumnarBatch] = {
+    new ReaderIterator(stream, writer, startTime, env, worker, pid, 
releasedOrClosed, context) {
+
+      private val allocator =
+        CometArrowAllocator.newChildAllocator(s"stdin reader for $pythonExec", 
0, Long.MaxValue)
+      private var reader: ArrowStreamReader = _
+      private var root: VectorSchemaRoot = _
+      private var batchLoaded = true
+
+      context.addTaskCompletionListener[Unit] { _ =>
+        if (reader != null) {
+          reader.close(false)
+        }
+        allocator.close()
+      }
+
+      protected override def read(): ColumnarBatch = {
+        if (writer.exception.isDefined) {
+          throw writer.exception.get
+        }
+        try {
+          if (reader != null && batchLoaded) {
+            batchLoaded = reader.loadNextBatch()
+            if (batchLoaded) {
+              // Re-wrap the (reloaded) field vectors fresh each batch, 
mirroring Comet's
+              // StreamReader, so each ColumnarBatch reflects the current 
buffers.
+              val vectors: Array[ColumnVector] = 
root.getFieldVectors.asScala.map { vector =>
+                CometVector.getVector(vector, null).asInstanceOf[ColumnVector]
+              }.toArray
+              val batch = new ColumnarBatch(vectors)
+              batch.setNumRows(root.getRowCount)
+              pythonMetrics("pythonNumRowsReceived") += root.getRowCount
+              batch
+            } else {
+              reader.close(false)
+              allocator.close()
+              read()
+            }
+          } else {
+            stream.readInt() match {
+              case SpecialLengths.START_ARROW_STREAM =>
+                reader = new ArrowStreamReader(stream, allocator)
+                root = reader.getVectorSchemaRoot()
+                read()
+              case SpecialLengths.TIMING_DATA =>
+                handleTimingData()
+                read()
+              case SpecialLengths.PYTHON_EXCEPTION_THROWN =>
+                throw handlePythonException()
+              case SpecialLengths.END_OF_DATA_SECTION =>
+                handleEndOfDataSection()
+                null
+            }
+          }
+        } catch handleException
+      }
+    }
+  }
+
+  /**
+   * Rebuild `field` with `name`, preserving its Arrow type and child 
structure. Any nested child
+   * whose name Comet's FFI import left null is given a positional placeholder 
so shaded Arrow can
+   * materialize the struct. Keeping the type and structure intact means the 
destination tree
+   * still mirrors the Comet source tree for [[copyVector]].
+   */
+  private def renamed(field: Field, name: String, forceNullable: Boolean): 
Field = {
+    // A Map's descendants must keep their original nullability: Arrow 
requires the entries struct
+    // (and its key) to be non-nullable, and `MapVector.createVector` rejects 
a nullable entries
+    // struct. Stop forcing nullable once we enter a Map subtree.
+    val childrenForceNullable = forceNullable && 
!field.getType.isInstanceOf[ArrowType.Map]
+    val children = field.getChildren
+    val newChildren =
+      if (children.isEmpty) children
+      else
+        children.asScala.zipWithIndex.map { case (child, idx) =>
+          renamed(
+            child,
+            if (child.getName == null) s"_$idx" else child.getName,
+            childrenForceNullable)
+        }.asJava
+    // Force the field nullable where allowed. Comet's FFI-imported vectors 
may carry a
+    // non-nullable Arrow `Field` even for columns that contain nulls (Comet 
uses positional schema
+    // and does not round-trip Spark's nullability), and the worker rejects a 
null value under a
+    // non-nullable field (`from_pandas(pdf, schema=batch.schema)` raises). 
Marking the field
+    // nullable is a safe superset; `copyVector` fills an all-valid validity 
buffer when the source
+    // has no nulls.
+    val ft = field.getFieldType
+    val nullable = forceNullable || ft.isNullable
+    val newFt = new FieldType(nullable, ft.getType, ft.getDictionary, 
ft.getMetadata)
+    new Field(name, newFt, newChildren)
+  }
+
+  /**
+   * Copy a Comet column into the destination FieldVector. Walks both trees in 
lockstep: sizes
+   * each destination node from the source, copies every buffer with 
`ArrowBuf.setBytes`, then
+   * sets value counts bottom-up so `setValueCount` does not rewrite the 
offset bytes we just
+   * copied. Both source and destination are Comet's (shaded) Arrow vectors, 
so no shaded /
+   * unshaded type crosses.
+   */
+  private def copyVector(src: FieldVector, dst: FieldVector): Unit = {

Review Comment:
   `copyVector` calls `dst.allocateNew(...)` on every child for every batch. In 
Arrow Java `BaseFixedWidthVector.allocateNew(int)` calls `clear()` first, which 
releases both buffers before reallocating (`BaseFixedWidthVector.java:292` -> 
`:296` -> `releaseBuffer` at `:240-241`), and `BaseVariableWidthVector` does 
the same. So the only thing actually persistent across batches is the 
`StructVector` container. Every leaf buffer is freed and re-malloc'd per batch. 
On a 50-column batch that is ~100 free/malloc pairs per batch.
   
   Can we make the root genuinely persistent? Allocate once on the first batch, 
then for subsequent batches use `reset()` (`BaseFixedWidthVector.java:225`, 
"doesn't release any memory") plus a capacity check, reallocating only when the 
incoming batch does not fit. That is the `setValueCount(0)` + grow-on-demand 
path, and it removes the per-batch churn that the design is otherwise paying 
for. While you are in here, the `test_map_in_arrow_multi_batch_per_partition` 
docstring (`test_pyarrow_udf.py`, approx lines 437-442) says it exercises "the 
persistent destination IPC root over multiple batches," which is not true today 
since the child buffers reallocate. Once the reuse lands the docstring becomes 
accurate. If for some reason the reuse cannot work, the docstring needs 
correcting either way.



##########
spark/src/test/resources/pyspark/test_pyarrow_udf.py:
##########
@@ -0,0 +1,1177 @@
+#!/usr/bin/env python3
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Pytest-driven integration tests for Comet's PyArrow UDF acceleration.
+
+Each test runs against two execution paths:
+  - "accelerated": spark.comet.exec.pyarrowUdf.enabled=true
+                   (plan should contain CometMapInBatch and no ColumnarToRow)
+  - "fallback":    spark.comet.exec.pyarrowUdf.enabled=false
+                   (plan should contain vanilla PythonMapInArrow / MapInArrow)
+
+Usage:
+    # Build Comet first:
+    make
+
+    # Then either let the test discover the jar from spark/target, or pass it
+    # explicitly via COMET_JAR:
+    export 
COMET_JAR=$PWD/spark/target/comet-spark-spark3.5_2.12-0.16.0-SNAPSHOT.jar
+
+    pip install pyspark==3.5.8 pyarrow pandas pytest
+    pytest -v spark/src/test/resources/pyspark/test_pyarrow_udf.py
+"""
+
+import datetime as dt
+import os
+from decimal import Decimal
+
+import pyarrow as pa
+import pytest
+from pyspark.sql import SparkSession, types as T
+
+from conftest import resolve_comet_jar
+
+
+@pytest.fixture(scope="session")
+def spark():
+    jar = resolve_comet_jar()
+    # PYSPARK_SUBMIT_ARGS is consumed when pyspark launches its JVM. Setting
+    # --jars puts the Comet jar on both driver and executor classpaths so the
+    # CometPlugin can be loaded.
+    os.environ["PYSPARK_SUBMIT_ARGS"] = (
+        f"--jars {jar} --driver-class-path {jar} pyspark-shell"
+    )
+    session = (
+        SparkSession.builder.master("local[2]")
+        .appName("comet-pyarrow-udf-tests")
+        .config("spark.plugins", "org.apache.spark.CometPlugin")
+        .config("spark.comet.enabled", "true")
+        .config("spark.comet.exec.enabled", "true")
+        # spark.comet.exec.shuffle.enabled defaults to true, and
+        # CometSparkSessionExtensions.isCometLoaded refuses to register 
Comet's rules
+        # at all when shuffle is on but spark.shuffle.manager is not the Comet 
manager.
+        # These tests do not need Comet shuffle, so disable it explicitly to 
keep
+        # Comet's scan and exec rules active without configuring shuffle.
+        .config("spark.comet.exec.shuffle.enabled", "false")
+        .config("spark.memory.offHeap.enabled", "true")
+        .config("spark.memory.offHeap.size", "2g")
+        .getOrCreate()
+    )
+    try:
+        yield session
+    finally:
+        session.stop()
+
+
+@pytest.fixture(params=[True, False], ids=["accelerated", "fallback"])
+def accelerated(request, spark) -> bool:
+    spark.conf.set(
+        "spark.comet.exec.pyarrowUdf.enabled",
+        "true" if request.param else "false",
+    )
+    return request.param
+
+
+def _executed_plan(df) -> str:
+    return df._jdf.queryExecution().executedPlan().toString()
+
+
+def _assert_plan_matches_mode(
+    plan: str, accelerated: bool, vanilla_node: str = "MapInArrow"
+) -> None:
+    if accelerated:
+        assert "CometMapInBatch" in plan, (
+            f"expected CometMapInBatch in accelerated plan, got:\n{plan}"
+        )
+        assert "ColumnarToRow" not in plan, (
+            f"unexpected ColumnarToRow in accelerated plan:\n{plan}"
+        )
+    else:
+        assert "CometMapInBatch" not in plan, (
+            f"unexpected CometMapInBatch in fallback plan:\n{plan}"
+        )
+        assert vanilla_node in plan, (
+            f"expected {vanilla_node} in fallback plan, got:\n{plan}"
+        )
+
+
+def test_map_in_arrow_doubles_value(spark, tmp_path, accelerated):
+    data = [(i, float(i * 1.5), f"name_{i}") for i in range(100)]
+    src = str(tmp_path / "src.parquet")
+    spark.createDataFrame(data, ["id", "value", "name"]).write.parquet(src)
+
+    def double_value(iterator):
+        for batch in iterator:
+            pdf = batch.to_pandas()
+            pdf["value"] = pdf["value"] * 2
+            yield pa.RecordBatch.from_pandas(pdf)
+
+    schema = T.StructType(
+        [
+            T.StructField("id", T.LongType()),
+            T.StructField("value", T.DoubleType()),
+            T.StructField("name", T.StringType()),
+        ]
+    )
+    result_df = spark.read.parquet(src).mapInArrow(double_value, schema)
+
+    _assert_plan_matches_mode(_executed_plan(result_df), accelerated)
+
+    rows = result_df.orderBy("id").collect()
+    assert len(rows) == len(data)
+    for row, original in zip(rows, data):
+        assert row["id"] == original[0]
+        assert abs(row["value"] - original[1] * 2) < 1e-6
+        assert row["name"] == original[2]
+
+
+# All other tests use the default `vanilla_node="MapInArrow"`. The mapInPandas 
tests below
+# pass `MapInPandas` explicitly. The substring is the same on Spark 3.5 
(PythonMapInArrowExec)
+# and Spark 4.x (MapInArrowExec) since the latter is a substring of the former.
+
+
+def test_map_in_arrow_changes_schema(spark, tmp_path, accelerated):
+    data = [(i, float(i)) for i in range(50)]
+    src = str(tmp_path / "src.parquet")
+    spark.createDataFrame(data, ["id", "value"]).write.parquet(src)
+
+    def add_computed_column(iterator):
+        for batch in iterator:
+            pdf = batch.to_pandas()
+            pdf["squared"] = pdf["value"] ** 2
+            pdf["label"] = pdf["id"].apply(lambda x: f"item_{x}")
+            yield pa.RecordBatch.from_pandas(pdf)
+
+    schema = T.StructType(
+        [
+            T.StructField("id", T.LongType()),
+            T.StructField("value", T.DoubleType()),
+            T.StructField("squared", T.DoubleType()),
+            T.StructField("label", T.StringType()),
+        ]
+    )
+    result_df = spark.read.parquet(src).mapInArrow(add_computed_column, schema)
+
+    _assert_plan_matches_mode(_executed_plan(result_df), accelerated)
+
+    rows = result_df.orderBy("id").collect()
+    assert len(rows) == 50
+    for i, row in enumerate(rows):
+        assert abs(row["squared"] - float(i) ** 2) < 1e-6
+        assert row["label"] == f"item_{i}"
+
+
+def test_map_in_pandas_doubles_value(spark, tmp_path, accelerated):
+    data = [(i, float(i * 1.5)) for i in range(100)]
+    src = str(tmp_path / "src.parquet")
+    spark.createDataFrame(data, ["id", "value"]).write.parquet(src)
+
+    def double_value(iterator):
+        for pdf in iterator:
+            pdf = pdf.copy()
+            pdf["value"] = pdf["value"] * 2
+            yield pdf
+
+    schema = T.StructType(
+        [
+            T.StructField("id", T.LongType()),
+            T.StructField("value", T.DoubleType()),
+        ]
+    )
+    result_df = spark.read.parquet(src).mapInPandas(double_value, schema)
+
+    _assert_plan_matches_mode(
+        _executed_plan(result_df), accelerated, vanilla_node="MapInPandas"
+    )
+
+    rows = result_df.orderBy("id").collect()
+    assert len(rows) == len(data)
+    for row, original in zip(rows, data):
+        assert row["id"] == original[0]
+        assert abs(row["value"] - original[1] * 2) < 1e-6
+
+
+def test_map_in_pandas_changes_schema(spark, tmp_path, accelerated):
+    data = [(i, float(i)) for i in range(50)]
+    src = str(tmp_path / "src.parquet")
+    spark.createDataFrame(data, ["id", "value"]).write.parquet(src)
+
+    def add_squared(iterator):
+        for pdf in iterator:
+            pdf = pdf.copy()
+            pdf["squared"] = pdf["value"] ** 2
+            yield pdf
+
+    schema = T.StructType(
+        [
+            T.StructField("id", T.LongType()),
+            T.StructField("value", T.DoubleType()),
+            T.StructField("squared", T.DoubleType()),
+        ]
+    )
+    result_df = spark.read.parquet(src).mapInPandas(add_squared, schema)
+
+    _assert_plan_matches_mode(
+        _executed_plan(result_df), accelerated, vanilla_node="MapInPandas"
+    )
+
+    rows = result_df.orderBy("id").collect()
+    assert len(rows) == 50
+    for i, row in enumerate(rows):
+        assert abs(row["squared"] - float(i) ** 2) < 1e-6
+
+
+def test_map_in_arrow_preserves_nulls(spark, tmp_path, accelerated):
+    schema_in = T.StructType(
+        [
+            T.StructField("id", T.LongType()),
+            T.StructField("name", T.StringType()),
+        ]
+    )
+    rows = [
+        (1, "a"),
+        (2, None),
+        (None, "c"),
+        (None, None),
+        (5, "e"),
+    ]
+    src = str(tmp_path / "src.parquet")
+    spark.createDataFrame(rows, schema_in).write.parquet(src)
+
+    def passthrough(iterator):
+        # Pure Arrow passthrough so nulls survive without a pandas roundtrip
+        # (pandas would coerce null longs to NaN floats).
+        for batch in iterator:
+            yield batch
+
+    result_df = spark.read.parquet(src).mapInArrow(passthrough, schema_in)
+    _assert_plan_matches_mode(_executed_plan(result_df), accelerated)
+
+    out = {(r["id"], r["name"]) for r in result_df.collect()}
+    assert out == set(rows)
+
+
+def test_map_in_arrow_empty_input(spark, tmp_path, accelerated):
+    schema_in = T.StructType(
+        [
+            T.StructField("id", T.LongType()),
+            T.StructField("value", T.DoubleType()),
+        ]
+    )
+    src = str(tmp_path / "src.parquet")
+    spark.createDataFrame([(1, 1.0), (2, 2.0)], schema_in).write.parquet(src)
+
+    def passthrough(iterator):
+        for batch in iterator:
+            yield batch
+
+    # Filter all rows out so the operator sees an empty stream from CometScan.
+    result_df = (
+        spark.read.parquet(src).where("id < 0").mapInArrow(passthrough, 
schema_in)
+    )
+    _assert_plan_matches_mode(_executed_plan(result_df), accelerated)
+
+    assert result_df.count() == 0
+
+
+def test_map_in_arrow_python_exception_propagates(spark, tmp_path, 
accelerated):
+    schema_in = T.StructType([T.StructField("id", T.LongType())])
+    data = [(i,) for i in range(10)]
+    src = str(tmp_path / "src.parquet")
+    spark.createDataFrame(data, schema_in).write.parquet(src)
+
+    sentinel = "boom-from-pyarrow-udf"
+
+    def boom(iterator):
+        for _batch in iterator:
+            raise ValueError(sentinel)
+        # Unreachable, but mapInArrow requires the callable to be a generator.
+        yield  # pragma: no cover
+
+    result_df = spark.read.parquet(src).mapInArrow(boom, schema_in)
+    _assert_plan_matches_mode(_executed_plan(result_df), accelerated)
+
+    with pytest.raises(Exception) as exc_info:
+        result_df.collect()
+    assert sentinel in str(exc_info.value), (
+        f"expected sentinel {sentinel!r} in exception, got: {exc_info.value}"
+    )
+
+
+def test_map_in_arrow_decimal_type(spark, tmp_path, accelerated):
+    schema_in = T.StructType(
+        [
+            T.StructField("id", T.LongType()),
+            T.StructField("amount", T.DecimalType(18, 6)),
+        ]
+    )
+    rows = [
+        (1, Decimal("123.456789")),
+        (2, Decimal("0.000001")),
+        (3, Decimal("-99999999.999999")),
+        (4, None),
+    ]
+    src = str(tmp_path / "src.parquet")
+    spark.createDataFrame(rows, schema_in).write.parquet(src)
+
+    def passthrough(iterator):
+        for batch in iterator:
+            yield batch
+
+    result_df = spark.read.parquet(src).mapInArrow(passthrough, schema_in)
+    _assert_plan_matches_mode(_executed_plan(result_df), accelerated)
+
+    out = {(r["id"], r["amount"]) for r in result_df.collect()}
+    assert out == set(rows)
+
+
+@pytest.mark.parametrize(
+    "precision,scale",
+    [
+        (1, 0),
+        (9, 0),
+        (9, 4),
+        (17, 8),
+        (18, 0),
+        (18, 18),
+        (19, 0),
+        (28, 14),
+        (38, 0),
+        (38, 18),
+        (38, 38),
+    ],
+)
+def test_map_in_arrow_decimal_precision_sweep(

Review Comment:
   The docstring says `BaseFixedWidthVector` handles precision <= 18 as 
"long-backed (8 bytes)" and precision >= 19 as "16-byte FixedSizeBinary." The 
Arrow `DecimalVector` that `copyVector` touches is always 16 bytes wide 
regardless of precision. The 8-byte long-backed form is Spark's `UnsafeRow` 
encoding, a layer this Arrow copy never sees. The sweep is good coverage. 
Please fix the rationale so it does not point at a buffer-width boundary that 
does not exist on the Arrow path.



##########
spark/src/main/spark-4.x/org/apache/spark/sql/execution/python/CometArrowPythonRunnerBase.scala:
##########
@@ -0,0 +1,378 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.execution.python
+
+import java.io.{DataInputStream, DataOutputStream}
+import java.nio.channels.Channels
+import java.util.concurrent.atomic.AtomicBoolean
+
+import scala.jdk.CollectionConverters._
+
+import org.apache.arrow.vector.{BaseFixedWidthVector, 
BaseLargeVariableWidthVector, BaseVariableWidthVector, FieldVector, 
VectorSchemaRoot}
+import org.apache.arrow.vector.complex.{LargeListVector, ListVector, 
StructVector}
+import org.apache.arrow.vector.ipc.{ArrowStreamReader, ArrowStreamWriter}
+import org.apache.arrow.vector.types.pojo.{ArrowType, Field, FieldType}
+import org.apache.spark.{SparkEnv, TaskContext}
+import org.apache.spark.api.python.{BasePythonRunner, PythonRDD, PythonWorker, 
SpecialLengths}
+import org.apache.spark.sql.comet.util.Utils
+import org.apache.spark.sql.execution.metric.SQLMetric
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}
+import org.apache.spark.unsafe.Platform
+
+import org.apache.comet.CometArrowAllocator
+import org.apache.comet.vector.{CometDecodedVector, CometVector}
+
+/**
+ * Shared base for Comet's Arrow Python runners (Spark 4.0 / 4.1 / 4.2).
+ *
+ * Unlike a stock `ArrowPythonRunner`, this does not extend Spark's 
`PythonArrowInput` /
+ * `BasicPythonArrowOutput` traits. Those traits expose Spark's Arrow types 
(`VectorSchemaRoot`,
+ * `Schema`) in their members, and the packaged `comet-spark` jar relocates 
`org.apache.arrow` to
+ * `org.apache.comet.shaded.arrow`, so mixing them in produces a class whose 
synthetic Arrow
+ * members no longer match Spark's unshaded trait contract (an 
`AbstractMethodError` at runtime).
+ *
+ * Instead it extends only the Arrow-agnostic `BasePythonRunner` and performs 
the Arrow IPC
+ * exchange itself using Comet's (shaded) Arrow. The Python worker only ever 
sees a standard Arrow
+ * IPC byte stream, which is version-neutral, so nothing crosses the 
shaded/unshaded boundary:
+ *   - Input: each Comet `ColumnarBatch` is copied into a shaded struct root 
and written to the
+ *     worker with a shaded `ArrowStreamWriter`.
+ *   - Output: the worker's Arrow IPC is read with a shaded 
`ArrowStreamReader` straight into
+ *     `CometVector`s, which is exactly what `CometMapInBatchExec` and 
downstream native operators
+ *     consume.
+ *
+ * `BasePythonRunner` has the same shape across Spark 4.0/4.1/4.2; only the 
subclass constructor
+ * arguments and `writeUDF` differ, so those stay in the per-version 
subclasses.
+ */
+private[python] trait CometArrowPythonRunnerBase
+    extends BasePythonRunner[Iterator[ColumnarBatch], ColumnarBatch] {
+
+  /** Worker configuration written to the Python worker before execution. */
+  protected def workerConf: Map[String, String]
+
+  /** Comet's Python SQL metrics (data sent/received, rows). */
+  protected def pythonMetrics: Map[String, SQLMetric]
+
+  /** Version-specific UDF command serialization. */
+  protected def writeUDF(dataOut: DataOutputStream): Unit
+
+  /**
+   * Input schema as Comet hands it to the runner: a single non-nullable 
struct named "struct"
+   * whose children are the user's input columns. Comet's FFI-imported vectors 
carry Arrow
+   * `Field`s with null names (Comet uses positional schema), so these names 
are the source of
+   * truth for the field names written into the IPC stream that the Python 
worker reads by name.
+   */
+  protected def schema: StructType
+
+  override val pythonExec: String =
+    
SQLConf.get.pysparkWorkerPythonExecutable.getOrElse(funcs.head.funcs.head.pythonExec)
+
+  override val faultHandlerEnabled: Boolean = 
SQLConf.get.pythonUDFWorkerFaulthandlerEnabled
+  override val idleTimeoutSeconds: Long = 
SQLConf.get.pythonUDFWorkerIdleTimeoutSeconds
+  override val hideTraceback: Boolean = SQLConf.get.pysparkHideTraceback
+  override val simplifiedTraceback: Boolean = 
SQLConf.get.pysparkSimplifiedTraceback
+
+  override val bufferSize: Int = SQLConf.get.pandasUDFBufferSize
+  require(
+    bufferSize >= 4,
+    "Pandas execution requires more than 4 bytes. Please set higher buffer. " +
+      s"Please change '${SQLConf.PANDAS_UDF_BUFFER_SIZE.key}'.")
+
+  override protected def newWriter(
+      env: SparkEnv,
+      worker: PythonWorker,
+      inputIterator: Iterator[Iterator[ColumnarBatch]],
+      partitionIndex: Int,
+      context: TaskContext): Writer = {
+    new Writer(env, worker, inputIterator, partitionIndex, context) {
+
+      private val allocator =
+        CometArrowAllocator.newChildAllocator(s"stdout writer for 
$pythonExec", 0, Long.MaxValue)
+      private var currentGroup: Iterator[ColumnarBatch] = _
+      private var arrowWriter: ArrowStreamWriter = _
+      private var writeRoot: VectorSchemaRoot = _
+      private var structVec: StructVector = _
+
+      context.addTaskCompletionListener[Unit] { _ =>
+        if (writeRoot != null) {
+          writeRoot.close()
+        }
+        allocator.close()
+      }
+
+      protected override def writeCommand(dataOut: DataOutputStream): Unit = {
+        // handleMetadataBeforeExec: write the worker config as key/value 
string pairs.
+        dataOut.writeInt(workerConf.size)
+        for ((k, v) <- workerConf) {
+          PythonRDD.writeUTF(k, dataOut)
+          PythonRDD.writeUTF(v, dataOut)
+        }
+        writeUDF(dataOut)
+      }
+
+      /** Build the destination struct root and start the writer from the 
given child fields. */
+      private def startWriter(childFields: Seq[Field], dataOut: 
DataOutputStream): Unit = {
+        val structField =
+          new Field(
+            "struct",
+            new FieldType(false, ArrowType.Struct.INSTANCE, null),
+            childFields.asJava)
+        structVec = 
structField.createVector(allocator).asInstanceOf[StructVector]
+        writeRoot = new VectorSchemaRoot(Seq[FieldVector](structVec).asJava)
+        arrowWriter = new ArrowStreamWriter(writeRoot, null, 
Channels.newChannel(dataOut))
+        arrowWriter.start()
+      }
+
+      override def writeNextInputToStream(dataOut: DataOutputStream): Boolean 
= {
+        while (currentGroup == null || !currentGroup.hasNext) {
+          if (!inputIterator.hasNext) {
+            if (arrowWriter == null) {
+              // No input batch was ever produced (e.g. an upstream filter 
removed every row).
+              // Still emit a valid, empty Arrow IPC stream so the Python 
worker's
+              // ArrowStreamReader reads a schema and then sees zero batches, 
instead of failing
+              // on an absent stream ("Invalid IPC stream: negative 
continuation token"). There is
+              // no sample batch, so derive the schema from the Spark input 
schema. The timezone is
+              // irrelevant here because no rows are exchanged.
+              val inner = schema.head.dataType.asInstanceOf[StructType]
+              val childFields = inner.fields.toSeq.map(f =>
+                Utils.toArrowField(f.name, f.dataType, nullable = true, "UTC"))
+              startWriter(childFields, dataOut)
+            }
+            arrowWriter.end()
+            return false
+          }
+          currentGroup = inputIterator.next()
+        }
+
+        val cometBatch = currentGroup.next()
+        val startData = dataOut.size()
+
+        if (arrowWriter == null) {
+          // Build the destination struct root once, sized to the first 
batch's child fields.
+          // mapInArrow/mapInPandas exchange the columns under a single 
non-nullable struct.
+          // Comet's FFI-imported vectors leave the Arrow Field name null, so 
restore the real
+          // column names from the input schema (the worker reads columns by 
name, and shaded
+          // Arrow rejects a null field name). The field types and child 
structure are kept as-is
+          // so copyVector still walks the source and destination trees in 
lockstep.
+          val childNames = 
schema.head.dataType.asInstanceOf[StructType].fieldNames
+          val childFields = (0 until cometBatch.numCols()).map { i =>
+            val vecField =
+              
cometBatch.column(i).asInstanceOf[CometDecodedVector].getValueVector.getField
+            renamed(vecField, childNames(i), forceNullable = true)
+          }
+          startWriter(childFields, dataOut)
+        }
+
+        var i = 0
+        while (i < cometBatch.numCols()) {
+          val src = cometBatch
+            .column(i)
+            .asInstanceOf[CometDecodedVector]
+            .getValueVector
+            .asInstanceOf[FieldVector]
+          val dst = structVec.getChildByOrdinal(i).asInstanceOf[FieldVector]
+          copyVector(src, dst)
+          i += 1
+        }
+        val numRows = cometBatch.numRows()
+        structVec.setValueCount(numRows)
+        // Mark every row of the struct non-null (all-1 validity). The 
validity buffer is freshly
+        // allocated and zero-initialised, so without this Python would see an 
all-null struct.
+        val validityBytes = (numRows + 7) / 8
+        Platform.setMemory(
+          structVec.getValidityBuffer.memoryAddress(),
+          0xff.toByte,
+          validityBytes)
+        writeRoot.setRowCount(numRows)
+        arrowWriter.writeBatch()
+
+        pythonMetrics("pythonDataSent") += dataOut.size() - startData
+        true
+      }
+    }
+  }
+
+  override protected def newReaderIterator(
+      stream: DataInputStream,
+      writer: Writer,
+      startTime: Long,
+      env: SparkEnv,
+      worker: PythonWorker,
+      pid: Option[Int],
+      releasedOrClosed: AtomicBoolean,
+      context: TaskContext): Iterator[ColumnarBatch] = {
+    new ReaderIterator(stream, writer, startTime, env, worker, pid, 
releasedOrClosed, context) {
+
+      private val allocator =
+        CometArrowAllocator.newChildAllocator(s"stdin reader for $pythonExec", 
0, Long.MaxValue)
+      private var reader: ArrowStreamReader = _
+      private var root: VectorSchemaRoot = _
+      private var batchLoaded = true
+
+      context.addTaskCompletionListener[Unit] { _ =>
+        if (reader != null) {
+          reader.close(false)
+        }
+        allocator.close()
+      }
+
+      protected override def read(): ColumnarBatch = {
+        if (writer.exception.isDefined) {
+          throw writer.exception.get
+        }
+        try {
+          if (reader != null && batchLoaded) {
+            batchLoaded = reader.loadNextBatch()
+            if (batchLoaded) {
+              // Re-wrap the (reloaded) field vectors fresh each batch, 
mirroring Comet's
+              // StreamReader, so each ColumnarBatch reflects the current 
buffers.
+              val vectors: Array[ColumnVector] = 
root.getFieldVectors.asScala.map { vector =>
+                CometVector.getVector(vector, null).asInstanceOf[ColumnVector]
+              }.toArray
+              val batch = new ColumnarBatch(vectors)
+              batch.setNumRows(root.getRowCount)
+              pythonMetrics("pythonNumRowsReceived") += root.getRowCount
+              batch
+            } else {
+              reader.close(false)
+              allocator.close()
+              read()
+            }
+          } else {
+            stream.readInt() match {
+              case SpecialLengths.START_ARROW_STREAM =>
+                reader = new ArrowStreamReader(stream, allocator)
+                root = reader.getVectorSchemaRoot()
+                read()
+              case SpecialLengths.TIMING_DATA =>
+                handleTimingData()
+                read()
+              case SpecialLengths.PYTHON_EXCEPTION_THROWN =>
+                throw handlePythonException()
+              case SpecialLengths.END_OF_DATA_SECTION =>
+                handleEndOfDataSection()
+                null
+            }
+          }
+        } catch handleException
+      }
+    }
+  }
+
+  /**
+   * Rebuild `field` with `name`, preserving its Arrow type and child 
structure. Any nested child
+   * whose name Comet's FFI import left null is given a positional placeholder 
so shaded Arrow can
+   * materialize the struct. Keeping the type and structure intact means the 
destination tree
+   * still mirrors the Comet source tree for [[copyVector]].
+   */
+  private def renamed(field: Field, name: String, forceNullable: Boolean): 
Field = {
+    // A Map's descendants must keep their original nullability: Arrow 
requires the entries struct
+    // (and its key) to be non-nullable, and `MapVector.createVector` rejects 
a nullable entries
+    // struct. Stop forcing nullable once we enter a Map subtree.
+    val childrenForceNullable = forceNullable && 
!field.getType.isInstanceOf[ArrowType.Map]
+    val children = field.getChildren
+    val newChildren =
+      if (children.isEmpty) children
+      else
+        children.asScala.zipWithIndex.map { case (child, idx) =>
+          renamed(
+            child,
+            if (child.getName == null) s"_$idx" else child.getName,
+            childrenForceNullable)
+        }.asJava
+    // Force the field nullable where allowed. Comet's FFI-imported vectors 
may carry a
+    // non-nullable Arrow `Field` even for columns that contain nulls (Comet 
uses positional schema
+    // and does not round-trip Spark's nullability), and the worker rejects a 
null value under a
+    // non-nullable field (`from_pandas(pdf, schema=batch.schema)` raises). 
Marking the field
+    // nullable is a safe superset; `copyVector` fills an all-valid validity 
buffer when the source
+    // has no nulls.
+    val ft = field.getFieldType
+    val nullable = forceNullable || ft.isNullable
+    val newFt = new FieldType(nullable, ft.getType, ft.getDictionary, 
ft.getMetadata)
+    new Field(name, newFt, newChildren)
+  }
+
+  /**
+   * Copy a Comet column into the destination FieldVector. Walks both trees in 
lockstep: sizes
+   * each destination node from the source, copies every buffer with 
`ArrowBuf.setBytes`, then
+   * sets value counts bottom-up so `setValueCount` does not rewrite the 
offset bytes we just
+   * copied. Both source and destination are Comet's (shaded) Arrow vectors, 
so no shaded /
+   * unshaded type crosses.
+   */
+  private def copyVector(src: FieldVector, dst: FieldVector): Unit = {
+    val valueCount = src.getValueCount
+
+    dst match {
+      case bfwv: BaseFixedWidthVector =>
+        bfwv.allocateNew(valueCount)
+      case bvwv: BaseVariableWidthVector =>
+        bvwv.allocateNew(src.getDataBuffer.readableBytes, valueCount)
+      case blvwv: BaseLargeVariableWidthVector =>
+        blvwv.allocateNew(src.getDataBuffer.readableBytes, valueCount)
+      case _ =>
+        dst.setInitialCapacity(valueCount)
+        dst.allocateNew()
+    }
+
+    val srcBufs = src.getFieldBuffers
+    val dstBufs = dst.getFieldBuffers
+    require(
+      srcBufs.size == dstBufs.size,
+      s"buffer count mismatch for ${dst.getField}: src=${srcBufs.size}, 
dst=${dstBufs.size}")
+    var b = 0
+    while (b < srcBufs.size) {

Review Comment:
   The manual `while (b < srcBufs.size)` reads more cleanly as 
`srcBufs.asScala.zip(dstBufs.asScala).foreach { case (s, d) => d.setBytes(0, s, 
0, s.readableBytes) }`. Keep the buffer-count `require` ahead of it as the size 
check: `zip` silently truncates to the shorter collection, so that check is 
what still catches a mismatch. This drops a nesting level.



##########
spark/src/main/spark-4.x/org/apache/spark/sql/execution/python/CometArrowPythonRunnerBase.scala:
##########
@@ -0,0 +1,378 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.execution.python
+
+import java.io.{DataInputStream, DataOutputStream}
+import java.nio.channels.Channels
+import java.util.concurrent.atomic.AtomicBoolean
+
+import scala.jdk.CollectionConverters._
+
+import org.apache.arrow.vector.{BaseFixedWidthVector, 
BaseLargeVariableWidthVector, BaseVariableWidthVector, FieldVector, 
VectorSchemaRoot}
+import org.apache.arrow.vector.complex.{LargeListVector, ListVector, 
StructVector}
+import org.apache.arrow.vector.ipc.{ArrowStreamReader, ArrowStreamWriter}
+import org.apache.arrow.vector.types.pojo.{ArrowType, Field, FieldType}
+import org.apache.spark.{SparkEnv, TaskContext}
+import org.apache.spark.api.python.{BasePythonRunner, PythonRDD, PythonWorker, 
SpecialLengths}
+import org.apache.spark.sql.comet.util.Utils
+import org.apache.spark.sql.execution.metric.SQLMetric
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}
+import org.apache.spark.unsafe.Platform
+
+import org.apache.comet.CometArrowAllocator
+import org.apache.comet.vector.{CometDecodedVector, CometVector}
+
+/**
+ * Shared base for Comet's Arrow Python runners (Spark 4.0 / 4.1 / 4.2).
+ *
+ * Unlike a stock `ArrowPythonRunner`, this does not extend Spark's 
`PythonArrowInput` /
+ * `BasicPythonArrowOutput` traits. Those traits expose Spark's Arrow types 
(`VectorSchemaRoot`,
+ * `Schema`) in their members, and the packaged `comet-spark` jar relocates 
`org.apache.arrow` to
+ * `org.apache.comet.shaded.arrow`, so mixing them in produces a class whose 
synthetic Arrow
+ * members no longer match Spark's unshaded trait contract (an 
`AbstractMethodError` at runtime).
+ *
+ * Instead it extends only the Arrow-agnostic `BasePythonRunner` and performs 
the Arrow IPC
+ * exchange itself using Comet's (shaded) Arrow. The Python worker only ever 
sees a standard Arrow
+ * IPC byte stream, which is version-neutral, so nothing crosses the 
shaded/unshaded boundary:
+ *   - Input: each Comet `ColumnarBatch` is copied into a shaded struct root 
and written to the
+ *     worker with a shaded `ArrowStreamWriter`.
+ *   - Output: the worker's Arrow IPC is read with a shaded 
`ArrowStreamReader` straight into
+ *     `CometVector`s, which is exactly what `CometMapInBatchExec` and 
downstream native operators
+ *     consume.
+ *
+ * `BasePythonRunner` has the same shape across Spark 4.0/4.1/4.2; only the 
subclass constructor
+ * arguments and `writeUDF` differ, so those stay in the per-version 
subclasses.
+ */
+private[python] trait CometArrowPythonRunnerBase
+    extends BasePythonRunner[Iterator[ColumnarBatch], ColumnarBatch] {
+
+  /** Worker configuration written to the Python worker before execution. */
+  protected def workerConf: Map[String, String]
+
+  /** Comet's Python SQL metrics (data sent/received, rows). */
+  protected def pythonMetrics: Map[String, SQLMetric]
+
+  /** Version-specific UDF command serialization. */
+  protected def writeUDF(dataOut: DataOutputStream): Unit
+
+  /**
+   * Input schema as Comet hands it to the runner: a single non-nullable 
struct named "struct"
+   * whose children are the user's input columns. Comet's FFI-imported vectors 
carry Arrow
+   * `Field`s with null names (Comet uses positional schema), so these names 
are the source of
+   * truth for the field names written into the IPC stream that the Python 
worker reads by name.
+   */
+  protected def schema: StructType
+
+  override val pythonExec: String =
+    
SQLConf.get.pysparkWorkerPythonExecutable.getOrElse(funcs.head.funcs.head.pythonExec)
+
+  override val faultHandlerEnabled: Boolean = 
SQLConf.get.pythonUDFWorkerFaulthandlerEnabled
+  override val idleTimeoutSeconds: Long = 
SQLConf.get.pythonUDFWorkerIdleTimeoutSeconds
+  override val hideTraceback: Boolean = SQLConf.get.pysparkHideTraceback
+  override val simplifiedTraceback: Boolean = 
SQLConf.get.pysparkSimplifiedTraceback
+
+  override val bufferSize: Int = SQLConf.get.pandasUDFBufferSize
+  require(
+    bufferSize >= 4,
+    "Pandas execution requires more than 4 bytes. Please set higher buffer. " +
+      s"Please change '${SQLConf.PANDAS_UDF_BUFFER_SIZE.key}'.")
+
+  override protected def newWriter(
+      env: SparkEnv,
+      worker: PythonWorker,
+      inputIterator: Iterator[Iterator[ColumnarBatch]],
+      partitionIndex: Int,
+      context: TaskContext): Writer = {
+    new Writer(env, worker, inputIterator, partitionIndex, context) {
+
+      private val allocator =
+        CometArrowAllocator.newChildAllocator(s"stdout writer for 
$pythonExec", 0, Long.MaxValue)
+      private var currentGroup: Iterator[ColumnarBatch] = _
+      private var arrowWriter: ArrowStreamWriter = _
+      private var writeRoot: VectorSchemaRoot = _
+      private var structVec: StructVector = _
+
+      context.addTaskCompletionListener[Unit] { _ =>
+        if (writeRoot != null) {
+          writeRoot.close()
+        }
+        allocator.close()
+      }
+
+      protected override def writeCommand(dataOut: DataOutputStream): Unit = {
+        // handleMetadataBeforeExec: write the worker config as key/value 
string pairs.
+        dataOut.writeInt(workerConf.size)
+        for ((k, v) <- workerConf) {
+          PythonRDD.writeUTF(k, dataOut)
+          PythonRDD.writeUTF(v, dataOut)
+        }
+        writeUDF(dataOut)
+      }
+
+      /** Build the destination struct root and start the writer from the 
given child fields. */
+      private def startWriter(childFields: Seq[Field], dataOut: 
DataOutputStream): Unit = {
+        val structField =
+          new Field(
+            "struct",
+            new FieldType(false, ArrowType.Struct.INSTANCE, null),
+            childFields.asJava)
+        structVec = 
structField.createVector(allocator).asInstanceOf[StructVector]
+        writeRoot = new VectorSchemaRoot(Seq[FieldVector](structVec).asJava)
+        arrowWriter = new ArrowStreamWriter(writeRoot, null, 
Channels.newChannel(dataOut))

Review Comment:
   The writer constructs `ArrowStreamWriter(writeRoot, null, channel)` with no 
compression codec, so `spark.sql.execution.arrow.compression.codec` is ignored 
on this path, whereas the fallback (stock Spark) path honors it. The 
Python-side Arrow stream reader auto-detects compression from the IPC message 
metadata, so an uncompressed stream still reads correctly, but a user who sets 
the codec has it silently dropped whenever the plan is rewritten to 
`CometMapInBatchExec`. Please resolve the codec from `SQLConf` and pass it 
through the `VectorUnloader` the writer uses, matching the fallback behavior.



##########
spark/src/main/spark-4.x/org/apache/spark/sql/execution/python/CometArrowPythonRunnerBase.scala:
##########
@@ -0,0 +1,378 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.execution.python
+
+import java.io.{DataInputStream, DataOutputStream}
+import java.nio.channels.Channels
+import java.util.concurrent.atomic.AtomicBoolean
+
+import scala.jdk.CollectionConverters._
+
+import org.apache.arrow.vector.{BaseFixedWidthVector, 
BaseLargeVariableWidthVector, BaseVariableWidthVector, FieldVector, 
VectorSchemaRoot}
+import org.apache.arrow.vector.complex.{LargeListVector, ListVector, 
StructVector}
+import org.apache.arrow.vector.ipc.{ArrowStreamReader, ArrowStreamWriter}
+import org.apache.arrow.vector.types.pojo.{ArrowType, Field, FieldType}
+import org.apache.spark.{SparkEnv, TaskContext}
+import org.apache.spark.api.python.{BasePythonRunner, PythonRDD, PythonWorker, 
SpecialLengths}
+import org.apache.spark.sql.comet.util.Utils
+import org.apache.spark.sql.execution.metric.SQLMetric
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}
+import org.apache.spark.unsafe.Platform
+
+import org.apache.comet.CometArrowAllocator
+import org.apache.comet.vector.{CometDecodedVector, CometVector}
+
+/**
+ * Shared base for Comet's Arrow Python runners (Spark 4.0 / 4.1 / 4.2).
+ *
+ * Unlike a stock `ArrowPythonRunner`, this does not extend Spark's 
`PythonArrowInput` /
+ * `BasicPythonArrowOutput` traits. Those traits expose Spark's Arrow types 
(`VectorSchemaRoot`,
+ * `Schema`) in their members, and the packaged `comet-spark` jar relocates 
`org.apache.arrow` to
+ * `org.apache.comet.shaded.arrow`, so mixing them in produces a class whose 
synthetic Arrow
+ * members no longer match Spark's unshaded trait contract (an 
`AbstractMethodError` at runtime).
+ *
+ * Instead it extends only the Arrow-agnostic `BasePythonRunner` and performs 
the Arrow IPC
+ * exchange itself using Comet's (shaded) Arrow. The Python worker only ever 
sees a standard Arrow
+ * IPC byte stream, which is version-neutral, so nothing crosses the 
shaded/unshaded boundary:
+ *   - Input: each Comet `ColumnarBatch` is copied into a shaded struct root 
and written to the
+ *     worker with a shaded `ArrowStreamWriter`.
+ *   - Output: the worker's Arrow IPC is read with a shaded 
`ArrowStreamReader` straight into
+ *     `CometVector`s, which is exactly what `CometMapInBatchExec` and 
downstream native operators
+ *     consume.
+ *
+ * `BasePythonRunner` has the same shape across Spark 4.0/4.1/4.2; only the 
subclass constructor
+ * arguments and `writeUDF` differ, so those stay in the per-version 
subclasses.
+ */
+private[python] trait CometArrowPythonRunnerBase
+    extends BasePythonRunner[Iterator[ColumnarBatch], ColumnarBatch] {
+
+  /** Worker configuration written to the Python worker before execution. */
+  protected def workerConf: Map[String, String]
+
+  /** Comet's Python SQL metrics (data sent/received, rows). */
+  protected def pythonMetrics: Map[String, SQLMetric]
+
+  /** Version-specific UDF command serialization. */
+  protected def writeUDF(dataOut: DataOutputStream): Unit
+
+  /**
+   * Input schema as Comet hands it to the runner: a single non-nullable 
struct named "struct"
+   * whose children are the user's input columns. Comet's FFI-imported vectors 
carry Arrow
+   * `Field`s with null names (Comet uses positional schema), so these names 
are the source of
+   * truth for the field names written into the IPC stream that the Python 
worker reads by name.
+   */
+  protected def schema: StructType
+
+  override val pythonExec: String =
+    
SQLConf.get.pysparkWorkerPythonExecutable.getOrElse(funcs.head.funcs.head.pythonExec)
+
+  override val faultHandlerEnabled: Boolean = 
SQLConf.get.pythonUDFWorkerFaulthandlerEnabled
+  override val idleTimeoutSeconds: Long = 
SQLConf.get.pythonUDFWorkerIdleTimeoutSeconds
+  override val hideTraceback: Boolean = SQLConf.get.pysparkHideTraceback
+  override val simplifiedTraceback: Boolean = 
SQLConf.get.pysparkSimplifiedTraceback
+
+  override val bufferSize: Int = SQLConf.get.pandasUDFBufferSize
+  require(
+    bufferSize >= 4,
+    "Pandas execution requires more than 4 bytes. Please set higher buffer. " +
+      s"Please change '${SQLConf.PANDAS_UDF_BUFFER_SIZE.key}'.")
+
+  override protected def newWriter(
+      env: SparkEnv,
+      worker: PythonWorker,
+      inputIterator: Iterator[Iterator[ColumnarBatch]],
+      partitionIndex: Int,
+      context: TaskContext): Writer = {
+    new Writer(env, worker, inputIterator, partitionIndex, context) {
+
+      private val allocator =
+        CometArrowAllocator.newChildAllocator(s"stdout writer for 
$pythonExec", 0, Long.MaxValue)
+      private var currentGroup: Iterator[ColumnarBatch] = _
+      private var arrowWriter: ArrowStreamWriter = _
+      private var writeRoot: VectorSchemaRoot = _
+      private var structVec: StructVector = _
+
+      context.addTaskCompletionListener[Unit] { _ =>
+        if (writeRoot != null) {
+          writeRoot.close()
+        }
+        allocator.close()
+      }
+
+      protected override def writeCommand(dataOut: DataOutputStream): Unit = {
+        // handleMetadataBeforeExec: write the worker config as key/value 
string pairs.
+        dataOut.writeInt(workerConf.size)
+        for ((k, v) <- workerConf) {
+          PythonRDD.writeUTF(k, dataOut)
+          PythonRDD.writeUTF(v, dataOut)
+        }
+        writeUDF(dataOut)
+      }
+
+      /** Build the destination struct root and start the writer from the 
given child fields. */
+      private def startWriter(childFields: Seq[Field], dataOut: 
DataOutputStream): Unit = {
+        val structField =
+          new Field(
+            "struct",
+            new FieldType(false, ArrowType.Struct.INSTANCE, null),
+            childFields.asJava)
+        structVec = 
structField.createVector(allocator).asInstanceOf[StructVector]
+        writeRoot = new VectorSchemaRoot(Seq[FieldVector](structVec).asJava)
+        arrowWriter = new ArrowStreamWriter(writeRoot, null, 
Channels.newChannel(dataOut))
+        arrowWriter.start()
+      }
+
+      override def writeNextInputToStream(dataOut: DataOutputStream): Boolean 
= {
+        while (currentGroup == null || !currentGroup.hasNext) {
+          if (!inputIterator.hasNext) {
+            if (arrowWriter == null) {
+              // No input batch was ever produced (e.g. an upstream filter 
removed every row).
+              // Still emit a valid, empty Arrow IPC stream so the Python 
worker's
+              // ArrowStreamReader reads a schema and then sees zero batches, 
instead of failing
+              // on an absent stream ("Invalid IPC stream: negative 
continuation token"). There is
+              // no sample batch, so derive the schema from the Spark input 
schema. The timezone is
+              // irrelevant here because no rows are exchanged.
+              val inner = schema.head.dataType.asInstanceOf[StructType]
+              val childFields = inner.fields.toSeq.map(f =>
+                Utils.toArrowField(f.name, f.dataType, nullable = true, "UTC"))
+              startWriter(childFields, dataOut)
+            }
+            arrowWriter.end()
+            return false
+          }
+          currentGroup = inputIterator.next()
+        }
+
+        val cometBatch = currentGroup.next()
+        val startData = dataOut.size()
+
+        if (arrowWriter == null) {
+          // Build the destination struct root once, sized to the first 
batch's child fields.
+          // mapInArrow/mapInPandas exchange the columns under a single 
non-nullable struct.
+          // Comet's FFI-imported vectors leave the Arrow Field name null, so 
restore the real
+          // column names from the input schema (the worker reads columns by 
name, and shaded
+          // Arrow rejects a null field name). The field types and child 
structure are kept as-is
+          // so copyVector still walks the source and destination trees in 
lockstep.
+          val childNames = 
schema.head.dataType.asInstanceOf[StructType].fieldNames
+          val childFields = (0 until cometBatch.numCols()).map { i =>
+            val vecField =
+              
cometBatch.column(i).asInstanceOf[CometDecodedVector].getValueVector.getField
+            renamed(vecField, childNames(i), forceNullable = true)
+          }
+          startWriter(childFields, dataOut)
+        }
+
+        var i = 0
+        while (i < cometBatch.numCols()) {
+          val src = cometBatch
+            .column(i)
+            .asInstanceOf[CometDecodedVector]
+            .getValueVector
+            .asInstanceOf[FieldVector]
+          val dst = structVec.getChildByOrdinal(i).asInstanceOf[FieldVector]
+          copyVector(src, dst)
+          i += 1
+        }
+        val numRows = cometBatch.numRows()
+        structVec.setValueCount(numRows)
+        // Mark every row of the struct non-null (all-1 validity). The 
validity buffer is freshly
+        // allocated and zero-initialised, so without this Python would see an 
all-null struct.
+        val validityBytes = (numRows + 7) / 8
+        Platform.setMemory(
+          structVec.getValidityBuffer.memoryAddress(),
+          0xff.toByte,
+          validityBytes)
+        writeRoot.setRowCount(numRows)
+        arrowWriter.writeBatch()
+
+        pythonMetrics("pythonDataSent") += dataOut.size() - startData
+        true
+      }
+    }
+  }
+
+  override protected def newReaderIterator(
+      stream: DataInputStream,
+      writer: Writer,
+      startTime: Long,
+      env: SparkEnv,
+      worker: PythonWorker,
+      pid: Option[Int],
+      releasedOrClosed: AtomicBoolean,
+      context: TaskContext): Iterator[ColumnarBatch] = {
+    new ReaderIterator(stream, writer, startTime, env, worker, pid, 
releasedOrClosed, context) {
+
+      private val allocator =
+        CometArrowAllocator.newChildAllocator(s"stdin reader for $pythonExec", 
0, Long.MaxValue)
+      private var reader: ArrowStreamReader = _
+      private var root: VectorSchemaRoot = _
+      private var batchLoaded = true
+
+      context.addTaskCompletionListener[Unit] { _ =>
+        if (reader != null) {
+          reader.close(false)
+        }
+        allocator.close()
+      }
+
+      protected override def read(): ColumnarBatch = {
+        if (writer.exception.isDefined) {
+          throw writer.exception.get
+        }
+        try {
+          if (reader != null && batchLoaded) {
+            batchLoaded = reader.loadNextBatch()
+            if (batchLoaded) {
+              // Re-wrap the (reloaded) field vectors fresh each batch, 
mirroring Comet's
+              // StreamReader, so each ColumnarBatch reflects the current 
buffers.
+              val vectors: Array[ColumnVector] = 
root.getFieldVectors.asScala.map { vector =>
+                CometVector.getVector(vector, null).asInstanceOf[ColumnVector]
+              }.toArray
+              val batch = new ColumnarBatch(vectors)
+              batch.setNumRows(root.getRowCount)
+              pythonMetrics("pythonNumRowsReceived") += root.getRowCount

Review Comment:
   The read path increments `pythonNumRowsReceived` but nothing increments 
`pythonDataReceived`, while the writer does increment `pythonDataSent`. Stock 
`BasicPythonArrowOutput` populates `pythonDataReceived` as it reads frames. As 
written the accelerated path reports 0 bytes received, so the metric silently 
diverges from the fallback path depending on whether the rewrite fired. Please 
track the bytes read here so the metric matches the fallback. The metrics map 
in `CometMapInBatchExec` is where it surfaces to users.



##########
spark/src/main/scala/org/apache/comet/rules/EliminateRedundantTransitions.scala:
##########
@@ -130,6 +154,48 @@ case class EliminateRedundantTransitions(session: 
SparkSession) extends Rule[Spa
     }
   }
 
+  /**
+   * If the given plan is a Comet ColumnarToRow transition, returns the 
columnar child the Python
+   * UDF operator can consume directly. By the time this rule runs the earlier
+   * `hasCometNativeChild` arm has already rewritten any `ColumnarToRowExec` 
over a Comet columnar
+   * source to one of the Comet variants, so vanilla `ColumnarToRowExec` 
cannot reach here on a
+   * Comet-driven plan and is intentionally not handled.
+   */
+  private def extractColumnarChild(plan: SparkPlan): Option[SparkPlan] = plan 
match {
+    case CometColumnarToRowExec(child) => Some(child)
+    case CometNativeColumnarToRowExec(child) => Some(child)
+    // Chained `mapInArrow(udf1).mapInArrow(udf2)`: by the time the outer 
operator is visited
+    // (transformUp is bottom-up) the inner one has already become a 
`CometMapInBatchExec`, which
+    // is itself columnar. There is no row transition between them to strip, 
so consume its
+    // columnar output directly. Its flattened output vectors are 
`CometVector`s, exactly what
+    // `CometMapInBatchExec`'s input path expects.
+    case child: CometMapInBatchExec => Some(child)
+    case _ => None
+  }
+
+  /**
+   * Matches the plans this rule should rewrite to `CometMapInBatchExec`. 
Single extractor used in
+   * the `transformUp` arm above so the matchers and conf reads run once per 
visited plan. Returns
+   * `(info, columnarChild)` where `columnarChild` is the Comet columnar 
producer that
+   * `CometMapInBatchExec` will consume directly. Returns `None` (and the arm 
misses) when the
+   * conf is off, when `useLargeVarTypes` forces the fallback, when the plan 
is not one of the
+   * version-shimmed MapInArrow / MapInPandas operators, or when the child is 
not a Comet
+   * columnar-to-row transition we can strip.
+   */
+  private object EligibleMapInBatch {
+    def unapply(plan: SparkPlan): Option[(MapInBatchInfo, SparkPlan)] = {
+      if (!CometConf.COMET_PYARROW_UDF_ENABLED.get()) {
+        None
+      } else if (arrowUseLargeVarTypes(plan.conf)) {
+        None

Review Comment:
   The rule skips the rewrite entirely when `useLargeVarTypes` is on, because 
`copyVector` does a raw `setBytes` that cannot bridge Comet's 4-byte offsets to 
the destination's 8-byte offsets. This PR already taught `CometPlainVector` to 
read both offset widths, so the boundary is half-solved: the read side handles 
both, the write side gates the whole optimization off. Rather than gate, can 
the var-width arm of `copyVector` widen the offsets when the destination is a 
`BaseLargeVariableWidthVector` and the source is a `BaseVariableWidthVector` 
(read each 4-byte offset, write the 8-byte equivalent, copy the data buffer 
as-is)? That lets the rewrite always fire and removes a planner special case 
that would otherwise become dead code the moment the copy understands the wider 
offset.



##########
spark/src/main/spark-4.x/org/apache/spark/sql/execution/python/CometArrowPythonRunnerBase.scala:
##########
@@ -0,0 +1,378 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.execution.python
+
+import java.io.{DataInputStream, DataOutputStream}
+import java.nio.channels.Channels
+import java.util.concurrent.atomic.AtomicBoolean
+
+import scala.jdk.CollectionConverters._
+
+import org.apache.arrow.vector.{BaseFixedWidthVector, 
BaseLargeVariableWidthVector, BaseVariableWidthVector, FieldVector, 
VectorSchemaRoot}
+import org.apache.arrow.vector.complex.{LargeListVector, ListVector, 
StructVector}
+import org.apache.arrow.vector.ipc.{ArrowStreamReader, ArrowStreamWriter}
+import org.apache.arrow.vector.types.pojo.{ArrowType, Field, FieldType}
+import org.apache.spark.{SparkEnv, TaskContext}
+import org.apache.spark.api.python.{BasePythonRunner, PythonRDD, PythonWorker, 
SpecialLengths}
+import org.apache.spark.sql.comet.util.Utils
+import org.apache.spark.sql.execution.metric.SQLMetric
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}
+import org.apache.spark.unsafe.Platform
+
+import org.apache.comet.CometArrowAllocator
+import org.apache.comet.vector.{CometDecodedVector, CometVector}
+
+/**
+ * Shared base for Comet's Arrow Python runners (Spark 4.0 / 4.1 / 4.2).
+ *
+ * Unlike a stock `ArrowPythonRunner`, this does not extend Spark's 
`PythonArrowInput` /
+ * `BasicPythonArrowOutput` traits. Those traits expose Spark's Arrow types 
(`VectorSchemaRoot`,
+ * `Schema`) in their members, and the packaged `comet-spark` jar relocates 
`org.apache.arrow` to
+ * `org.apache.comet.shaded.arrow`, so mixing them in produces a class whose 
synthetic Arrow
+ * members no longer match Spark's unshaded trait contract (an 
`AbstractMethodError` at runtime).
+ *
+ * Instead it extends only the Arrow-agnostic `BasePythonRunner` and performs 
the Arrow IPC
+ * exchange itself using Comet's (shaded) Arrow. The Python worker only ever 
sees a standard Arrow
+ * IPC byte stream, which is version-neutral, so nothing crosses the 
shaded/unshaded boundary:
+ *   - Input: each Comet `ColumnarBatch` is copied into a shaded struct root 
and written to the
+ *     worker with a shaded `ArrowStreamWriter`.
+ *   - Output: the worker's Arrow IPC is read with a shaded 
`ArrowStreamReader` straight into
+ *     `CometVector`s, which is exactly what `CometMapInBatchExec` and 
downstream native operators
+ *     consume.
+ *
+ * `BasePythonRunner` has the same shape across Spark 4.0/4.1/4.2; only the 
subclass constructor
+ * arguments and `writeUDF` differ, so those stay in the per-version 
subclasses.
+ */
+private[python] trait CometArrowPythonRunnerBase
+    extends BasePythonRunner[Iterator[ColumnarBatch], ColumnarBatch] {
+
+  /** Worker configuration written to the Python worker before execution. */
+  protected def workerConf: Map[String, String]
+
+  /** Comet's Python SQL metrics (data sent/received, rows). */
+  protected def pythonMetrics: Map[String, SQLMetric]
+
+  /** Version-specific UDF command serialization. */
+  protected def writeUDF(dataOut: DataOutputStream): Unit
+
+  /**
+   * Input schema as Comet hands it to the runner: a single non-nullable 
struct named "struct"
+   * whose children are the user's input columns. Comet's FFI-imported vectors 
carry Arrow
+   * `Field`s with null names (Comet uses positional schema), so these names 
are the source of
+   * truth for the field names written into the IPC stream that the Python 
worker reads by name.
+   */
+  protected def schema: StructType
+
+  override val pythonExec: String =
+    
SQLConf.get.pysparkWorkerPythonExecutable.getOrElse(funcs.head.funcs.head.pythonExec)
+
+  override val faultHandlerEnabled: Boolean = 
SQLConf.get.pythonUDFWorkerFaulthandlerEnabled
+  override val idleTimeoutSeconds: Long = 
SQLConf.get.pythonUDFWorkerIdleTimeoutSeconds
+  override val hideTraceback: Boolean = SQLConf.get.pysparkHideTraceback
+  override val simplifiedTraceback: Boolean = 
SQLConf.get.pysparkSimplifiedTraceback
+
+  override val bufferSize: Int = SQLConf.get.pandasUDFBufferSize
+  require(
+    bufferSize >= 4,
+    "Pandas execution requires more than 4 bytes. Please set higher buffer. " +
+      s"Please change '${SQLConf.PANDAS_UDF_BUFFER_SIZE.key}'.")
+
+  override protected def newWriter(
+      env: SparkEnv,
+      worker: PythonWorker,
+      inputIterator: Iterator[Iterator[ColumnarBatch]],
+      partitionIndex: Int,
+      context: TaskContext): Writer = {
+    new Writer(env, worker, inputIterator, partitionIndex, context) {
+
+      private val allocator =
+        CometArrowAllocator.newChildAllocator(s"stdout writer for 
$pythonExec", 0, Long.MaxValue)
+      private var currentGroup: Iterator[ColumnarBatch] = _
+      private var arrowWriter: ArrowStreamWriter = _
+      private var writeRoot: VectorSchemaRoot = _
+      private var structVec: StructVector = _
+
+      context.addTaskCompletionListener[Unit] { _ =>
+        if (writeRoot != null) {
+          writeRoot.close()
+        }
+        allocator.close()
+      }
+
+      protected override def writeCommand(dataOut: DataOutputStream): Unit = {
+        // handleMetadataBeforeExec: write the worker config as key/value 
string pairs.
+        dataOut.writeInt(workerConf.size)
+        for ((k, v) <- workerConf) {
+          PythonRDD.writeUTF(k, dataOut)
+          PythonRDD.writeUTF(v, dataOut)
+        }
+        writeUDF(dataOut)
+      }
+
+      /** Build the destination struct root and start the writer from the 
given child fields. */
+      private def startWriter(childFields: Seq[Field], dataOut: 
DataOutputStream): Unit = {
+        val structField =
+          new Field(
+            "struct",
+            new FieldType(false, ArrowType.Struct.INSTANCE, null),
+            childFields.asJava)
+        structVec = 
structField.createVector(allocator).asInstanceOf[StructVector]
+        writeRoot = new VectorSchemaRoot(Seq[FieldVector](structVec).asJava)
+        arrowWriter = new ArrowStreamWriter(writeRoot, null, 
Channels.newChannel(dataOut))
+        arrowWriter.start()
+      }
+
+      override def writeNextInputToStream(dataOut: DataOutputStream): Boolean 
= {
+        while (currentGroup == null || !currentGroup.hasNext) {
+          if (!inputIterator.hasNext) {
+            if (arrowWriter == null) {
+              // No input batch was ever produced (e.g. an upstream filter 
removed every row).
+              // Still emit a valid, empty Arrow IPC stream so the Python 
worker's
+              // ArrowStreamReader reads a schema and then sees zero batches, 
instead of failing
+              // on an absent stream ("Invalid IPC stream: negative 
continuation token"). There is
+              // no sample batch, so derive the schema from the Spark input 
schema. The timezone is
+              // irrelevant here because no rows are exchanged.
+              val inner = schema.head.dataType.asInstanceOf[StructType]
+              val childFields = inner.fields.toSeq.map(f =>
+                Utils.toArrowField(f.name, f.dataType, nullable = true, "UTC"))
+              startWriter(childFields, dataOut)
+            }
+            arrowWriter.end()
+            return false
+          }
+          currentGroup = inputIterator.next()
+        }
+
+        val cometBatch = currentGroup.next()
+        val startData = dataOut.size()
+
+        if (arrowWriter == null) {
+          // Build the destination struct root once, sized to the first 
batch's child fields.
+          // mapInArrow/mapInPandas exchange the columns under a single 
non-nullable struct.
+          // Comet's FFI-imported vectors leave the Arrow Field name null, so 
restore the real
+          // column names from the input schema (the worker reads columns by 
name, and shaded
+          // Arrow rejects a null field name). The field types and child 
structure are kept as-is
+          // so copyVector still walks the source and destination trees in 
lockstep.
+          val childNames = 
schema.head.dataType.asInstanceOf[StructType].fieldNames
+          val childFields = (0 until cometBatch.numCols()).map { i =>
+            val vecField =
+              
cometBatch.column(i).asInstanceOf[CometDecodedVector].getValueVector.getField
+            renamed(vecField, childNames(i), forceNullable = true)
+          }
+          startWriter(childFields, dataOut)
+        }
+
+        var i = 0
+        while (i < cometBatch.numCols()) {
+          val src = cometBatch
+            .column(i)
+            .asInstanceOf[CometDecodedVector]
+            .getValueVector
+            .asInstanceOf[FieldVector]
+          val dst = structVec.getChildByOrdinal(i).asInstanceOf[FieldVector]
+          copyVector(src, dst)
+          i += 1
+        }
+        val numRows = cometBatch.numRows()
+        structVec.setValueCount(numRows)
+        // Mark every row of the struct non-null (all-1 validity). The 
validity buffer is freshly
+        // allocated and zero-initialised, so without this Python would see an 
all-null struct.
+        val validityBytes = (numRows + 7) / 8
+        Platform.setMemory(
+          structVec.getValidityBuffer.memoryAddress(),
+          0xff.toByte,
+          validityBytes)
+        writeRoot.setRowCount(numRows)
+        arrowWriter.writeBatch()
+
+        pythonMetrics("pythonDataSent") += dataOut.size() - startData
+        true
+      }
+    }
+  }
+
+  override protected def newReaderIterator(
+      stream: DataInputStream,
+      writer: Writer,
+      startTime: Long,
+      env: SparkEnv,
+      worker: PythonWorker,
+      pid: Option[Int],
+      releasedOrClosed: AtomicBoolean,
+      context: TaskContext): Iterator[ColumnarBatch] = {
+    new ReaderIterator(stream, writer, startTime, env, worker, pid, 
releasedOrClosed, context) {
+
+      private val allocator =
+        CometArrowAllocator.newChildAllocator(s"stdin reader for $pythonExec", 
0, Long.MaxValue)
+      private var reader: ArrowStreamReader = _
+      private var root: VectorSchemaRoot = _
+      private var batchLoaded = true
+
+      context.addTaskCompletionListener[Unit] { _ =>
+        if (reader != null) {
+          reader.close(false)
+        }
+        allocator.close()
+      }
+
+      protected override def read(): ColumnarBatch = {
+        if (writer.exception.isDefined) {
+          throw writer.exception.get
+        }
+        try {
+          if (reader != null && batchLoaded) {
+            batchLoaded = reader.loadNextBatch()
+            if (batchLoaded) {
+              // Re-wrap the (reloaded) field vectors fresh each batch, 
mirroring Comet's
+              // StreamReader, so each ColumnarBatch reflects the current 
buffers.
+              val vectors: Array[ColumnVector] = 
root.getFieldVectors.asScala.map { vector =>
+                CometVector.getVector(vector, null).asInstanceOf[ColumnVector]
+              }.toArray
+              val batch = new ColumnarBatch(vectors)
+              batch.setNumRows(root.getRowCount)
+              pythonMetrics("pythonNumRowsReceived") += root.getRowCount
+              batch
+            } else {
+              reader.close(false)
+              allocator.close()
+              read()
+            }
+          } else {
+            stream.readInt() match {
+              case SpecialLengths.START_ARROW_STREAM =>
+                reader = new ArrowStreamReader(stream, allocator)
+                root = reader.getVectorSchemaRoot()
+                read()
+              case SpecialLengths.TIMING_DATA =>
+                handleTimingData()
+                read()
+              case SpecialLengths.PYTHON_EXCEPTION_THROWN =>
+                throw handlePythonException()
+              case SpecialLengths.END_OF_DATA_SECTION =>
+                handleEndOfDataSection()
+                null
+            }
+          }
+        } catch handleException
+      }
+    }
+  }
+
+  /**
+   * Rebuild `field` with `name`, preserving its Arrow type and child 
structure. Any nested child
+   * whose name Comet's FFI import left null is given a positional placeholder 
so shaded Arrow can
+   * materialize the struct. Keeping the type and structure intact means the 
destination tree
+   * still mirrors the Comet source tree for [[copyVector]].
+   */
+  private def renamed(field: Field, name: String, forceNullable: Boolean): 
Field = {
+    // A Map's descendants must keep their original nullability: Arrow 
requires the entries struct
+    // (and its key) to be non-nullable, and `MapVector.createVector` rejects 
a nullable entries
+    // struct. Stop forcing nullable once we enter a Map subtree.
+    val childrenForceNullable = forceNullable && 
!field.getType.isInstanceOf[ArrowType.Map]
+    val children = field.getChildren
+    val newChildren =
+      if (children.isEmpty) children
+      else
+        children.asScala.zipWithIndex.map { case (child, idx) =>
+          renamed(
+            child,
+            if (child.getName == null) s"_$idx" else child.getName,

Review Comment:
   A null-named FFI child gets the positional placeholder `_$idx`. A real 
sibling literally named `_0` would collide with it. The placeholder only 
applies to positional FFI fields with null names, so a collision is unlikely, 
but a one-line comment noting that the placeholder assumes no real field uses 
the `_N` form would save the next reader the head-scratch.



##########
spark/src/main/spark-4.x/org/apache/spark/sql/execution/python/CometArrowPythonRunnerBase.scala:
##########
@@ -0,0 +1,378 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.execution.python
+
+import java.io.{DataInputStream, DataOutputStream}
+import java.nio.channels.Channels
+import java.util.concurrent.atomic.AtomicBoolean
+
+import scala.jdk.CollectionConverters._
+
+import org.apache.arrow.vector.{BaseFixedWidthVector, 
BaseLargeVariableWidthVector, BaseVariableWidthVector, FieldVector, 
VectorSchemaRoot}
+import org.apache.arrow.vector.complex.{LargeListVector, ListVector, 
StructVector}
+import org.apache.arrow.vector.ipc.{ArrowStreamReader, ArrowStreamWriter}
+import org.apache.arrow.vector.types.pojo.{ArrowType, Field, FieldType}
+import org.apache.spark.{SparkEnv, TaskContext}
+import org.apache.spark.api.python.{BasePythonRunner, PythonRDD, PythonWorker, 
SpecialLengths}
+import org.apache.spark.sql.comet.util.Utils
+import org.apache.spark.sql.execution.metric.SQLMetric
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}
+import org.apache.spark.unsafe.Platform
+
+import org.apache.comet.CometArrowAllocator
+import org.apache.comet.vector.{CometDecodedVector, CometVector}
+
+/**
+ * Shared base for Comet's Arrow Python runners (Spark 4.0 / 4.1 / 4.2).
+ *
+ * Unlike a stock `ArrowPythonRunner`, this does not extend Spark's 
`PythonArrowInput` /
+ * `BasicPythonArrowOutput` traits. Those traits expose Spark's Arrow types 
(`VectorSchemaRoot`,
+ * `Schema`) in their members, and the packaged `comet-spark` jar relocates 
`org.apache.arrow` to
+ * `org.apache.comet.shaded.arrow`, so mixing them in produces a class whose 
synthetic Arrow
+ * members no longer match Spark's unshaded trait contract (an 
`AbstractMethodError` at runtime).
+ *
+ * Instead it extends only the Arrow-agnostic `BasePythonRunner` and performs 
the Arrow IPC
+ * exchange itself using Comet's (shaded) Arrow. The Python worker only ever 
sees a standard Arrow
+ * IPC byte stream, which is version-neutral, so nothing crosses the 
shaded/unshaded boundary:
+ *   - Input: each Comet `ColumnarBatch` is copied into a shaded struct root 
and written to the
+ *     worker with a shaded `ArrowStreamWriter`.
+ *   - Output: the worker's Arrow IPC is read with a shaded 
`ArrowStreamReader` straight into
+ *     `CometVector`s, which is exactly what `CometMapInBatchExec` and 
downstream native operators
+ *     consume.
+ *
+ * `BasePythonRunner` has the same shape across Spark 4.0/4.1/4.2; only the 
subclass constructor
+ * arguments and `writeUDF` differ, so those stay in the per-version 
subclasses.
+ */
+private[python] trait CometArrowPythonRunnerBase
+    extends BasePythonRunner[Iterator[ColumnarBatch], ColumnarBatch] {
+
+  /** Worker configuration written to the Python worker before execution. */
+  protected def workerConf: Map[String, String]
+
+  /** Comet's Python SQL metrics (data sent/received, rows). */
+  protected def pythonMetrics: Map[String, SQLMetric]
+
+  /** Version-specific UDF command serialization. */
+  protected def writeUDF(dataOut: DataOutputStream): Unit
+
+  /**
+   * Input schema as Comet hands it to the runner: a single non-nullable 
struct named "struct"
+   * whose children are the user's input columns. Comet's FFI-imported vectors 
carry Arrow
+   * `Field`s with null names (Comet uses positional schema), so these names 
are the source of
+   * truth for the field names written into the IPC stream that the Python 
worker reads by name.
+   */
+  protected def schema: StructType
+
+  override val pythonExec: String =
+    
SQLConf.get.pysparkWorkerPythonExecutable.getOrElse(funcs.head.funcs.head.pythonExec)
+
+  override val faultHandlerEnabled: Boolean = 
SQLConf.get.pythonUDFWorkerFaulthandlerEnabled
+  override val idleTimeoutSeconds: Long = 
SQLConf.get.pythonUDFWorkerIdleTimeoutSeconds
+  override val hideTraceback: Boolean = SQLConf.get.pysparkHideTraceback
+  override val simplifiedTraceback: Boolean = 
SQLConf.get.pysparkSimplifiedTraceback
+
+  override val bufferSize: Int = SQLConf.get.pandasUDFBufferSize
+  require(
+    bufferSize >= 4,
+    "Pandas execution requires more than 4 bytes. Please set higher buffer. " +
+      s"Please change '${SQLConf.PANDAS_UDF_BUFFER_SIZE.key}'.")
+
+  override protected def newWriter(
+      env: SparkEnv,
+      worker: PythonWorker,
+      inputIterator: Iterator[Iterator[ColumnarBatch]],
+      partitionIndex: Int,
+      context: TaskContext): Writer = {
+    new Writer(env, worker, inputIterator, partitionIndex, context) {
+
+      private val allocator =
+        CometArrowAllocator.newChildAllocator(s"stdout writer for 
$pythonExec", 0, Long.MaxValue)
+      private var currentGroup: Iterator[ColumnarBatch] = _
+      private var arrowWriter: ArrowStreamWriter = _
+      private var writeRoot: VectorSchemaRoot = _
+      private var structVec: StructVector = _
+
+      context.addTaskCompletionListener[Unit] { _ =>
+        if (writeRoot != null) {
+          writeRoot.close()
+        }
+        allocator.close()
+      }
+
+      protected override def writeCommand(dataOut: DataOutputStream): Unit = {
+        // handleMetadataBeforeExec: write the worker config as key/value 
string pairs.
+        dataOut.writeInt(workerConf.size)
+        for ((k, v) <- workerConf) {
+          PythonRDD.writeUTF(k, dataOut)
+          PythonRDD.writeUTF(v, dataOut)
+        }
+        writeUDF(dataOut)
+      }
+
+      /** Build the destination struct root and start the writer from the 
given child fields. */
+      private def startWriter(childFields: Seq[Field], dataOut: 
DataOutputStream): Unit = {
+        val structField =
+          new Field(
+            "struct",
+            new FieldType(false, ArrowType.Struct.INSTANCE, null),
+            childFields.asJava)
+        structVec = 
structField.createVector(allocator).asInstanceOf[StructVector]
+        writeRoot = new VectorSchemaRoot(Seq[FieldVector](structVec).asJava)
+        arrowWriter = new ArrowStreamWriter(writeRoot, null, 
Channels.newChannel(dataOut))
+        arrowWriter.start()
+      }
+
+      override def writeNextInputToStream(dataOut: DataOutputStream): Boolean 
= {
+        while (currentGroup == null || !currentGroup.hasNext) {
+          if (!inputIterator.hasNext) {
+            if (arrowWriter == null) {
+              // No input batch was ever produced (e.g. an upstream filter 
removed every row).
+              // Still emit a valid, empty Arrow IPC stream so the Python 
worker's
+              // ArrowStreamReader reads a schema and then sees zero batches, 
instead of failing
+              // on an absent stream ("Invalid IPC stream: negative 
continuation token"). There is
+              // no sample batch, so derive the schema from the Spark input 
schema. The timezone is
+              // irrelevant here because no rows are exchanged.
+              val inner = schema.head.dataType.asInstanceOf[StructType]

Review Comment:
   The cast `schema.head.dataType.asInstanceOf[StructType]` is repeated in two 
mutually exclusive arms of `writeNextInputToStream`. Hoist it to a single 
`lazy val` on the Writer so the cast lives in one place.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to