This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new b868fe3a46 [VFS] Enable table cache in PVFS (#5958)
b868fe3a46 is described below
commit b868fe3a464dc049b77a7a5c3542e1873ced47f9
Author: timmyyao <[email protected]>
AuthorDate: Fri Jul 25 18:43:06 2025 +0800
[VFS] Enable table cache in PVFS (#5958)
---
.../org/apache/paimon/fs/local/LocalFileIO.java | 10 +-
.../java/org/apache/paimon/vfs/VFSOperations.java | 119 +++++++++++++++++----
.../paimon/vfs/hadoop/PaimonVirtualFileSystem.java | 10 +-
.../MockRestNoCacheVirtualFileSystemTest.java | 43 ++++++++
.../vfs/hadoop/MockRestVirtualFileSystemTest.java | 17 +--
.../paimon/vfs/hadoop/VirtualFileSystemTest.java | 10 ++
6 files changed, 180 insertions(+), 29 deletions(-)
diff --git a/paimon-common/src/main/java/org/apache/paimon/fs/local/LocalFileIO.java b/paimon-common/src/main/java/org/apache/paimon/fs/local/LocalFileIO.java
index 1e355df609..c6b32b1ae0 100644
--- a/paimon-common/src/main/java/org/apache/paimon/fs/local/LocalFileIO.java
+++ b/paimon-common/src/main/java/org/apache/paimon/fs/local/LocalFileIO.java
@@ -216,7 +216,15 @@ public class LocalFileIO implements FileIO {
try {
RENAME_LOCK.lock();
if (dstFile.exists()) {
- return false;
+ if (!dstFile.isDirectory()) {
+ return false;
+ }
+                // Make it compatible with HadoopFileIO: if dst is an existing directory,
+ // dst=dst/srcFileName
+ dstFile = new File(dstFile, srcFile.getName());
+ if (dstFile.exists()) {
+ return false;
+ }
}
            Files.move(srcFile.toPath(), dstFile.toPath(), StandardCopyOption.ATOMIC_MOVE);
return true;
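
For context, the LocalFileIO hunk above makes a local rename whose destination is an existing directory behave like HadoopFileIO: the source file is moved into that directory under its own name. A minimal sketch of the resulting behavior, assuming local paths under /tmp chosen purely for illustration (the class name and paths are not part of the commit):

import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.PositionOutputStream;
import org.apache.paimon.fs.local.LocalFileIO;

public class RenameIntoDirectorySketch {
    public static void main(String[] args) throws Exception {
        LocalFileIO fileIO = LocalFileIO.create();

        Path src = new Path("/tmp/pvfs-demo/src/file.txt");
        Path dstDir = new Path("/tmp/pvfs-demo/dst");
        fileIO.mkdirs(new Path("/tmp/pvfs-demo/src"));
        fileIO.mkdirs(dstDir);
        try (PositionOutputStream out = fileIO.newOutputStream(src, true)) {
            out.write("hello".getBytes());
        }

        // Previously this returned false because dstDir already exists; with the
        // change above the file is moved to /tmp/pvfs-demo/dst/file.txt instead.
        boolean renamed = fileIO.rename(src, dstDir);
        System.out.println(renamed);
        System.out.println(fileIO.exists(new Path("/tmp/pvfs-demo/dst/file.txt")));
    }
}
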
diff --git a/paimon-vfs/paimon-vfs-common/src/main/java/org/apache/paimon/vfs/VFSOperations.java b/paimon-vfs/paimon-vfs-common/src/main/java/org/apache/paimon/vfs/VFSOperations.java
index 1fec65cd07..48865aa05e 100644
--- a/paimon-vfs/paimon-vfs-common/src/main/java/org/apache/paimon/vfs/VFSOperations.java
+++ b/paimon-vfs/paimon-vfs-common/src/main/java/org/apache/paimon/vfs/VFSOperations.java
@@ -18,6 +18,7 @@
package org.apache.paimon.vfs;
+import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.fs.FileIO;
@@ -34,18 +35,29 @@ import org.apache.paimon.rest.responses.GetDatabaseResponse;
import org.apache.paimon.rest.responses.GetTableResponse;
import org.apache.paimon.schema.Schema;
+import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache;
+import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Caffeine;
+import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Ticker;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.annotation.Nullable;
+
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.file.FileAlreadyExistsException;
+import java.time.Duration;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import static org.apache.paimon.CoreOptions.TYPE;
import static org.apache.paimon.TableType.OBJECT_TABLE;
+import static org.apache.paimon.options.CatalogOptions.CACHE_ENABLED;
+import static org.apache.paimon.options.CatalogOptions.CACHE_EXPIRE_AFTER_ACCESS;
+import static org.apache.paimon.options.CatalogOptions.CACHE_EXPIRE_AFTER_WRITE;
/** Wrap over RESTCatalog to provide basic operations for virtual path. */
public class VFSOperations {
@@ -55,8 +67,38 @@ public class VFSOperations {
private final RESTApi api;
private final CatalogContext context;
+ @Nullable private Cache<Identifier, VFSTableInfo> tableCache;
+
public VFSOperations(Options options) {
this.api = new RESTApi(options);
+
+ if (options.get(CACHE_ENABLED)) {
+            Duration expireAfterAccess = options.get(CACHE_EXPIRE_AFTER_ACCESS);
+ if (expireAfterAccess.isZero() || expireAfterAccess.isNegative()) {
+ throw new IllegalArgumentException(
+                        "When 'cache.expire-after-access' is set to negative or 0, the catalog cache should be disabled.");
+ }
+ Duration expireAfterWrite = options.get(CACHE_EXPIRE_AFTER_WRITE);
+ if (expireAfterWrite.isZero() || expireAfterWrite.isNegative()) {
+ throw new IllegalArgumentException(
+                        "When 'cache.expire-after-write' is set to negative or 0, the catalog cache should be disabled.");
+ }
+ LOG.info(
+                    "Initialize virtual file system with table cache enabled, expireAfterAccess={}, expireAfterWrite={}",
+ expireAfterAccess,
+ expireAfterWrite);
+
+ tableCache =
+ Caffeine.newBuilder()
+ .softValues()
+ .executor(Runnable::run)
+ .expireAfterAccess(expireAfterAccess)
+ .expireAfterWrite(expireAfterWrite)
+ .ticker(Ticker.systemTicker())
+ .build();
+ } else {
+            LOG.info("Initialize virtual file system with table cache disabled");
+ }
// Get the configured options which has been merged from REST Server
this.context = CatalogContext.create(api.options());
}
@@ -79,27 +121,16 @@ public class VFSOperations {
            relativePath = String.join("/", Arrays.copyOfRange(parts, 2, parts.length));
}
Identifier identifier = new Identifier(databaseName, tableName);
- // Get table from REST server
- GetTableResponse table;
try {
- table = loadTableMetadata(identifier);
+ VFSTableInfo tableInfo = getTableInfo(identifier);
+ return relativePath == null
+                    ? new VFSTableRootIdentifier(databaseName, tableName, tableInfo)
+ : new VFSTableObjectIdentifier(
+ databaseName, tableName, relativePath, tableInfo);
} catch (FileNotFoundException e) {
- if (relativePath == null) {
- return new VFSTableRootIdentifier(databaseName, tableName);
- } else {
-                return new VFSTableObjectIdentifier(databaseName, tableName, relativePath);
- }
- }
- if (table.isExternal()) {
-            throw new IOException("Do not support visiting external table " + identifier);
- }
- Path tablePath = new Path(table.getPath());
-        FileIO fileIO = new RESTTokenFileIO(context, api, identifier, tablePath);
-        VFSTableInfo tableInfo = new VFSTableInfo(table.getId(), tablePath, fileIO);
- if (relativePath == null) {
-            return new VFSTableRootIdentifier(databaseName, tableName, tableInfo);
- } else {
-            return new VFSTableObjectIdentifier(databaseName, tableName, relativePath, tableInfo);
+ return relativePath == null
+ ? new VFSTableRootIdentifier(databaseName, tableName)
+                    : new VFSTableObjectIdentifier(databaseName, tableName, relativePath);
}
}
@@ -138,6 +169,16 @@ public class VFSOperations {
                                + " is not empty, set recursive to true to drop it");
}
api.dropDatabase(databaseName);
+ // Remove table cache
+ if (isCacheEnabled()) {
+ List<Identifier> tables = new ArrayList<>();
+ for (Identifier identifier : tableCache.asMap().keySet()) {
+ if (identifier.getDatabaseName().equals(databaseName)) {
+ tables.add(identifier);
+ }
+ }
+ tables.forEach(tableCache::invalidate);
+ }
} catch (NoSuchResourceException e) {
            throw new FileNotFoundException("Database " + databaseName + " not found");
} catch (ForbiddenException e) {
@@ -171,6 +212,10 @@ public class VFSOperations {
Identifier identifier = Identifier.create(databaseName, tableName);
try {
api.dropTable(identifier);
+ // Remove table cache
+ if (isCacheEnabled()) {
+ tableCache.invalidate(identifier);
+ }
} catch (NoSuchResourceException e) {
            throw new FileNotFoundException("Table " + identifier + " not found");
} catch (ForbiddenException e) {
@@ -184,6 +229,11 @@ public class VFSOperations {
        Identifier dstIdentifier = Identifier.create(databaseName, dstTableName);
try {
api.renameTable(srcIdentifier, dstIdentifier);
+ // Remove table cache
+ if (isCacheEnabled()) {
+ tableCache.invalidate(srcIdentifier);
+ tableCache.invalidate(dstIdentifier);
+ }
} catch (NoSuchResourceException e) {
            throw new FileNotFoundException("Source table " + srcIdentifier + " not found");
} catch (ForbiddenException e) {
@@ -238,4 +288,35 @@ public class VFSOperations {
return response;
}
+
+    private VFSTableInfo loadTableInfo(Identifier identifier) throws IOException {
+ // Get table from REST server
+ GetTableResponse table;
+ table = loadTableMetadata(identifier);
+
+ if (table.isExternal()) {
+            throw new IOException("Do not support visiting external table " + identifier);
+ }
+ Path tablePath = new Path(table.getPath());
+        FileIO fileIO = new RESTTokenFileIO(context, api, identifier, tablePath);
+ return new VFSTableInfo(table.getId(), tablePath, fileIO);
+ }
+
+    private VFSTableInfo getTableInfo(Identifier identifier) throws IOException {
+ if (!isCacheEnabled()) {
+ return loadTableInfo(identifier);
+ }
+ VFSTableInfo vfsTableInfo = tableCache.getIfPresent(identifier);
+ if (vfsTableInfo != null) {
+ return vfsTableInfo;
+ }
+ vfsTableInfo = loadTableInfo(identifier);
+ tableCache.put(identifier, vfsTableInfo);
+ return vfsTableInfo;
+ }
+
+ @VisibleForTesting
+ public boolean isCacheEnabled() {
+ return tableCache != null;
+ }
}
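
Taken together, the VFSOperations changes build an optional Caffeine cache from the catalog cache options and consult it before calling the REST server, invalidating entries on drop and rename. Below is a self-contained sketch of the same build-then-get-or-load pattern; the String key/value types, the fixed durations, and the load body are placeholders for illustration (the real code keys on Identifier, caches VFSTableInfo, and reads the durations from CACHE_EXPIRE_AFTER_ACCESS / CACHE_EXPIRE_AFTER_WRITE):

import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache;
import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Caffeine;
import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Ticker;

import java.time.Duration;

public class TableCacheSketch {

    private final Cache<String, String> cache =
            Caffeine.newBuilder()
                    .softValues() // let GC reclaim entries under memory pressure
                    .executor(Runnable::run) // run cache maintenance on the calling thread
                    .expireAfterAccess(Duration.ofMinutes(10)) // placeholder duration
                    .expireAfterWrite(Duration.ofMinutes(30)) // placeholder duration
                    .ticker(Ticker.systemTicker())
                    .build();

    public String get(String key) {
        // Same shape as VFSOperations#getTableInfo: check the cache, load on miss, then put.
        String value = cache.getIfPresent(key);
        if (value != null) {
            return value;
        }
        value = load(key);
        cache.put(key, value);
        return value;
    }

    private String load(String key) {
        // Stand-in for the REST lookup done by loadTableInfo/loadTableMetadata.
        return "metadata-for-" + key;
    }
}

With this shape, the invalidation added in dropTable/renameTable reduces to cache.invalidate(key), exactly as the hunks above do with the table Identifier.
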
diff --git a/paimon-vfs/paimon-vfs-hadoop/src/main/java/org/apache/paimon/vfs/hadoop/PaimonVirtualFileSystem.java b/paimon-vfs/paimon-vfs-hadoop/src/main/java/org/apache/paimon/vfs/hadoop/PaimonVirtualFileSystem.java
index 3ad71d033b..e18d3f5f59 100644
--- a/paimon-vfs/paimon-vfs-hadoop/src/main/java/org/apache/paimon/vfs/hadoop/PaimonVirtualFileSystem.java
+++ b/paimon-vfs/paimon-vfs-hadoop/src/main/java/org/apache/paimon/vfs/hadoop/PaimonVirtualFileSystem.java
@@ -18,6 +18,7 @@
package org.apache.paimon.vfs.hadoop;
+import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.fs.PositionOutputStream;
import org.apache.paimon.options.CatalogOptions;
import org.apache.paimon.options.Options;
@@ -178,12 +179,12 @@ public class PaimonVirtualFileSystem extends FileSystem {
(VFSTableRootIdentifier) srcVfsIdentifier,
(VFSTableRootIdentifier) dstVfsIdentifier);
} else {
- if (!(dstVfsIdentifier instanceof VFSTableObjectIdentifier)) {
+ if (!(dstVfsIdentifier instanceof VFSTableIdentifier)) {
throw new IOException(
"Cannot rename to virtual path " + dst + " which is
not a table");
}
            VFSTableObjectIdentifier srcIdentifier = (VFSTableObjectIdentifier) srcVfsIdentifier;
-            VFSTableObjectIdentifier dstIdentifier = (VFSTableObjectIdentifier) dstVfsIdentifier;
+            VFSTableIdentifier dstIdentifier = (VFSTableIdentifier) dstVfsIdentifier;
VFSTableInfo srcTableInfo = srcIdentifier.tableInfo();
VFSTableInfo dstTableInfo = dstIdentifier.tableInfo();
if (srcTableInfo == null) {
@@ -438,4 +439,9 @@ public class PaimonVirtualFileSystem extends FileSystem {
public Path getWorkingDirectory() {
return workingDirectory;
}
+
+ @VisibleForTesting
+ public boolean isCacheEnabled() {
+ return vfsOperations.isCacheEnabled();
+ }
}
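
For reference, the cache can be switched off from the client side through Hadoop configuration, which is how the new test below exercises the no-cache path. A hedged sketch of that wiring; the endpoint, token, and warehouse values are placeholders, while the fs.pvfs.* keys are the ones used in the tests:

import org.apache.paimon.rest.auth.AuthProviderEnum;
import org.apache.paimon.vfs.hadoop.PaimonVirtualFileSystem;

import org.apache.hadoop.conf.Configuration;

import java.net.URI;

public class PvfsCacheToggleSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.pvfs.uri", "http://localhost:8080"); // placeholder REST endpoint
        conf.set("fs.pvfs.token.provider", AuthProviderEnum.BEAR.identifier());
        conf.set("fs.pvfs.token", "init_token"); // placeholder token
        conf.setBoolean("fs.pvfs.cache-enabled", false); // disable the table cache

        PaimonVirtualFileSystem fs = new PaimonVirtualFileSystem();
        fs.initialize(URI.create("pvfs://my_warehouse/"), conf); // placeholder warehouse
        System.out.println("cache enabled: " + fs.isCacheEnabled());
        fs.close();
    }
}
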
diff --git a/paimon-vfs/paimon-vfs-hadoop/src/test/java/org/apache/paimon/vfs/hadoop/MockRestNoCacheVirtualFileSystemTest.java b/paimon-vfs/paimon-vfs-hadoop/src/test/java/org/apache/paimon/vfs/hadoop/MockRestNoCacheVirtualFileSystemTest.java
new file mode 100644
index 0000000000..28d0a02335
--- /dev/null
+++ b/paimon-vfs/paimon-vfs-hadoop/src/test/java/org/apache/paimon/vfs/hadoop/MockRestNoCacheVirtualFileSystemTest.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.vfs.hadoop;
+
+import org.apache.paimon.rest.auth.AuthProviderEnum;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.junit.Assert;
+
+/** Test for {@link PaimonVirtualFileSystem} with Mock Rest Server and disable table cache. */
+public class MockRestNoCacheVirtualFileSystemTest extends MockRestVirtualFileSystemTest {
+
+ @Override
+ protected void initFs() throws Exception {
+ Configuration conf = new Configuration();
+ conf.set("fs.pvfs.uri", restCatalogServer.getUrl());
+ conf.set("fs.pvfs.token.provider", AuthProviderEnum.BEAR.identifier());
+ conf.set("fs.pvfs.token", initToken);
+ conf.setBoolean("fs.pvfs.cache-enabled", false);
+ this.vfs = new PaimonVirtualFileSystem();
+ this.vfsRoot = new Path("pvfs://" + restWarehouse + "/");
+ this.vfs.initialize(vfsRoot.toUri(), conf);
+
+ Assert.assertFalse(((PaimonVirtualFileSystem) vfs).isCacheEnabled());
+ }
+}
diff --git a/paimon-vfs/paimon-vfs-hadoop/src/test/java/org/apache/paimon/vfs/hadoop/MockRestVirtualFileSystemTest.java b/paimon-vfs/paimon-vfs-hadoop/src/test/java/org/apache/paimon/vfs/hadoop/MockRestVirtualFileSystemTest.java
index 0b3307db96..46f8bd17fe 100644
--- a/paimon-vfs/paimon-vfs-hadoop/src/test/java/org/apache/paimon/vfs/hadoop/MockRestVirtualFileSystemTest.java
+++ b/paimon-vfs/paimon-vfs-hadoop/src/test/java/org/apache/paimon/vfs/hadoop/MockRestVirtualFileSystemTest.java
@@ -36,6 +36,7 @@ import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
+import org.junit.Assert;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
@@ -45,14 +46,14 @@ import java.util.UUID;
/** Test for {@link PaimonVirtualFileSystem} with Mock Rest Server. */
public class MockRestVirtualFileSystemTest extends VirtualFileSystemTest {
- private RESTCatalogServer restCatalogServer;
+ protected RESTCatalogServer restCatalogServer;
private final String serverDefineHeaderName = "test-header";
private final String serverDefineHeaderValue = "test-value";
- private String dataPath;
- private AuthProvider authProvider;
- private Map<String, String> authMap;
- private String initToken = "init_token";
- private String restWarehouse;
+ protected String dataPath;
+ protected AuthProvider authProvider;
+ protected Map<String, String> authMap;
+ protected String initToken = "init_token";
+ protected String restWarehouse;
@BeforeEach
@Override
@@ -109,7 +110,7 @@ public class MockRestVirtualFileSystemTest extends VirtualFileSystemTest {
return new RESTCatalog(CatalogContext.create(options));
}
- private void initFs() throws Exception {
+ protected void initFs() throws Exception {
Configuration conf = new Configuration();
conf.set("fs.pvfs.uri", restCatalogServer.getUrl());
conf.set("fs.pvfs.token.provider", AuthProviderEnum.BEAR.identifier());
@@ -117,5 +118,7 @@ public class MockRestVirtualFileSystemTest extends VirtualFileSystemTest {
this.vfs = new PaimonVirtualFileSystem();
this.vfsRoot = new Path("pvfs://" + restWarehouse + "/");
this.vfs.initialize(vfsRoot.toUri(), conf);
+
+ Assert.assertTrue(((PaimonVirtualFileSystem) vfs).isCacheEnabled());
}
}
diff --git a/paimon-vfs/paimon-vfs-hadoop/src/test/java/org/apache/paimon/vfs/hadoop/VirtualFileSystemTest.java b/paimon-vfs/paimon-vfs-hadoop/src/test/java/org/apache/paimon/vfs/hadoop/VirtualFileSystemTest.java
index e410104b3b..0154589c11 100644
--- a/paimon-vfs/paimon-vfs-hadoop/src/test/java/org/apache/paimon/vfs/hadoop/VirtualFileSystemTest.java
+++ b/paimon-vfs/paimon-vfs-hadoop/src/test/java/org/apache/paimon/vfs/hadoop/VirtualFileSystemTest.java
@@ -315,6 +315,16 @@ public abstract class VirtualFileSystemTest {
Assert.assertTrue(fileStatus.isFile());
Assert.assertEquals(5, fileStatus.getLen());
+        // Rename to table root: /database/table/test_dir/file2.txt -> /database/table/
+        // which actually means: /database/table/test_dir/file2.txt -> /database/table/file2.txt
+ Path vfsPath3 = new Path(vfsRoot, databaseName + "/" + tableName);
+ Assert.assertTrue(vfs.rename(vfsPath2, vfsPath3));
+ fileStatus = vfs.getFileStatus(new Path(vfsPath3, "file2.txt"));
+ Assert.assertEquals(
+                new Path(vfsPath3, "file2.txt").toString(), fileStatus.getPath().toString());
+ Assert.assertTrue(fileStatus.isFile());
+ Assert.assertEquals(5, fileStatus.getLen());
+
// Rename in non-existing table
String tableName2 = "object_table2";
        vfsPath = new Path(vfsRoot, databaseName + "/" + tableName2 + "/test_dir/file.txt");