This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new b868fe3a46 [VFS] Enable table cache in PVFS (#5958)
b868fe3a46 is described below

commit b868fe3a464dc049b77a7a5c3542e1873ced47f9
Author: timmyyao <[email protected]>
AuthorDate: Fri Jul 25 18:43:06 2025 +0800

    [VFS] Enable table cache in PVFS (#5958)
---
 .../org/apache/paimon/fs/local/LocalFileIO.java    |  10 +-
 .../java/org/apache/paimon/vfs/VFSOperations.java  | 119 +++++++++++++++++----
 .../paimon/vfs/hadoop/PaimonVirtualFileSystem.java |  10 +-
 .../MockRestNoCacheVirtualFileSystemTest.java      |  43 ++++++++
 .../vfs/hadoop/MockRestVirtualFileSystemTest.java  |  17 +--
 .../paimon/vfs/hadoop/VirtualFileSystemTest.java   |  10 ++
 6 files changed, 180 insertions(+), 29 deletions(-)

diff --git a/paimon-common/src/main/java/org/apache/paimon/fs/local/LocalFileIO.java b/paimon-common/src/main/java/org/apache/paimon/fs/local/LocalFileIO.java
index 1e355df609..c6b32b1ae0 100644
--- a/paimon-common/src/main/java/org/apache/paimon/fs/local/LocalFileIO.java
+++ b/paimon-common/src/main/java/org/apache/paimon/fs/local/LocalFileIO.java
@@ -216,7 +216,15 @@ public class LocalFileIO implements FileIO {
         try {
             RENAME_LOCK.lock();
             if (dstFile.exists()) {
-                return false;
+                if (!dstFile.isDirectory()) {
+                    return false;
+                }
+                // Make it compatible with HadoopFileIO: if dst is an existing directory,
+                // dst=dst/srcFileName
+                dstFile = new File(dstFile, srcFile.getName());
+                if (dstFile.exists()) {
+                    return false;
+                }
             }
             Files.move(srcFile.toPath(), dstFile.toPath(), StandardCopyOption.ATOMIC_MOVE);
             return true;
diff --git a/paimon-vfs/paimon-vfs-common/src/main/java/org/apache/paimon/vfs/VFSOperations.java b/paimon-vfs/paimon-vfs-common/src/main/java/org/apache/paimon/vfs/VFSOperations.java
index 1fec65cd07..48865aa05e 100644
--- a/paimon-vfs/paimon-vfs-common/src/main/java/org/apache/paimon/vfs/VFSOperations.java
+++ b/paimon-vfs/paimon-vfs-common/src/main/java/org/apache/paimon/vfs/VFSOperations.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.vfs;
 
+import org.apache.paimon.annotation.VisibleForTesting;
 import org.apache.paimon.catalog.CatalogContext;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.fs.FileIO;
@@ -34,18 +35,29 @@ import org.apache.paimon.rest.responses.GetDatabaseResponse;
 import org.apache.paimon.rest.responses.GetTableResponse;
 import org.apache.paimon.schema.Schema;
 
+import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache;
+import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Caffeine;
+import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Ticker;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
+
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.nio.file.FileAlreadyExistsException;
+import java.time.Duration;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 
 import static org.apache.paimon.CoreOptions.TYPE;
 import static org.apache.paimon.TableType.OBJECT_TABLE;
+import static org.apache.paimon.options.CatalogOptions.CACHE_ENABLED;
+import static org.apache.paimon.options.CatalogOptions.CACHE_EXPIRE_AFTER_ACCESS;
+import static org.apache.paimon.options.CatalogOptions.CACHE_EXPIRE_AFTER_WRITE;
 
 /** Wrap over RESTCatalog to provide basic operations for virtual path. */
 public class VFSOperations {
@@ -55,8 +67,38 @@ public class VFSOperations {
     private final RESTApi api;
     private final CatalogContext context;
 
+    @Nullable private Cache<Identifier, VFSTableInfo> tableCache;
+
     public VFSOperations(Options options) {
         this.api = new RESTApi(options);
+
+        if (options.get(CACHE_ENABLED)) {
+            Duration expireAfterAccess = options.get(CACHE_EXPIRE_AFTER_ACCESS);
+            if (expireAfterAccess.isZero() || expireAfterAccess.isNegative()) {
+                throw new IllegalArgumentException(
+                        "When 'cache.expire-after-access' is set to negative or 0, the catalog cache should be disabled.");
+            }
+            Duration expireAfterWrite = options.get(CACHE_EXPIRE_AFTER_WRITE);
+            if (expireAfterWrite.isZero() || expireAfterWrite.isNegative()) {
+                throw new IllegalArgumentException(
+                        "When 'cache.expire-after-write' is set to negative or 0, the catalog cache should be disabled.");
+            }
+            LOG.info(
+                    "Initialize virtual file system with table cache enabled, expireAfterAccess={}, expireAfterWrite={}",
+                    expireAfterAccess,
+                    expireAfterWrite);
+
+            tableCache =
+                    Caffeine.newBuilder()
+                            .softValues()
+                            .executor(Runnable::run)
+                            .expireAfterAccess(expireAfterAccess)
+                            .expireAfterWrite(expireAfterWrite)
+                            .ticker(Ticker.systemTicker())
+                            .build();
+        } else {
+            LOG.info("Initialize virtual file system with table cache disabled");
+        }
         // Get the configured options which has been merged from REST Server
         this.context = CatalogContext.create(api.options());
     }
@@ -79,27 +121,16 @@ public class VFSOperations {
             relativePath = String.join("/", Arrays.copyOfRange(parts, 2, parts.length));
         }
         Identifier identifier = new Identifier(databaseName, tableName);
-        // Get table from REST server
-        GetTableResponse table;
         try {
-            table = loadTableMetadata(identifier);
+            VFSTableInfo tableInfo = getTableInfo(identifier);
+            return relativePath == null
+                    ? new VFSTableRootIdentifier(databaseName, tableName, tableInfo)
+                    : new VFSTableObjectIdentifier(
+                            databaseName, tableName, relativePath, tableInfo);
         } catch (FileNotFoundException e) {
-            if (relativePath == null) {
-                return new VFSTableRootIdentifier(databaseName, tableName);
-            } else {
-                return new VFSTableObjectIdentifier(databaseName, tableName, relativePath);
-            }
-        }
-        if (table.isExternal()) {
-            throw new IOException("Do not support visiting external table " + identifier);
-        }
-        Path tablePath = new Path(table.getPath());
-        FileIO fileIO = new RESTTokenFileIO(context, api, identifier, tablePath);
-        VFSTableInfo tableInfo = new VFSTableInfo(table.getId(), tablePath, fileIO);
-        if (relativePath == null) {
-            return new VFSTableRootIdentifier(databaseName, tableName, tableInfo);
-        } else {
-            return new VFSTableObjectIdentifier(databaseName, tableName, relativePath, tableInfo);
+            return relativePath == null
+                    ? new VFSTableRootIdentifier(databaseName, tableName)
+                    : new VFSTableObjectIdentifier(databaseName, tableName, relativePath);
         }
     }
 
@@ -138,6 +169,16 @@ public class VFSOperations {
                                 + " is not empty, set recursive to true to drop it");
             }
             api.dropDatabase(databaseName);
+            // Remove table cache
+            if (isCacheEnabled()) {
+                List<Identifier> tables = new ArrayList<>();
+                for (Identifier identifier : tableCache.asMap().keySet()) {
+                    if (identifier.getDatabaseName().equals(databaseName)) {
+                        tables.add(identifier);
+                    }
+                }
+                tables.forEach(tableCache::invalidate);
+            }
         } catch (NoSuchResourceException e) {
             throw new FileNotFoundException("Database " + databaseName + " not found");
         } catch (ForbiddenException e) {
@@ -171,6 +212,10 @@ public class VFSOperations {
         Identifier identifier = Identifier.create(databaseName, tableName);
         try {
             api.dropTable(identifier);
+            // Remove table cache
+            if (isCacheEnabled()) {
+                tableCache.invalidate(identifier);
+            }
         } catch (NoSuchResourceException e) {
             throw new FileNotFoundException("Table " + identifier + " not found");
         } catch (ForbiddenException e) {
@@ -184,6 +229,11 @@ public class VFSOperations {
         Identifier dstIdentifier = Identifier.create(databaseName, dstTableName);
         try {
             api.renameTable(srcIdentifier, dstIdentifier);
+            // Remove table cache
+            if (isCacheEnabled()) {
+                tableCache.invalidate(srcIdentifier);
+                tableCache.invalidate(dstIdentifier);
+            }
         } catch (NoSuchResourceException e) {
             throw new FileNotFoundException("Source table " + srcIdentifier + " not found");
         } catch (ForbiddenException e) {
@@ -238,4 +288,35 @@ public class VFSOperations {
 
         return response;
     }
+
+    private VFSTableInfo loadTableInfo(Identifier identifier) throws IOException {
+        // Get table from REST server
+        GetTableResponse table;
+        table = loadTableMetadata(identifier);
+
+        if (table.isExternal()) {
+            throw new IOException("Do not support visiting external table " + identifier);
+        }
+        Path tablePath = new Path(table.getPath());
+        FileIO fileIO = new RESTTokenFileIO(context, api, identifier, tablePath);
+        return new VFSTableInfo(table.getId(), tablePath, fileIO);
+    }
+
+    private VFSTableInfo getTableInfo(Identifier identifier) throws IOException {
+        if (!isCacheEnabled()) {
+            return loadTableInfo(identifier);
+        }
+        VFSTableInfo vfsTableInfo = tableCache.getIfPresent(identifier);
+        if (vfsTableInfo != null) {
+            return vfsTableInfo;
+        }
+        vfsTableInfo = loadTableInfo(identifier);
+        tableCache.put(identifier, vfsTableInfo);
+        return vfsTableInfo;
+    }
+
+    @VisibleForTesting
+    public boolean isCacheEnabled() {
+        return tableCache != null;
+    }
 }
diff --git a/paimon-vfs/paimon-vfs-hadoop/src/main/java/org/apache/paimon/vfs/hadoop/PaimonVirtualFileSystem.java b/paimon-vfs/paimon-vfs-hadoop/src/main/java/org/apache/paimon/vfs/hadoop/PaimonVirtualFileSystem.java
index 3ad71d033b..e18d3f5f59 100644
--- a/paimon-vfs/paimon-vfs-hadoop/src/main/java/org/apache/paimon/vfs/hadoop/PaimonVirtualFileSystem.java
+++ b/paimon-vfs/paimon-vfs-hadoop/src/main/java/org/apache/paimon/vfs/hadoop/PaimonVirtualFileSystem.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.vfs.hadoop;
 
+import org.apache.paimon.annotation.VisibleForTesting;
 import org.apache.paimon.fs.PositionOutputStream;
 import org.apache.paimon.options.CatalogOptions;
 import org.apache.paimon.options.Options;
@@ -178,12 +179,12 @@ public class PaimonVirtualFileSystem extends FileSystem {
                     (VFSTableRootIdentifier) srcVfsIdentifier,
                     (VFSTableRootIdentifier) dstVfsIdentifier);
         } else {
-            if (!(dstVfsIdentifier instanceof VFSTableObjectIdentifier)) {
+            if (!(dstVfsIdentifier instanceof VFSTableIdentifier)) {
                 throw new IOException(
                         "Cannot rename to virtual path " + dst + " which is not a table");
             }
             VFSTableObjectIdentifier srcIdentifier = (VFSTableObjectIdentifier) srcVfsIdentifier;
-            VFSTableObjectIdentifier dstIdentifier = (VFSTableObjectIdentifier) dstVfsIdentifier;
+            VFSTableIdentifier dstIdentifier = (VFSTableIdentifier) dstVfsIdentifier;
             VFSTableInfo srcTableInfo = srcIdentifier.tableInfo();
             VFSTableInfo dstTableInfo = dstIdentifier.tableInfo();
             if (srcTableInfo == null) {
@@ -438,4 +439,9 @@ public class PaimonVirtualFileSystem extends FileSystem {
     public Path getWorkingDirectory() {
         return workingDirectory;
     }
+
+    @VisibleForTesting
+    public boolean isCacheEnabled() {
+        return vfsOperations.isCacheEnabled();
+    }
 }
diff --git a/paimon-vfs/paimon-vfs-hadoop/src/test/java/org/apache/paimon/vfs/hadoop/MockRestNoCacheVirtualFileSystemTest.java b/paimon-vfs/paimon-vfs-hadoop/src/test/java/org/apache/paimon/vfs/hadoop/MockRestNoCacheVirtualFileSystemTest.java
new file mode 100644
index 0000000000..28d0a02335
--- /dev/null
+++ b/paimon-vfs/paimon-vfs-hadoop/src/test/java/org/apache/paimon/vfs/hadoop/MockRestNoCacheVirtualFileSystemTest.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.vfs.hadoop;
+
+import org.apache.paimon.rest.auth.AuthProviderEnum;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.junit.Assert;
+
+/** Test for {@link PaimonVirtualFileSystem} with Mock Rest Server and disable table cache. */
+public class MockRestNoCacheVirtualFileSystemTest extends MockRestVirtualFileSystemTest {
+
+    @Override
+    protected void initFs() throws Exception {
+        Configuration conf = new Configuration();
+        conf.set("fs.pvfs.uri", restCatalogServer.getUrl());
+        conf.set("fs.pvfs.token.provider", AuthProviderEnum.BEAR.identifier());
+        conf.set("fs.pvfs.token", initToken);
+        conf.setBoolean("fs.pvfs.cache-enabled", false);
+        this.vfs = new PaimonVirtualFileSystem();
+        this.vfsRoot = new Path("pvfs://" + restWarehouse + "/");
+        this.vfs.initialize(vfsRoot.toUri(), conf);
+
+        Assert.assertFalse(((PaimonVirtualFileSystem) vfs).isCacheEnabled());
+    }
+}
diff --git a/paimon-vfs/paimon-vfs-hadoop/src/test/java/org/apache/paimon/vfs/hadoop/MockRestVirtualFileSystemTest.java b/paimon-vfs/paimon-vfs-hadoop/src/test/java/org/apache/paimon/vfs/hadoop/MockRestVirtualFileSystemTest.java
index 0b3307db96..46f8bd17fe 100644
--- a/paimon-vfs/paimon-vfs-hadoop/src/test/java/org/apache/paimon/vfs/hadoop/MockRestVirtualFileSystemTest.java
+++ b/paimon-vfs/paimon-vfs-hadoop/src/test/java/org/apache/paimon/vfs/hadoop/MockRestVirtualFileSystemTest.java
@@ -36,6 +36,7 @@ import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableMap;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.junit.Assert;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 
@@ -45,14 +46,14 @@ import java.util.UUID;
 
 /** Test for {@link PaimonVirtualFileSystem} with Mock Rest Server. */
 public class MockRestVirtualFileSystemTest extends VirtualFileSystemTest {
-    private RESTCatalogServer restCatalogServer;
+    protected RESTCatalogServer restCatalogServer;
     private final String serverDefineHeaderName = "test-header";
     private final String serverDefineHeaderValue = "test-value";
-    private String dataPath;
-    private AuthProvider authProvider;
-    private Map<String, String> authMap;
-    private String initToken = "init_token";
-    private String restWarehouse;
+    protected String dataPath;
+    protected AuthProvider authProvider;
+    protected Map<String, String> authMap;
+    protected String initToken = "init_token";
+    protected String restWarehouse;
 
     @BeforeEach
     @Override
@@ -109,7 +110,7 @@ public class MockRestVirtualFileSystemTest extends VirtualFileSystemTest {
         return new RESTCatalog(CatalogContext.create(options));
     }
 
-    private void initFs() throws Exception {
+    protected void initFs() throws Exception {
         Configuration conf = new Configuration();
         conf.set("fs.pvfs.uri", restCatalogServer.getUrl());
         conf.set("fs.pvfs.token.provider", AuthProviderEnum.BEAR.identifier());
@@ -117,5 +118,7 @@ public class MockRestVirtualFileSystemTest extends VirtualFileSystemTest {
         this.vfs = new PaimonVirtualFileSystem();
         this.vfsRoot = new Path("pvfs://" + restWarehouse + "/");
         this.vfs.initialize(vfsRoot.toUri(), conf);
+
+        Assert.assertTrue(((PaimonVirtualFileSystem) vfs).isCacheEnabled());
     }
 }
diff --git a/paimon-vfs/paimon-vfs-hadoop/src/test/java/org/apache/paimon/vfs/hadoop/VirtualFileSystemTest.java b/paimon-vfs/paimon-vfs-hadoop/src/test/java/org/apache/paimon/vfs/hadoop/VirtualFileSystemTest.java
index e410104b3b..0154589c11 100644
--- a/paimon-vfs/paimon-vfs-hadoop/src/test/java/org/apache/paimon/vfs/hadoop/VirtualFileSystemTest.java
+++ b/paimon-vfs/paimon-vfs-hadoop/src/test/java/org/apache/paimon/vfs/hadoop/VirtualFileSystemTest.java
@@ -315,6 +315,16 @@ public abstract class VirtualFileSystemTest {
         Assert.assertTrue(fileStatus.isFile());
         Assert.assertEquals(5, fileStatus.getLen());
 
+        // Rename to table root: /database/table/test_dir/file2.txt -> /database/table/
+        // which actually means: /database/table/test_dir/file2.txt -> /database/table/file2.txt
+        Path vfsPath3 = new Path(vfsRoot, databaseName + "/" + tableName);
+        Assert.assertTrue(vfs.rename(vfsPath2, vfsPath3));
+        fileStatus = vfs.getFileStatus(new Path(vfsPath3, "file2.txt"));
+        Assert.assertEquals(
+                new Path(vfsPath3, "file2.txt").toString(), fileStatus.getPath().toString());
+        Assert.assertTrue(fileStatus.isFile());
+        Assert.assertEquals(5, fileStatus.getLen());
+
         // Rename in non-existing table
         String tableName2 = "object_table2";
         vfsPath = new Path(vfsRoot, databaseName + "/" + tableName2 + "/test_dir/file.txt");

Reply via email to