This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new ed6d3f12d7 [test][rest] add test case for two sessions with cache for 
rest commitTable (#6438)
ed6d3f12d7 is described below

commit ed6d3f12d78a39c80b886877aca974bafdd41fe2
Author: XiaoHongbo <[email protected]>
AuthorDate: Fri Jan 2 18:57:49 2026 +0800

    [test][rest] add test case for two sessions with cache for rest commitTable 
(#6438)
---
 .../spark/PaimonSparkTestWithRestCatalogBase.scala |  2 +-
 .../spark/PaimonSparkTwoSessionCacheTest.scala     | 85 ++++++++++++++++++++++
 2 files changed, 86 insertions(+), 1 deletion(-)

diff --git 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonSparkTestWithRestCatalogBase.scala
 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonSparkTestWithRestCatalogBase.scala
index 659e668709..c880ba7f36 100644
--- 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonSparkTestWithRestCatalogBase.scala
+++ 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonSparkTestWithRestCatalogBase.scala
@@ -34,7 +34,7 @@ class PaimonSparkTestWithRestCatalogBase extends 
PaimonSparkTestBase {
 
   private var restCatalogServer: RESTCatalogServer = _
   private var serverUrl: String = _
-  private var warehouse: String = _
+  protected var warehouse: String = _
   private val initToken = "init_token"
 
   override protected def beforeAll(): Unit = {
diff --git 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonSparkTwoSessionCacheTest.scala
 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonSparkTwoSessionCacheTest.scala
new file mode 100644
index 0000000000..9dff5efcf7
--- /dev/null
+++ 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonSparkTwoSessionCacheTest.scala
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark
+
+import org.apache.paimon.catalog.Catalog.TableNotExistException
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.Row
+
+/**
+ * Two-session reproduction of stale cache: session1 caches table, session2 
drops & recreates, then
+ * session1 commits with stale tableId and fails.
+ */
+class PaimonSparkTwoSessionCacheTest extends 
PaimonSparkTestWithRestCatalogBase {
+
+  test("Two sessions: stale cache commit fails") {
+    val db = "sku"
+    val tbl = "sku_detail_twosession"
+
+    // s1 and s2 are independent SparkSessions sharing SparkContext but with 
separate Catalogs
+    val s1 = spark
+    val s2 = spark.newSession()
+
+    // Ensure both sessions can see the database
+    s1.sql(s"CREATE DATABASE IF NOT EXISTS paimon.$db")
+    s2.sql(s"CREATE DATABASE IF NOT EXISTS paimon.$db")
+
+    withTable(s"paimon.$db.$tbl") {
+      s1.sql(s"""
+        CREATE TABLE paimon.%s.%s (
+          id INT,
+          name STRING
+        ) USING PAIMON
+      """.format(db, tbl))
+
+      // Session 1 caches table by reading/writing
+      s1.sql(s"INSERT INTO paimon.%s.%s VALUES (1, 'a'), (2, 'b')".format(db, 
tbl))
+      checkAnswer(
+        s1.sql(s"SELECT * FROM paimon.%s.%s ORDER BY id".format(db, tbl)),
+        Seq(Row(1, "a"), Row(2, "b")))
+
+      // Session 2 drops and recreates the table (new tableId on server)
+      s2.sql(s"DROP TABLE IF EXISTS paimon.%s.%s".format(db, tbl))
+      s2.sql(s"""
+        CREATE TABLE paimon.%s.%s (
+          id INT,
+          name STRING
+        ) USING PAIMON
+      """.format(db, tbl))
+      s2.sql(s"INSERT INTO paimon.%s.%s VALUES (3, 'c')".format(db, tbl))
+
+      // Session 1 attempts another write using stale cache (before fix this 
should fail)
+      val thrown = intercept[Exception] {
+        s1.sql(s"INSERT INTO paimon.%s.%s VALUES (4, 'd')".format(db, tbl))
+      }
+      assert(thrown.getMessage.contains("Commit failed"))
+    }
+  }
+
+  override protected def sparkConf: SparkConf = {
+    super.sparkConf
+      .set("spark.sql.catalog.paimon.cache-enabled", "true")
+      .set("spark.sql.catalog.paimon.cache-expire-after-access", "5m")
+      .set("spark.sql.catalog.paimon.cache-expire-after-write", "10m")
+      .set("spark.sql.catalog.paimon.codegen-enabled", "false")
+      .set("spark.sql.catalog.paimon.plugin-enabled", "false")
+      .set("spark.sql.catalog.paimon.plugin-dir", warehouse)
+  }
+}

Reply via email to