This is an automated email from the ASF dual-hosted git repository.

chengchengjin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 0f0a70111a Enable enhanced tests for spark 4.0 & fix failures (#11868)
0f0a70111a is described below

commit 0f0a70111a5a443baf3ffbe347632e64080e9b38
Author: inf <[email protected]>
AuthorDate: Mon Apr 13 11:49:22 2026 +0000

    Enable enhanced tests for spark 4.0 & fix failures (#11868)
    
    Co-authored-by: Yuan <[email protected]>
    
    Co-authored-by: infvg <[email protected]>
---
 .github/workflows/velox_backend_enhanced.yml       |  2 +-
 .../write/IcebergColumnarBatchDataWriter.scala     |  4 ++
 .../execution/AbstractIcebergWriteExec.scala       | 11 ++++-
 .../execution/enhanced/VeloxIcebergSuite.scala     | 55 ++++++++++++++++++++++
 cpp/velox/compute/iceberg/IcebergWriter.cc         | 23 +++++++--
 5 files changed, 90 insertions(+), 5 deletions(-)

diff --git a/.github/workflows/velox_backend_enhanced.yml 
b/.github/workflows/velox_backend_enhanced.yml
index 183cd7a54a..f6f0f85881 100644
--- a/.github/workflows/velox_backend_enhanced.yml
+++ b/.github/workflows/velox_backend_enhanced.yml
@@ -298,7 +298,7 @@ jobs:
           java -version
           $MVN_CMD clean test -Pspark-4.0 -Pscala-2.13 -Pjava-17 
-Pbackends-velox -Piceberg \
           -Pspark-ut 
-DargLine="-Dspark.test.home=/opt/shims/spark40/spark_home/" \
-          
-DtagsToExclude=org.apache.spark.tags.ExtendedSQLTest,org.apache.gluten.tags.UDFTest,org.apache.gluten.tags.EnhancedFeaturesTest,org.apache.gluten.tags.SkipTest
+          
-DtagsToExclude=org.apache.spark.tags.ExtendedSQLTest,org.apache.gluten.tags.UDFTest,org.apache.gluten.tags.SkipTest
       - name: Upload test report
         if: always()
         uses: actions/upload-artifact@v4
diff --git 
a/backends-velox/src-iceberg/main/scala/org/apache/gluten/connector/write/IcebergColumnarBatchDataWriter.scala
 
b/backends-velox/src-iceberg/main/scala/org/apache/gluten/connector/write/IcebergColumnarBatchDataWriter.scala
index c4f202d84b..9cdeadd215 100644
--- 
a/backends-velox/src-iceberg/main/scala/org/apache/gluten/connector/write/IcebergColumnarBatchDataWriter.scala
+++ 
b/backends-velox/src-iceberg/main/scala/org/apache/gluten/connector/write/IcebergColumnarBatchDataWriter.scala
@@ -44,6 +44,10 @@ case class IcebergColumnarBatchDataWriter(
   }
 
   override def write(batch: ColumnarBatch): Unit = {
+    // Pass the original batch to native code
+    // The native code will use the schema (writeSchema) we provided during 
initialization
+    // to determine which columns to write, effectively filtering out metadata 
columns
+    // like __row_operation, _file, _pos that Spark 4.0 adds
     val batchHandle = 
ColumnarBatches.getNativeHandle(BackendsApiManager.getBackendName, batch)
     jniWrapper.write(writer, batchHandle)
   }
diff --git 
a/backends-velox/src-iceberg/main/scala/org/apache/gluten/execution/AbstractIcebergWriteExec.scala
 
b/backends-velox/src-iceberg/main/scala/org/apache/gluten/execution/AbstractIcebergWriteExec.scala
index 12ed90bdd6..9556593594 100644
--- 
a/backends-velox/src-iceberg/main/scala/org/apache/gluten/execution/AbstractIcebergWriteExec.scala
+++ 
b/backends-velox/src-iceberg/main/scala/org/apache/gluten/execution/AbstractIcebergWriteExec.scala
@@ -24,14 +24,23 @@ import org.apache.spark.sql.types.StructType
 import org.apache.iceberg.spark.source.IcebergWriteUtil
 import org.apache.iceberg.types.TypeUtil
 
+import scala.collection.JavaConverters._
+
 abstract class AbstractIcebergWriteExec extends IcebergWriteExec {
 
   // the writer factory works for both batch and streaming
   private def createIcebergDataWriteFactory(schema: StructType): 
IcebergDataWriteFactory = {
     val writeSchema = IcebergWriteUtil.getWriteSchema(write)
     val nestedField = TypeUtil.visit(writeSchema, new 
IcebergNestedFieldVisitor)
+    // Filter out metadata columns from the Spark output schema and reorder to 
match Iceberg schema
+    // Spark 4.0 may include metadata columns in the output schema during 
UPDATE operations,
+    // but these should not be written to the Iceberg table
+    val writeFieldNames = writeSchema.columns().asScala.map(_.name()).toSet
+    val filteredSchema = StructType(
+      schema.fields.filter(field => writeFieldNames.contains(field.name))
+    )
     IcebergDataWriteFactory(
-      schema,
+      filteredSchema,
       getFileFormat(IcebergWriteUtil.getFileFormat(write)),
       IcebergWriteUtil.getDirectory(write),
       getCodec,
diff --git 
a/backends-velox/src-iceberg/test/scala/org/apache/gluten/execution/enhanced/VeloxIcebergSuite.scala
 
b/backends-velox/src-iceberg/test/scala/org/apache/gluten/execution/enhanced/VeloxIcebergSuite.scala
index a51f2d7717..e2df119bc0 100644
--- 
a/backends-velox/src-iceberg/test/scala/org/apache/gluten/execution/enhanced/VeloxIcebergSuite.scala
+++ 
b/backends-velox/src-iceberg/test/scala/org/apache/gluten/execution/enhanced/VeloxIcebergSuite.scala
@@ -410,4 +410,59 @@ class VeloxIcebergSuite extends IcebergSuite {
           _.values.exists(_.contains("Not support write table with sort 
order"))))
     }
   }
+
+  test("iceberg read cow table - update after schema evolution") {
+    withTable("iceberg_cow_update_evolved_tb") {
+      spark.sql("""
+                  |create table iceberg_cow_update_evolved_tb (
+                  |  id int,
+                  |  name string,
+                  |  age int
+                  |) using iceberg
+                  |tblproperties (
+                  |  'format-version' = '2',
+                  |  'write.delete.mode' = 'copy-on-write',
+                  |  'write.update.mode' = 'copy-on-write',
+                  |  'write.merge.mode' = 'copy-on-write'
+                  |)
+                  |""".stripMargin)
+
+      spark.sql("""
+                  |alter table iceberg_cow_update_evolved_tb
+                  |add columns (salary decimal(10, 2))
+                  |""".stripMargin)
+
+      spark.sql("""
+                  |insert into table iceberg_cow_update_evolved_tb values
+                  |  (1, 'Name1', 23, 3400.00),
+                  |  (2, 'Name2', 30, 5500.00),
+                  |  (3, 'Name3', 35, 6500.00)
+                  |""".stripMargin)
+
+      val df = spark.sql("""
+                           |update iceberg_cow_update_evolved_tb
+                           |set name = 'Name4'
+                           |where id = 1
+                           |""".stripMargin)
+
+      assert(
+        df.queryExecution.executedPlan
+          .asInstanceOf[CommandResultExec]
+          .commandPhysicalPlan
+          .isInstanceOf[VeloxIcebergReplaceDataExec])
+
+      checkAnswer(
+        spark.sql("""
+                    |select id, name, age, salary
+                    |from iceberg_cow_update_evolved_tb
+                    |order by id
+                    |""".stripMargin),
+        Seq(
+          Row(1, "Name4", 23, new java.math.BigDecimal("3400.00")),
+          Row(2, "Name2", 30, new java.math.BigDecimal("5500.00")),
+          Row(3, "Name1", 35, new java.math.BigDecimal("6500.00"))
+        )
+      )
+    }
+  }
 }
diff --git a/cpp/velox/compute/iceberg/IcebergWriter.cc 
b/cpp/velox/compute/iceberg/IcebergWriter.cc
index 1d6e7fa344..ebbd89cd13 100644
--- a/cpp/velox/compute/iceberg/IcebergWriter.cc
+++ b/cpp/velox/compute/iceberg/IcebergWriter.cc
@@ -154,10 +154,10 @@ std::shared_ptr<IcebergInsertTableHandle> 
createIcebergInsertTableHandle(
               nestedField.children[i]));
     }
   }
-  
+
   auto fileNameGenerator = std::make_shared<const 
GlutenIcebergFileNameGenerator>(
       partitionId, taskId, operationId, fileFormat);
-  
+
   std::shared_ptr<const connector::hive::LocationHandle> locationHandle =
       std::make_shared<connector::hive::LocationHandle>(
           outputDirectoryPath, outputDirectoryPath, 
connector::hive::LocationHandle::TableType::kExisting);
@@ -212,7 +212,24 @@ IcebergWriter::IcebergWriter(
 }
 
 void IcebergWriter::write(const VeloxColumnarBatch& batch) {
-  dataSink_->appendData(batch.getRowVector());
+  auto inputRowVector = batch.getRowVector();
+  auto inputRowType = asRowType(inputRowVector->type());
+
+  if (inputRowType->size() != rowType_->size()) {
+    const auto& children = inputRowVector->children();
+    std::vector<VectorPtr> dataColumns(children.begin() + 1, children.begin() 
+ 1 + rowType_->size());
+
+    auto filteredRowVector = std::make_shared<RowVector>(
+        pool_.get(),
+        rowType_,
+        inputRowVector->nulls(),
+        inputRowVector->size(),
+        std::move(dataColumns));
+
+    dataSink_->appendData(filteredRowVector);
+  } else {
+    dataSink_->appendData(inputRowVector);
+  }
 }
 
 std::vector<std::string> IcebergWriter::commit() {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to