This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 8714be2653 [spark] Add test: Data Evolution: concurrent merge and compact (#7120)
8714be2653 is described below

commit 8714be2653ad7d472d0c38c9fb8dfa29570d5102
Author: Jingsong Lee <[email protected]>
AuthorDate: Mon Jan 26 17:50:33 2026 +0800

    [spark] Add test: Data Evolution: concurrent merge and compact (#7120)
---
 .../spark/write/DataEvolutionTableDataWrite.scala  |  6 ++-
 .../paimon/spark/sql/RowTrackingTestBase.scala     | 53 ++++++++++++++++++++++
 2 files changed, 57 insertions(+), 2 deletions(-)

diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/DataEvolutionTableDataWrite.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/DataEvolutionTableDataWrite.scala
index 8b33cb0660..0bc9bc0a76 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/DataEvolutionTableDataWrite.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/DataEvolutionTableDataWrite.scala
@@ -67,13 +67,15 @@ case class DataEvolutionTableDataWrite(
 
   private def newCurrentWriter(firstRowId: Long): Unit = {
     finishCurrentWriter()
-    val (partition, numRecords) = firstRowIdToPartitionMap.getOrElse(firstRowId, null)
-    if (partition == null) {
+    val pair = firstRowIdToPartitionMap.getOrElse(firstRowId, null)
+    if (pair == null) {
       throw new IllegalArgumentException(
         s"First row ID $firstRowId not found in partition map. " +
           s"Available first row IDs: ${firstRowIdToPartitionMap.keys.mkString(", ")}")
     }
 
+    val (partition, numRecords) = pair
+
     val writer = writeBuilder
       .newWrite()
       .withWriteType(writeType)
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala
index 83dedc96dd..2f6afa7ce7 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala
@@ -20,6 +20,7 @@ package org.apache.paimon.spark.sql
 
 import org.apache.paimon.Snapshot.CommitKind
 import org.apache.paimon.spark.PaimonSparkTestBase
+import org.apache.paimon.table.source.DataSplit
 
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.plans.logical.{Deduplicate, Join, LogicalPlan, MergeRows, RepartitionByExpression, Sort}
@@ -29,11 +30,63 @@ import org.apache.spark.sql.util.QueryExecutionListener
 import java.util.concurrent.{CountDownLatch, TimeUnit}
 
 import scala.collection.JavaConverters._
+import scala.concurrent.{Await, Future}
+import scala.concurrent.ExecutionContext.Implicits.global
+import scala.concurrent.duration.DurationInt
 
 abstract class RowTrackingTestBase extends PaimonSparkTestBase {
 
   import testImplicits._
 
+  ignore("Data Evolution: concurrent merge and compact") {
+    withTable("s", "t") {
+      sql(s"""
+            CREATE TABLE t (id INT, b INT, c INT) TBLPROPERTIES (
+                 'row-tracking.enabled' = 'true',
+                 'compaction.min.file-num' = '2',
+                 'data-evolution.enabled' = 'true')
+          """)
+      sql("INSERT INTO t VALUES (1, 0, 0)")
+      Seq((1, 1, 1)).toDF("id", "b", "c").createOrReplaceTempView("s")
+
+      val mergeInto = Future {
+        for (_ <- 1 to 10) {
+          sql(s"""
+                 |MERGE INTO t
+                 |USING s
+                 |ON t.id = s.id
+                 |WHEN MATCHED THEN
+                 |UPDATE SET t.id = s.id, t.b = s.b + t.b, t.c = s.c + t.c
+                 |""".stripMargin).collect()
+        }
+      }
+
+      val t = loadTable("t")
+
+      def canBeCompacted: Boolean = {
+        val split = t.newSnapshotReader().read().splits().get(0).asInstanceOf[DataSplit]
+        split.dataFiles().size() > 1
+      }
+
+      val compact = Future {
+        for (_ <- 1 to 10) {
+          while (!canBeCompacted) {
+            Thread.sleep(1)
+          }
+          sql("CALL sys.compact(table => 't')")
+          val snapshot = t.latestSnapshot().get()
+          assert(snapshot.totalRecordCount > 0)
+          assert(snapshot.totalRecordCount < 12)
+        }
+      }
+
+      Await.result(mergeInto, 60.seconds)
+      Await.result(compact, 60.seconds)
+
+      checkAnswer(sql("SELECT * FROM t"), Seq(Row(1, 10, 10)))
+    }
+  }
+
   test("Row Tracking: read row Tracking") {
     withTable("t") {
       sql("CREATE TABLE t (id INT, data STRING) TBLPROPERTIES ('row-tracking.enabled' = 'true')")

Reply via email to