This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 0ce105f454 [core] Fix the ArrayIndexOutOfBoundsException when using data evolution with avro (#7013)
0ce105f454 is described below
commit 0ce105f45440058fb11cad998339ca4aba98563f
Author: WenjunMin <[email protected]>
AuthorDate: Tue Jan 13 08:38:31 2026 +0800
[core] Fix the ArrayIndexOutOfBoundsException when using data evolution with avro (#7013)
---
.../paimon/operation/DataEvolutionSplitRead.java | 4 +-
.../paimon/spark/sql/RowTrackingTestBase.scala | 48 +++++++++++++---------
2 files changed, 30 insertions(+), 22 deletions(-)
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionSplitRead.java
b/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionSplitRead.java
index fa04dbad59..81f74fe020 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionSplitRead.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionSplitRead.java
@@ -377,7 +377,7 @@ public class DataEvolutionSplitRead implements
SplitRead<InternalRow> {
readerSuppliers.add(
() ->
new DataFileRecordReader(
- schema.logicalRowType(),
+ readRowType,
formatReaderMapping.getReaderFactory(),
formatReaderContext,
formatReaderMapping.getIndexMapping(),
@@ -404,7 +404,7 @@ public class DataEvolutionSplitRead implements
SplitRead<InternalRow> {
new FormatReaderContext(
fileIO, dataFilePathFactory.toPath(file),
file.fileSize(), selection);
return new DataFileRecordReader(
- schema.logicalRowType(),
+ readRowType,
formatReaderMapping.getReaderFactory(),
formatReaderContext,
formatReaderMapping.getIndexMapping(),
diff --git
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala
index e0bda544ab..12639e6ef3 100644
---
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala
+++
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala
@@ -18,7 +18,9 @@
package org.apache.paimon.spark.sql
+import org.apache.paimon.CoreOptions
import org.apache.paimon.Snapshot.CommitKind
+import org.apache.paimon.format.FileFormat
import org.apache.paimon.spark.PaimonSparkTestBase
import org.apache.spark.sql.Row
@@ -311,26 +313,32 @@ abstract class RowTrackingTestBase extends
PaimonSparkTestBase {
}
test("Data Evolution: merge into table with data-evolution") {
- withTable("s", "t") {
- sql("CREATE TABLE s (id INT, b INT)")
- sql("INSERT INTO s VALUES (1, 11), (2, 22)")
-
- sql(
- "CREATE TABLE t (id INT, b INT, c INT) TBLPROPERTIES
('row-tracking.enabled' = 'true', 'data-evolution.enabled' = 'true')")
- sql("INSERT INTO t SELECT /*+ REPARTITION(1) */ id, id AS b, id AS c
FROM range(2, 4)")
-
- sql("""
- |MERGE INTO t
- |USING s
- |ON t.id = s.id
- |WHEN MATCHED THEN UPDATE SET t.b = s.b
- |WHEN NOT MATCHED THEN INSERT (id, b, c) VALUES (id, b, 11)
- |""".stripMargin)
- checkAnswer(sql("SELECT count(*) FROM t"), Seq(Row(3)))
- checkAnswer(
- sql("SELECT *, _ROW_ID, _SEQUENCE_NUMBER FROM t ORDER BY id"),
- Seq(Row(1, 11, 11, 2, 2), Row(2, 22, 2, 0, 2), Row(3, 3, 3, 1, 2))
- )
+ Seq("parquet", "avro").foreach {
+ format =>
+ withTable("s", "t") {
+ sql("CREATE TABLE s (id INT, b INT)")
+ sql("INSERT INTO s VALUES (1, 11), (2, 22)")
+
+ sql(s"""CREATE TABLE t (id INT, b INT, c INT) TBLPROPERTIES
+ |('row-tracking.enabled' = 'true',
+ |'data-evolution.enabled' = 'true',
+ |'file.format' = '$format'
+ |)""".stripMargin)
+ sql("INSERT INTO t SELECT /*+ REPARTITION(1) */ id, id AS b, id AS c
FROM range(2, 4)")
+
+ sql("""
+ |MERGE INTO t
+ |USING s
+ |ON t.id = s.id
+ |WHEN MATCHED THEN UPDATE SET t.b = s.b
+ |WHEN NOT MATCHED THEN INSERT (id, b, c) VALUES (id, b, 11)
+ |""".stripMargin)
+ checkAnswer(sql("SELECT count(*) FROM t"), Seq(Row(3)))
+ checkAnswer(
+ sql("SELECT *, _ROW_ID, _SEQUENCE_NUMBER FROM t ORDER BY id"),
+ Seq(Row(1, 11, 11, 2, 2), Row(2, 22, 2, 0, 2), Row(3, 3, 3, 1, 2))
+ )
+ }
}
}