This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 55e4d28c70 [spark] fix Merge Into unstable tests (#6912)
55e4d28c70 is described below
commit 55e4d28c705a2c8a3df8ac513e06abd81bb72856
Author: Weitai Li <[email protected]>
AuthorDate: Sun Dec 28 20:44:54 2025 +0800
[spark] fix Merge Into unstable tests (#6912)
---
.../paimon/spark/sql/RowTrackingTestBase.scala | 50 ++++++++++++++--------
1 file changed, 32 insertions(+), 18 deletions(-)
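
Note on the change (editorial sketch, not part of the commit): the old tests captured every analyzed plan reported by the shared SparkSession, so plans from unrelated queries could land at the head of the buffer and fail the assertions. The patch instead waits on a CountDownLatch and keeps only the plan containing the operator each test expects (Deduplicate for the split-lookup query, MergeRows for the update query). A minimal sketch of that pattern against the public Spark listener API; the helper name awaitPlanWith, the type parameter T, and the 10-second timeout are illustrative assumptions, not code from the patch:

import java.util.concurrent.{CountDownLatch, TimeUnit}
import java.util.concurrent.atomic.AtomicReference

import scala.reflect.ClassTag

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.util.QueryExecutionListener

object PlanCaptureSketch {

  /**
   * Runs `body`, then waits up to `timeoutSec` seconds for an analyzed plan
   * that contains a node of type T, as reported by the session's
   * QueryExecutionListener callbacks (which fire asynchronously).
   */
  def awaitPlanWith[T <: LogicalPlan: ClassTag](spark: SparkSession, timeoutSec: Long)(
      body: => Unit): Option[LogicalPlan] = {
    val captured = new AtomicReference[LogicalPlan]()
    val latch = new CountDownLatch(1)

    val listener = new QueryExecutionListener {
      // Keep only the plan of the query we care about: the one containing the
      // expected operator. Plans of other queries fired by the same command
      // (or by other tests sharing the session) are ignored.
      private def capture(qe: QueryExecution): Unit = {
        if (qe.analyzed.collectFirst { case p: T => p }.nonEmpty) {
          captured.set(qe.analyzed)
          latch.countDown()
        }
      }
      override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit =
        capture(qe)
      override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit =
        capture(qe)
    }

    spark.listenerManager.register(listener)
    try {
      body
      if (latch.await(timeoutSec, TimeUnit.SECONDS)) Option(captured.get()) else None
    } finally {
      spark.listenerManager.unregister(listener)
    }
  }
}

With such a helper, the assertions in the patch would reduce to something like awaitPlanWith[Deduplicate](spark, 10) { sql(mergeStatement) } followed by a check that the returned plan contains no Join.
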
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala
index 137bbe76d7..204ca7fcd4 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala
@@ -22,12 +22,11 @@ import org.apache.paimon.Snapshot.CommitKind
import org.apache.paimon.spark.PaimonSparkTestBase
import org.apache.spark.sql.Row
-import org.apache.spark.sql.catalyst.plans.logical.{Join, LogicalPlan, RepartitionByExpression, Sort}
+import org.apache.spark.sql.catalyst.plans.logical.{Deduplicate, Join, LogicalPlan, MergeRows, RepartitionByExpression, Sort}
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.util.QueryExecutionListener
-import scala.collection.JavaConverters._
-import scala.collection.mutable
+import java.util.concurrent.{CountDownLatch, TimeUnit}
abstract class RowTrackingTestBase extends PaimonSparkTestBase {
@@ -397,13 +396,20 @@ abstract class RowTrackingTestBase extends PaimonSparkTestBase {
sql(
"INSERT INTO target values (1, 10, 'c1'), (2, 20, 'c2'), (3, 30,
'c3'), (4, 40, 'c4'), (5, 50, 'c5')")
- val capturedPlans: mutable.ListBuffer[LogicalPlan] = mutable.ListBuffer.empty
+ var findSplitsPlan: LogicalPlan = null
+ val latch = new CountDownLatch(1)
val listener = new QueryExecutionListener {
override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = {
- capturedPlans += qe.analyzed
+ if (qe.analyzed.collectFirst { case _: Deduplicate => true }.nonEmpty) {
+ latch.countDown()
+ findSplitsPlan = qe.analyzed
+ }
}
override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = {
- capturedPlans += qe.analyzed
+ if (qe.analyzed.collectFirst { case _: Deduplicate => true }.nonEmpty) {
+ latch.countDown()
+ findSplitsPlan = qe.analyzed
+ }
}
}
spark.listenerManager.register(listener)
@@ -416,9 +422,10 @@ abstract class RowTrackingTestBase extends PaimonSparkTestBase {
|WHEN NOT MATCHED AND c > 'c9' THEN INSERT (a, b, c) VALUES (target_ROW_ID, b * 1.1, c)
|WHEN NOT MATCHED THEN INSERT (a, b, c) VALUES (target_ROW_ID, b, c)
|""".stripMargin)
+ assert(latch.await(10, TimeUnit.SECONDS), "await timeout")
// Assert that no Join operator was used during
// `org.apache.paimon.spark.commands.MergeIntoPaimonDataEvolutionTable.targetRelatedSplits`
- assert(capturedPlans.head.collect { case plan: Join => plan }.isEmpty)
+ assert(findSplitsPlan != null && findSplitsPlan.collect { case plan: Join => plan }.isEmpty)
spark.listenerManager.unregister(listener)
checkAnswer(
@@ -442,13 +449,20 @@ abstract class RowTrackingTestBase extends PaimonSparkTestBase {
sql(
"INSERT INTO target values (1, 10, 'c1'), (2, 20, 'c2'), (3, 30,
'c3'), (4, 40, 'c4'), (5, 50, 'c5')")
- val capturedPlans = new java.util.concurrent.CopyOnWriteArrayList[LogicalPlan]()
+ var updatePlan: LogicalPlan = null
+ val latch = new CountDownLatch(1)
val listener = new QueryExecutionListener {
override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = {
- capturedPlans.add(qe.analyzed)
+ if (qe.analyzed.collectFirst { case _: MergeRows => true }.nonEmpty) {
+ latch.countDown()
+ updatePlan = qe.analyzed
+ }
}
override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = {
- capturedPlans.add(qe.analyzed)
+ if (qe.analyzed.collectFirst { case _: MergeRows => true }.nonEmpty) {
+ latch.countDown()
+ updatePlan = qe.analyzed
+ }
}
}
spark.listenerManager.register(listener)
@@ -460,17 +474,17 @@ abstract class RowTrackingTestBase extends PaimonSparkTestBase {
|WHEN MATCHED AND source.c > 'c2' THEN UPDATE SET b = source.b * 3,
|c = concat(target.c, source.c)
|""".stripMargin).collect()
+ assert(latch.await(10, TimeUnit.SECONDS), "await timeout")
// Assert no shuffle/join/sort was used in
// 'org.apache.paimon.spark.commands.MergeIntoPaimonDataEvolutionTable.updateActionInvoke'
assert(
- capturedPlans.asScala.forall(
- plan =>
- plan.collectFirst {
- case p: Join => p
- case p: Sort => p
- case p: RepartitionByExpression => p
- }.isEmpty),
- s"Found unexpected Join/Sort/Exchange in plan:\n$capturedPlans"
+ updatePlan != null &&
+ updatePlan.collectFirst {
+ case p: Join => p
+ case p: Sort => p
+ case p: RepartitionByExpression => p
+ }.isEmpty,
+ s"Found unexpected Join/Sort/Exchange in plan: $updatePlan"
)
spark.listenerManager.unregister(listener)