This is an automated email from the ASF dual-hosted git repository.
chengchengjin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 0f0a70111a Enable enhanced tests for spark 4.0 & fix failures (#11868)
0f0a70111a is described below
commit 0f0a70111a5a443baf3ffbe347632e64080e9b38
Author: inf <[email protected]>
AuthorDate: Mon Apr 13 11:49:22 2026 +0000
Enable enhanced tests for spark 4.0 & fix failures (#11868)
Co-authored-by: Yuan <[email protected]>
Co-authored-by: infvg <[email protected]>
---
.github/workflows/velox_backend_enhanced.yml | 2 +-
.../write/IcebergColumnarBatchDataWriter.scala | 4 ++
.../execution/AbstractIcebergWriteExec.scala | 11 ++++-
.../execution/enhanced/VeloxIcebergSuite.scala | 55 ++++++++++++++++++++++
cpp/velox/compute/iceberg/IcebergWriter.cc | 23 +++++++--
5 files changed, 90 insertions(+), 5 deletions(-)
diff --git a/.github/workflows/velox_backend_enhanced.yml
b/.github/workflows/velox_backend_enhanced.yml
index 183cd7a54a..f6f0f85881 100644
--- a/.github/workflows/velox_backend_enhanced.yml
+++ b/.github/workflows/velox_backend_enhanced.yml
@@ -298,7 +298,7 @@ jobs:
java -version
$MVN_CMD clean test -Pspark-4.0 -Pscala-2.13 -Pjava-17
-Pbackends-velox -Piceberg \
-Pspark-ut -DargLine="-Dspark.test.home=/opt/shims/spark40/spark_home/" \
-            -DtagsToExclude=org.apache.spark.tags.ExtendedSQLTest,org.apache.gluten.tags.UDFTest,org.apache.gluten.tags.EnhancedFeaturesTest,org.apache.gluten.tags.SkipTest
+            -DtagsToExclude=org.apache.spark.tags.ExtendedSQLTest,org.apache.gluten.tags.UDFTest,org.apache.gluten.tags.SkipTest
- name: Upload test report
if: always()
uses: actions/upload-artifact@v4
diff --git
a/backends-velox/src-iceberg/main/scala/org/apache/gluten/connector/write/IcebergColumnarBatchDataWriter.scala
b/backends-velox/src-iceberg/main/scala/org/apache/gluten/connector/write/IcebergColumnarBatchDataWriter.scala
index c4f202d84b..9cdeadd215 100644
---
a/backends-velox/src-iceberg/main/scala/org/apache/gluten/connector/write/IcebergColumnarBatchDataWriter.scala
+++
b/backends-velox/src-iceberg/main/scala/org/apache/gluten/connector/write/IcebergColumnarBatchDataWriter.scala
@@ -44,6 +44,10 @@ case class IcebergColumnarBatchDataWriter(
}
override def write(batch: ColumnarBatch): Unit = {
+    // Pass the original batch to native code.
+    // The native code will use the schema (writeSchema) we provided during initialization
+    // to determine which columns to write, effectively filtering out metadata columns
+    // like __row_operation, _file, _pos that Spark 4.0 adds.
val batchHandle =
ColumnarBatches.getNativeHandle(BackendsApiManager.getBackendName, batch)
jniWrapper.write(writer, batchHandle)
}
diff --git
a/backends-velox/src-iceberg/main/scala/org/apache/gluten/execution/AbstractIcebergWriteExec.scala
b/backends-velox/src-iceberg/main/scala/org/apache/gluten/execution/AbstractIcebergWriteExec.scala
index 12ed90bdd6..9556593594 100644
---
a/backends-velox/src-iceberg/main/scala/org/apache/gluten/execution/AbstractIcebergWriteExec.scala
+++
b/backends-velox/src-iceberg/main/scala/org/apache/gluten/execution/AbstractIcebergWriteExec.scala
@@ -24,14 +24,23 @@ import org.apache.spark.sql.types.StructType
import org.apache.iceberg.spark.source.IcebergWriteUtil
import org.apache.iceberg.types.TypeUtil
+import scala.collection.JavaConverters._
+
abstract class AbstractIcebergWriteExec extends IcebergWriteExec {
// the writer factory works for both batch and streaming
private def createIcebergDataWriteFactory(schema: StructType):
IcebergDataWriteFactory = {
val writeSchema = IcebergWriteUtil.getWriteSchema(write)
val nestedField = TypeUtil.visit(writeSchema, new
IcebergNestedFieldVisitor)
+    // Filter out metadata columns from the Spark output schema and reorder to match the
+    // Iceberg schema. Spark 4.0 may include metadata columns in the output schema during
+    // UPDATE operations, but these should not be written to the Iceberg table.
+ val writeFieldNames = writeSchema.columns().asScala.map(_.name()).toSet
+ val filteredSchema = StructType(
+ schema.fields.filter(field => writeFieldNames.contains(field.name))
+ )
IcebergDataWriteFactory(
- schema,
+ filteredSchema,
getFileFormat(IcebergWriteUtil.getFileFormat(write)),
IcebergWriteUtil.getDirectory(write),
getCodec,
diff --git
a/backends-velox/src-iceberg/test/scala/org/apache/gluten/execution/enhanced/VeloxIcebergSuite.scala
b/backends-velox/src-iceberg/test/scala/org/apache/gluten/execution/enhanced/VeloxIcebergSuite.scala
index a51f2d7717..e2df119bc0 100644
---
a/backends-velox/src-iceberg/test/scala/org/apache/gluten/execution/enhanced/VeloxIcebergSuite.scala
+++
b/backends-velox/src-iceberg/test/scala/org/apache/gluten/execution/enhanced/VeloxIcebergSuite.scala
@@ -410,4 +410,59 @@ class VeloxIcebergSuite extends IcebergSuite {
_.values.exists(_.contains("Not support write table with sort
order"))))
}
}
+
+ test("iceberg read cow table - update after schema evolution") {
+ withTable("iceberg_cow_update_evolved_tb") {
+ spark.sql("""
+ |create table iceberg_cow_update_evolved_tb (
+ | id int,
+ | name string,
+ | age int
+ |) using iceberg
+ |tblproperties (
+ | 'format-version' = '2',
+ | 'write.delete.mode' = 'copy-on-write',
+ | 'write.update.mode' = 'copy-on-write',
+ | 'write.merge.mode' = 'copy-on-write'
+ |)
+ |""".stripMargin)
+
+ spark.sql("""
+ |alter table iceberg_cow_update_evolved_tb
+ |add columns (salary decimal(10, 2))
+ |""".stripMargin)
+
+ spark.sql("""
+ |insert into table iceberg_cow_update_evolved_tb values
+ | (1, 'Name1', 23, 3400.00),
+ | (2, 'Name2', 30, 5500.00),
+ | (3, 'Name3', 35, 6500.00)
+ |""".stripMargin)
+
+ val df = spark.sql("""
+ |update iceberg_cow_update_evolved_tb
+ |set name = 'Name4'
+ |where id = 1
+ |""".stripMargin)
+
+ assert(
+ df.queryExecution.executedPlan
+ .asInstanceOf[CommandResultExec]
+ .commandPhysicalPlan
+ .isInstanceOf[VeloxIcebergReplaceDataExec])
+
+ checkAnswer(
+ spark.sql("""
+ |select id, name, age, salary
+ |from iceberg_cow_update_evolved_tb
+ |order by id
+ |""".stripMargin),
+ Seq(
+ Row(1, "Name4", 23, new java.math.BigDecimal("3400.00")),
+ Row(2, "Name2", 30, new java.math.BigDecimal("5500.00")),
+        Row(3, "Name3", 35, new java.math.BigDecimal("6500.00"))
+ )
+ )
+ }
+ }
}
diff --git a/cpp/velox/compute/iceberg/IcebergWriter.cc
b/cpp/velox/compute/iceberg/IcebergWriter.cc
index 1d6e7fa344..ebbd89cd13 100644
--- a/cpp/velox/compute/iceberg/IcebergWriter.cc
+++ b/cpp/velox/compute/iceberg/IcebergWriter.cc
@@ -154,10 +154,10 @@ std::shared_ptr<IcebergInsertTableHandle>
createIcebergInsertTableHandle(
nestedField.children[i]));
}
}
-
+
auto fileNameGenerator = std::make_shared<const
GlutenIcebergFileNameGenerator>(
partitionId, taskId, operationId, fileFormat);
-
+
std::shared_ptr<const connector::hive::LocationHandle> locationHandle =
std::make_shared<connector::hive::LocationHandle>(
outputDirectoryPath, outputDirectoryPath,
connector::hive::LocationHandle::TableType::kExisting);
@@ -212,7 +212,24 @@ IcebergWriter::IcebergWriter(
}
void IcebergWriter::write(const VeloxColumnarBatch& batch) {
- dataSink_->appendData(batch.getRowVector());
+ auto inputRowVector = batch.getRowVector();
+ auto inputRowType = asRowType(inputRowVector->type());
+
+ if (inputRowType->size() != rowType_->size()) {
+ const auto& children = inputRowVector->children();
+    std::vector<VectorPtr> dataColumns(children.begin() + 1, children.begin() + 1 + rowType_->size());
+
+ auto filteredRowVector = std::make_shared<RowVector>(
+ pool_.get(),
+ rowType_,
+ inputRowVector->nulls(),
+ inputRowVector->size(),
+ std::move(dataColumns));
+
+ dataSink_->appendData(filteredRowVector);
+ } else {
+ dataSink_->appendData(inputRowVector);
+ }
}
std::vector<std::string> IcebergWriter::commit() {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]