This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new f50507d233 [core] Clean up invalid branch cache and not cache system table in caching catalog (#4681)
f50507d233 is described below

commit f50507d233adc5aa5472dc1fc220bb93177db00d
Author: Zouxxyy <[email protected]>
AuthorDate: Thu Dec 12 13:22:43 2024 +0800

    [core] Clean up invalid branch cache and not cache system table in caching catalog (#4681)
---
 .../org/apache/paimon/catalog/AbstractCatalog.java |  8 +-
 .../org/apache/paimon/catalog/CachingCatalog.java  | 40 ++++------
 .../java/org/apache/paimon/catalog/Identifier.java | 16 ++++
 .../apache/paimon/catalog/CachingCatalogTest.java  | 92 +++++++++-------------
 .../flink/procedure/DeleteBranchProcedure.java     |  5 +-
 .../apache/paimon/spark/DataFrameWriteTest.scala   |  5 ++
 .../org/apache/paimon/spark/PaimonSinkTest.scala   |  5 ++
 .../apache/paimon/spark/SparkGenericCatalog.java   |  2 +-
 .../spark/procedure/DeleteBranchProcedure.java     | 23 ++++--
 .../org/apache/paimon/spark/PaimonSinkTest.scala   |  5 ++
 .../apache/paimon/spark/PaimonSparkTestBase.scala  |  1 -
 .../paimon/spark/sql/DataFrameWriteTest.scala      |  6 ++
 12 files changed, 112 insertions(+), 96 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
index 2b277a29b8..b56fec279a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
@@ -381,7 +381,7 @@ public abstract class AbstractCatalog implements Catalog {
                 throw new TableNotExistException(identifier);
             }
             return table;
-        } else if (isSpecifiedSystemTable(identifier)) {
+        } else if (identifier.isSystemTable()) {
             Table originTable =
                     getDataOrFormatTable(
                             new Identifier(
@@ -519,12 +519,8 @@ public abstract class AbstractCatalog implements Catalog {
         }
     }
 
-    public static boolean isSpecifiedSystemTable(Identifier identifier) {
-        return identifier.getSystemTableName() != null;
-    }
-
     protected static boolean isTableInSystemDatabase(Identifier identifier) {
-        return isSystemDatabase(identifier.getDatabaseName()) || isSpecifiedSystemTable(identifier);
+        return isSystemDatabase(identifier.getDatabaseName()) || identifier.isSystemTable();
     }
 
     protected static void checkNotSystemTable(Identifier identifier, String method) {
diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java
index f67f19700d..e92a589d41 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java
@@ -26,7 +26,6 @@ import org.apache.paimon.schema.SchemaChange;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.Table;
 import org.apache.paimon.table.system.SystemTableLoader;
-import org.apache.paimon.utils.Preconditions;
 import org.apache.paimon.utils.SegmentsCache;
 
 import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache;
@@ -48,7 +47,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 
-import static org.apache.paimon.catalog.AbstractCatalog.isSpecifiedSystemTable;
 import static org.apache.paimon.options.CatalogOptions.CACHE_ENABLED;
 import static org.apache.paimon.options.CatalogOptions.CACHE_EXPIRATION_INTERVAL_MS;
 import static org.apache.paimon.options.CatalogOptions.CACHE_MANIFEST_MAX_MEMORY;
@@ -56,7 +54,7 @@ import static org.apache.paimon.options.CatalogOptions.CACHE_MANIFEST_SMALL_FILE
 import static org.apache.paimon.options.CatalogOptions.CACHE_MANIFEST_SMALL_FILE_THRESHOLD;
 import static org.apache.paimon.options.CatalogOptions.CACHE_PARTITION_MAX_NUM;
 import static org.apache.paimon.options.CatalogOptions.CACHE_SNAPSHOT_MAX_NUM_PER_TABLE;
-import static org.apache.paimon.table.system.SystemTableLoader.SYSTEM_TABLES;
+import static org.apache.paimon.utils.Preconditions.checkNotNull;
 
 /** A {@link Catalog} to cache databases and tables and manifests. */
 public class CachingCatalog extends DelegateCatalog {
@@ -203,6 +201,9 @@ public class CachingCatalog extends DelegateCatalog {
             throws TableNotExistException {
         super.dropTable(identifier, ignoreIfNotExists);
         invalidateTable(identifier);
+        if (identifier.isMainTable()) {
+            invalidateAttachedTables(identifier);
+        }
     }
 
     @Override
@@ -227,26 +228,23 @@ public class CachingCatalog extends DelegateCatalog {
             return table;
         }
 
-        if (isSpecifiedSystemTable(identifier)) {
+        // For system table, do not cache it directly. Instead, cache the origin table and then wrap
+        // it to generate the system table.
+        if (identifier.isSystemTable()) {
             Identifier originIdentifier =
                     new Identifier(
                             identifier.getDatabaseName(),
                             identifier.getTableName(),
                             identifier.getBranchName(),
                             null);
-            Table originTable = tableCache.getIfPresent(originIdentifier);
-            if (originTable == null) {
-                originTable = wrapped.getTable(originIdentifier);
-                putTableCache(originIdentifier, originTable);
-            }
+            Table originTable = getTable(originIdentifier);
             table =
                     SystemTableLoader.load(
-                            Preconditions.checkNotNull(identifier.getSystemTableName()),
+                            checkNotNull(identifier.getSystemTableName()),
                             (FileStoreTable) originTable);
             if (table == null) {
                 throw new TableNotExistException(identifier);
             }
-            putTableCache(identifier, table);
             return table;
         }
 
@@ -309,7 +307,7 @@ public class CachingCatalog extends DelegateCatalog {
         public void onRemoval(Identifier identifier, Table table, @NonNull RemovalCause cause) {
             LOG.debug("Evicted {} from the table cache ({})", identifier, cause);
             if (RemovalCause.EXPIRED.equals(cause)) {
-                tryInvalidateSysTables(identifier);
+                // ignore now
             }
         }
     }
@@ -317,24 +315,18 @@ public class CachingCatalog extends DelegateCatalog {
     @Override
     public void invalidateTable(Identifier identifier) {
         tableCache.invalidate(identifier);
-        tryInvalidateSysTables(identifier);
         if (partitionCache != null) {
             partitionCache.invalidate(identifier);
         }
     }
 
-    private void tryInvalidateSysTables(Identifier identifier) {
-        if (!isSpecifiedSystemTable(identifier)) {
-            tableCache.invalidateAll(allSystemTables(identifier));
-        }
-    }
-
-    private static Iterable<Identifier> allSystemTables(Identifier ident) {
-        List<Identifier> tables = new ArrayList<>();
-        for (String type : SYSTEM_TABLES) {
-            tables.add(Identifier.fromString(ident.getFullName() + SYSTEM_TABLE_SPLITTER + type));
+    /** Invalidate attached tables, such as cached branches. */
+    private void invalidateAttachedTables(Identifier identifier) {
+        for (@NonNull Identifier i : tableCache.asMap().keySet()) {
+            if (identifier.getTableName().equals(i.getTableName())) {
+                tableCache.invalidate(i);
+            }
         }
-        return tables;
     }
 
     // ================================== refresh ================================================
diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/Identifier.java b/paimon-core/src/main/java/org/apache/paimon/catalog/Identifier.java
index 72da69b67b..6cca6824e3 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/Identifier.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/Identifier.java
@@ -65,6 +65,10 @@ public class Identifier implements Serializable {
         this.object = object;
     }
 
+    public Identifier(String database, String table, @Nullable String branch) {
+        this(database, table, branch, null);
+    }
+
     public Identifier(
             String database, String table, @Nullable String branch, @Nullable String systemTable) {
         this.database = database;
@@ -119,6 +123,18 @@ public class Identifier implements Serializable {
         return systemTable;
     }
 
+    public boolean isMainTable() {
+        return getBranchName() == null && getSystemTableName() == null;
+    }
+
+    public boolean isBranch() {
+        return getBranchName() != null && getSystemTableName() == null;
+    }
+
+    public boolean isSystemTable() {
+        return getSystemTableName() != null;
+    }
+
     private void splitObjectName() {
         if (table != null) {
             return;
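
For orientation, the three new predicates partition identifiers by which optional parts are set. The behavior below follows directly from the constructors and methods added above; the class name and sample values are illustrative only.

import org.apache.paimon.catalog.Identifier;

public class IdentifierKindsSketch {
    public static void main(String[] args) {
        Identifier main = new Identifier("db", "tbl");
        Identifier branch = new Identifier("db", "tbl", "b1"); // new 3-arg constructor
        Identifier sys = new Identifier("db", "tbl", "b1", "files");

        System.out.println(main.isMainTable());  // true: no branch, no system table
        System.out.println(branch.isBranch());   // true: branch set, no system table
        System.out.println(sys.isSystemTable()); // true: system table name set
        System.out.println(sys.isBranch());      // false: isBranch() excludes system tables
    }
}
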
diff --git a/paimon-core/src/test/java/org/apache/paimon/catalog/CachingCatalogTest.java b/paimon-core/src/test/java/org/apache/paimon/catalog/CachingCatalogTest.java
index 7567f682ae..4792e33c93 100644
--- a/paimon-core/src/test/java/org/apache/paimon/catalog/CachingCatalogTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/catalog/CachingCatalogTest.java
@@ -48,10 +48,8 @@ import org.junit.jupiter.api.Test;
 import java.io.FileNotFoundException;
 import java.time.Duration;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
-import java.util.Optional;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
@@ -101,14 +99,49 @@ class CachingCatalogTest extends CatalogTestBase {
 
     @Test
     public void testInvalidateSysTablesIfBaseTableIsDropped() throws Exception {
-        Catalog catalog = new CachingCatalog(this.catalog);
+        TestableCachingCatalog catalog =
+                new TestableCachingCatalog(this.catalog, EXPIRATION_TTL, ticker);
         Identifier tableIdent = new Identifier("db", "tbl");
         catalog.createTable(new Identifier("db", "tbl"), DEFAULT_TABLE_SCHEMA, false);
         Identifier sysIdent = new Identifier("db", "tbl$files");
+        // getting a system table only caches the origin table
         catalog.getTable(sysIdent);
+        assertThat(catalog.tableCache.asMap()).containsKey(tableIdent);
+        assertThat(catalog.tableCache.asMap()).doesNotContainKey(sysIdent);
+        // test case sensitivity
+        Identifier sysIdent1 = new Identifier("db", "tbl$SNAPSHOTS");
+        catalog.getTable(sysIdent1);
+        assertThat(catalog.tableCache.asMap()).doesNotContainKey(sysIdent1);
+
         catalog.dropTable(tableIdent, false);
+        assertThat(catalog.tableCache.asMap()).doesNotContainKey(tableIdent);
         assertThatThrownBy(() -> catalog.getTable(sysIdent))
                 .hasMessage("Table db.tbl does not exist.");
+        assertThatThrownBy(() -> catalog.getTable(sysIdent1))
+                .hasMessage("Table db.tbl does not exist.");
+    }
+
+    @Test
+    public void testInvalidateBranchIfBaseTableIsDropped() throws Exception {
+        TestableCachingCatalog catalog =
+                new TestableCachingCatalog(this.catalog, EXPIRATION_TTL, ticker);
+        Identifier tableIdent = new Identifier("db", "tbl");
+        catalog.createTable(new Identifier("db", "tbl"), DEFAULT_TABLE_SCHEMA, false);
+        catalog.getTable(tableIdent).createBranch("b1");
+
+        Identifier branchIdent = new Identifier("db", "tbl$branch_b1");
+        Identifier branchSysIdent = new Identifier("db", "tbl$branch_b1$FILES");
+        // getting a system table only caches the origin table
+        catalog.getTable(branchSysIdent);
+        assertThat(catalog.tableCache.asMap()).containsKey(branchIdent);
+        assertThat(catalog.tableCache.asMap()).doesNotContainKey(branchSysIdent);
+
+        catalog.dropTable(tableIdent, false);
+        assertThat(catalog.tableCache.asMap()).doesNotContainKey(branchIdent);
+        assertThatThrownBy(() -> catalog.getTable(branchIdent))
+                .hasMessage("Table db.tbl$branch_b1 does not exist.");
+        assertThatThrownBy(() -> catalog.getTable(branchSysIdent))
+                .hasMessage("Table db.tbl$branch_b1 does not exist.");
     }
 
     @Test
@@ -175,59 +208,6 @@ class CachingCatalogTest extends CatalogTestBase {
         assertThat(catalog.remainingAgeFor(tableIdent)).get().isEqualTo(HALF_OF_EXPIRATION);
     }
 
-    @Test
-    public void testCacheExpirationEagerlyRemovesSysTables() throws Exception {
-        TestableCachingCatalog catalog =
-                new TestableCachingCatalog(this.catalog, EXPIRATION_TTL, ticker);
-
-        Identifier tableIdent = new Identifier("db", "tbl");
-        catalog.createTable(tableIdent, DEFAULT_TABLE_SCHEMA, false);
-        Table table = catalog.getTable(tableIdent);
-        assertThat(catalog.tableCache().asMap()).containsKey(tableIdent);
-        assertThat(catalog.ageOf(tableIdent)).get().isEqualTo(Duration.ZERO);
-
-        ticker.advance(HALF_OF_EXPIRATION);
-        assertThat(catalog.tableCache().asMap()).containsKey(tableIdent);
-        assertThat(catalog.ageOf(tableIdent)).get().isEqualTo(HALF_OF_EXPIRATION);
-
-        for (Identifier sysTable : sysTables(tableIdent)) {
-            catalog.getTable(sysTable);
-        }
-        assertThat(catalog.tableCache().asMap()).containsKeys(sysTables(tableIdent));
-        assertThat(Arrays.stream(sysTables(tableIdent)).map(catalog::ageOf))
-                .isNotEmpty()
-                .allMatch(age -> age.isPresent() && age.get().equals(Duration.ZERO));
-
-        assertThat(catalog.remainingAgeFor(tableIdent))
-                .as("Loading a non-cached sys table should refresh the main 
table's age")
-                .isEqualTo(Optional.of(EXPIRATION_TTL));
-
-        // Move time forward and access already cached sys tables.
-        ticker.advance(HALF_OF_EXPIRATION);
-        for (Identifier sysTable : sysTables(tableIdent)) {
-            catalog.getTable(sysTable);
-        }
-        assertThat(Arrays.stream(sysTables(tableIdent)).map(catalog::ageOf))
-                .isNotEmpty()
-                .allMatch(age -> age.isPresent() && age.get().equals(Duration.ZERO));
-
-        assertThat(catalog.remainingAgeFor(tableIdent))
-                .as("Accessing a cached sys table should not affect the main 
table's age")
-                .isEqualTo(Optional.of(HALF_OF_EXPIRATION));
-
-        // Move time forward so the data table drops.
-        ticker.advance(HALF_OF_EXPIRATION);
-        assertThat(catalog.tableCache().asMap()).doesNotContainKey(tableIdent);
-
-        Arrays.stream(sysTables(tableIdent))
-                .forEach(
-                        sysTable ->
-                                assertThat(catalog.tableCache().asMap())
-                                        .as(
-                                                "When a data table expires, its sys tables should expire regardless of age")
-                                        .doesNotContainKeys(sysTable));
-    }
-
     @Test
     public void testPartitionCache() throws Exception {
         TestableCachingCatalog catalog =
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/DeleteBranchProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/DeleteBranchProcedure.java
index c95fd62bee..56c6490286 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/DeleteBranchProcedure.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/DeleteBranchProcedure.java
@@ -49,7 +49,10 @@ public class DeleteBranchProcedure extends ProcedureBase {
             })
     public String[] call(ProcedureContext procedureContext, String tableId, String branchStr)
             throws Catalog.TableNotExistException {
-        catalog.getTable(Identifier.fromString(tableId)).deleteBranches(branchStr);
+        Identifier identifier = Identifier.fromString(tableId);
+        catalog.getTable(identifier).deleteBranches(branchStr);
+        catalog.invalidateTable(
+                new Identifier(identifier.getDatabaseName(), identifier.getTableName(), branchStr));
         return new String[] {"Success"};
     }
 }
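
This Flink procedure and the Spark procedure further below follow the same pattern: delete the branch, then explicitly evict the now-stale branch identifier from the caching catalog. A condensed sketch of the pattern, assuming a single branch name for clarity (the helper class is hypothetical; Catalog, Identifier, and deleteBranches are used as in the patch):

import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;

public class DeleteBranchSketch {
    // After deleting a branch, evict its cached identifier so a later
    // getTable("db.tbl$branch_b1") cannot return a stale table.
    static void deleteBranch(Catalog catalog, Identifier table, String branch) throws Exception {
        catalog.getTable(table).deleteBranches(branch);
        catalog.invalidateTable(
                new Identifier(table.getDatabaseName(), table.getTableName(), branch));
    }
}
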
diff --git a/paimon-spark/paimon-spark-3.3/src/test/scala/org/apache/paimon/spark/DataFrameWriteTest.scala b/paimon-spark/paimon-spark-3.3/src/test/scala/org/apache/paimon/spark/DataFrameWriteTest.scala
index a3cecfc72e..cb449edb4c 100644
--- a/paimon-spark/paimon-spark-3.3/src/test/scala/org/apache/paimon/spark/DataFrameWriteTest.scala
+++ b/paimon-spark/paimon-spark-3.3/src/test/scala/org/apache/paimon/spark/DataFrameWriteTest.scala
@@ -18,10 +18,15 @@
 
 package org.apache.paimon.spark
 
+import org.apache.spark.SparkConf
 import org.junit.jupiter.api.Assertions
 
 class DataFrameWriteTest extends PaimonSparkTestBase {
 
+  override protected def sparkConf: SparkConf = {
+    super.sparkConf.set("spark.sql.catalog.paimon.cache-enabled", "false")
+  }
+
   test("Paimon: DataFrameWrite.saveAsTable") {
 
     import testImplicits._
diff --git a/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/PaimonSinkTest.scala b/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/PaimonSinkTest.scala
index 18fb9e116b..ab4a9bcd9d 100644
--- a/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/PaimonSinkTest.scala
+++ b/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/PaimonSinkTest.scala
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.spark
 
+import org.apache.spark.SparkConf
 import org.apache.spark.sql.{Dataset, Row}
 import org.apache.spark.sql.execution.streaming.MemoryStream
 import org.apache.spark.sql.functions.{col, mean, window}
@@ -27,6 +28,10 @@ import java.sql.Date
 
 class PaimonSinkTest extends PaimonSparkTestBase with StreamTest {
 
+  override protected def sparkConf: SparkConf = {
+    super.sparkConf.set("spark.sql.catalog.paimon.cache-enabled", "false")
+  }
+
   import testImplicits._
 
   test("Paimon Sink: forEachBatch") {
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkGenericCatalog.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkGenericCatalog.java
index 9957f0cdf9..63d75a53ef 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkGenericCatalog.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkGenericCatalog.java
@@ -186,7 +186,7 @@ public class SparkGenericCatalog extends SparkBaseCatalog implements CatalogExte
     @Override
     public void invalidateTable(Identifier ident) {
         // We do not need to check whether the table exists and whether
-        // it is an Paimon table to reduce remote service requests.
+        // it is a Paimon table to reduce remote service requests.
         sparkCatalog.invalidateTable(ident);
         asTableCatalog().invalidateTable(ident);
     }
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/DeleteBranchProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/DeleteBranchProcedure.java
index e398eee026..4a01c33d6a 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/DeleteBranchProcedure.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/DeleteBranchProcedure.java
@@ -18,6 +18,8 @@
 
 package org.apache.paimon.spark.procedure;
 
+import org.apache.paimon.spark.catalog.WithPaimonCatalog;
+
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.connector.catalog.Identifier;
 import org.apache.spark.sql.connector.catalog.TableCatalog;
@@ -61,13 +63,20 @@ public class DeleteBranchProcedure extends BaseProcedure {
     public InternalRow[] call(InternalRow args) {
         Identifier tableIdent = toIdentifier(args.getString(0), PARAMETERS[0].name());
         String branchStr = args.getString(1);
-        return modifyPaimonTable(
-                tableIdent,
-                table -> {
-                    table.deleteBranches(branchStr);
-                    InternalRow outputRow = newInternalRow(true);
-                    return new InternalRow[] {outputRow};
-                });
+        InternalRow[] result =
+                modifyPaimonTable(
+                        tableIdent,
+                        table -> {
+                            table.deleteBranches(branchStr);
+                            InternalRow outputRow = newInternalRow(true);
+                            return new InternalRow[] {outputRow};
+                        });
+        ((WithPaimonCatalog) tableCatalog())
+                .paimonCatalog()
+                .invalidateTable(
+                        new org.apache.paimon.catalog.Identifier(
+                                tableIdent.namespace()[0], tableIdent.name(), branchStr));
+        return result;
     }
 
     public static ProcedureBuilder builder() {
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonSinkTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonSinkTest.scala
index 63203122ac..61bf552494 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonSinkTest.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonSinkTest.scala
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.spark
 
+import org.apache.spark.SparkConf
 import org.apache.spark.sql.{Dataset, Row}
 import org.apache.spark.sql.execution.streaming.MemoryStream
 import org.apache.spark.sql.functions.{col, mean, window}
@@ -27,6 +28,10 @@ import java.sql.Date
 
 class PaimonSinkTest extends PaimonSparkTestBase with StreamTest {
 
+  override protected def sparkConf: SparkConf = {
+    super.sparkConf.set("spark.sql.catalog.paimon.cache-enabled", "false")
+  }
+
   import testImplicits._
 
   test("Paimon Sink: forEachBatch") {
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonSparkTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonSparkTestBase.scala
index 605b2e6ca5..867b3e5e33 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonSparkTestBase.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonSparkTestBase.scala
@@ -66,7 +66,6 @@ class PaimonSparkTestBase
     super.sparkConf
       .set("spark.sql.catalog.paimon", classOf[SparkCatalog].getName)
       .set("spark.sql.catalog.paimon.warehouse", tempDBDir.getCanonicalPath)
-      .set("spark.sql.catalog.paimon.cache-enabled", "false")
       .set("spark.sql.extensions", 
classOf[PaimonSparkSessionExtensions].getName)
       .set("spark.serializer", serializer)
   }
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DataFrameWriteTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DataFrameWriteTest.scala
index a0a94afacf..edd092c85c 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DataFrameWriteTest.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DataFrameWriteTest.scala
@@ -20,6 +20,7 @@ package org.apache.paimon.spark.sql
 
 import org.apache.paimon.spark.PaimonSparkTestBase
 
+import org.apache.spark.SparkConf
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.types.DecimalType
 import org.junit.jupiter.api.Assertions
@@ -27,6 +28,11 @@ import org.junit.jupiter.api.Assertions
 import java.sql.{Date, Timestamp}
 
 class DataFrameWriteTest extends PaimonSparkTestBase {
+
+  override protected def sparkConf: SparkConf = {
+    super.sparkConf.set("spark.sql.catalog.paimon.cache-enabled", "false")
+  }
+
   import testImplicits._
 
   test("Paimon: DataFrameWrite.saveAsTable") {
