This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 8714be2653 [spark] Add test: Data Evolution: concurrent merge and compact (#7120)
8714be2653 is described below
commit 8714be2653ad7d472d0c38c9fb8dfa29570d5102
Author: Jingsong Lee <[email protected]>
AuthorDate: Mon Jan 26 17:50:33 2026 +0800
[spark] Add test: Data Evolution: concurrent merge and compact (#7120)
---
.../spark/write/DataEvolutionTableDataWrite.scala | 6 ++-
.../paimon/spark/sql/RowTrackingTestBase.scala | 53 ++++++++++++++++++++++
2 files changed, 57 insertions(+), 2 deletions(-)
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/DataEvolutionTableDataWrite.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/DataEvolutionTableDataWrite.scala
index 8b33cb0660..0bc9bc0a76 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/DataEvolutionTableDataWrite.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/DataEvolutionTableDataWrite.scala
@@ -67,13 +67,15 @@ case class DataEvolutionTableDataWrite(
private def newCurrentWriter(firstRowId: Long): Unit = {
finishCurrentWriter()
- val (partition, numRecords) = firstRowIdToPartitionMap.getOrElse(firstRowId, null)
- if (partition == null) {
+ val pair = firstRowIdToPartitionMap.getOrElse(firstRowId, null)
+ if (pair == null) {
throw new IllegalArgumentException(
s"First row ID $firstRowId not found in partition map. " +
s"Available first row IDs:
${firstRowIdToPartitionMap.keys.mkString(", ")}")
}
+ val (partition, numRecords) = pair
+
val writer = writeBuilder
.newWrite()
.withWriteType(writeType)
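A note on the change above: in Scala 2, val (partition, numRecords) = ... desugars to a pattern match, so when the map lookup returns null it fails with scala.MatchError(null) before the null guard can run, and the intended IllegalArgumentException is never thrown. The fix fetches the pair, null-checks it, and only then destructures. A minimal standalone sketch of the two shapes (the object name, sample map, and values are illustrative, not taken from the commit):

object NullDestructureSketch extends App {
  // Stand-in for firstRowIdToPartitionMap: values mirror a (partition, numRecords) pair.
  val firstRowIdToPartitionMap: Map[Long, (String, Long)] = Map(1L -> ("p1", 10L))
  val firstRowId = 2L // a row ID that is not in the map

  // Old shape: destructuring the lookup result directly is a pattern match, so a
  // missing key surfaces as scala.MatchError(null) and a null guard after it never runs.
  //   val (partition, numRecords) = firstRowIdToPartitionMap.getOrElse(firstRowId, null)
  //   if (partition == null) { ... } // unreachable for a missing key

  // New shape: fetch, null-check, then destructure, so the intended exception is raised.
  val pair = firstRowIdToPartitionMap.getOrElse(firstRowId, null)
  if (pair == null) {
    throw new IllegalArgumentException(s"First row ID $firstRowId not found in partition map")
  }
  val (partition, numRecords) = pair
  println(s"partition=$partition, numRecords=$numRecords")
}

Run with a missing key, the sketch fails with the descriptive IllegalArgumentException instead of a bare MatchError; with an existing key it prints the destructured pair.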
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala
index 83dedc96dd..2f6afa7ce7 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala
@@ -20,6 +20,7 @@ package org.apache.paimon.spark.sql
import org.apache.paimon.Snapshot.CommitKind
import org.apache.paimon.spark.PaimonSparkTestBase
+import org.apache.paimon.table.source.DataSplit
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.plans.logical.{Deduplicate, Join, LogicalPlan, MergeRows, RepartitionByExpression, Sort}
@@ -29,11 +30,63 @@ import org.apache.spark.sql.util.QueryExecutionListener
import java.util.concurrent.{CountDownLatch, TimeUnit}
import scala.collection.JavaConverters._
+import scala.concurrent.{Await, Future}
+import scala.concurrent.ExecutionContext.Implicits.global
+import scala.concurrent.duration.DurationInt
abstract class RowTrackingTestBase extends PaimonSparkTestBase {
import testImplicits._
+ ignore("Data Evolution: concurrent merge and compact") {
+ withTable("s", "t") {
+ sql(s"""
+ CREATE TABLE t (id INT, b INT, c INT) TBLPROPERTIES (
+ 'row-tracking.enabled' = 'true',
+ 'compaction.min.file-num' = '2',
+ 'data-evolution.enabled' = 'true')
+ """)
+ sql("INSERT INTO t VALUES (1, 0, 0)")
+ Seq((1, 1, 1)).toDF("id", "b", "c").createOrReplaceTempView("s")
+
+ val mergeInto = Future {
+ for (_ <- 1 to 10) {
+ sql(s"""
+ |MERGE INTO t
+ |USING s
+ |ON t.id = s.id
+ |WHEN MATCHED THEN
+ |UPDATE SET t.id = s.id, t.b = s.b + t.b, t.c = s.c + t.c
+ |""".stripMargin).collect()
+ }
+ }
+
+ val t = loadTable("t")
+
+ def canBeCompacted: Boolean = {
+ val split = t.newSnapshotReader().read().splits().get(0).asInstanceOf[DataSplit]
+ split.dataFiles().size() > 1
+ }
+
+ val compact = Future {
+ for (_ <- 1 to 10) {
+ while (!canBeCompacted) {
+ Thread.sleep(1)
+ }
+ sql("CALL sys.compact(table => 't')")
+ val snapshot = t.latestSnapshot().get()
+ assert(snapshot.totalRecordCount > 0)
+ assert(snapshot.totalRecordCount < 12)
+ }
+ }
+
+ Await.result(mergeInto, 60.seconds)
+ Await.result(compact, 60.seconds)
+
+ checkAnswer(sql("SELECT * FROM t"), Seq(Row(1, 10, 10)))
+ }
+ }
+
test("Row Tracking: read row Tracking") {
withTable("t") {
sql("CREATE TABLE t (id INT, data STRING) TBLPROPERTIES
('row-tracking.enabled' = 'true')")