This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 969c53e2ad [vfs] Implement Paimon Virtual Filesystem (#5916)
969c53e2ad is described below

commit 969c53e2adc904f691b20f373726abe4096442b1
Author: timmyyao <[email protected]>
AuthorDate: Wed Jul 23 10:28:17 2025 +0800

    [vfs] Implement Paimon Virtual Filesystem (#5916)
---
 paimon-vfs/paimon-vfs-common/pom.xml               |  35 ++
 .../apache/paimon/vfs/VFSCatalogIdentifier.java    |  27 ++
 .../java/org/apache/paimon/vfs/VFSDataToken.java   |  64 +++
 .../apache/paimon/vfs/VFSDatabaseIdentifier.java   |  27 ++
 .../java/org/apache/paimon/vfs/VFSIdentifier.java  |  45 ++
 .../java/org/apache/paimon/vfs/VFSOperations.java  | 339 +++++++++++++++
 .../org/apache/paimon/vfs/VFSTableIdentifier.java  |  99 +++++
 .../paimon/vfs/VFSTableObjectIdentifier.java       |  38 ++
 .../apache/paimon/vfs/VFSTableRootIdentifier.java  |  38 ++
 paimon-vfs/paimon-vfs-hadoop/pom.xml               | 114 +++++
 .../paimon/vfs/hadoop/PaimonVirtualFileSystem.java | 443 ++++++++++++++++++++
 .../PaimonVirtualFileSystemConfiguration.java      |  41 ++
 .../java/org/apache/paimon/vfs/hadoop/Pvfs.java    |  36 ++
 .../apache/paimon/vfs/hadoop/VFSInputStream.java   |  80 ++++
 .../vfs/hadoop/MockRestVirtualFileSystemTest.java  | 121 ++++++
 .../paimon/vfs/hadoop/VirtualFileSystemTest.java   | 457 +++++++++++++++++++++
 paimon-vfs/pom.xml                                 | 105 +++++
 pom.xml                                            |   1 +
 18 files changed, 2110 insertions(+)
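
The new "pvfs" Hadoop filesystem maps virtual paths of the form
pvfs://catalog_name/database_name/table_name/file onto the real table location
resolved through the Paimon REST catalog (see VFSOperations and
PaimonVirtualFileSystem below). A minimal usage sketch, not part of this
commit, follows; the catalog/database/table names and the REST option keys
("uri", "token") are illustrative assumptions.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class PvfsUsageSketch {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            // Standard Hadoop scheme-to-implementation mapping.
            conf.set("fs.pvfs.impl", "org.apache.paimon.vfs.hadoop.PaimonVirtualFileSystem");
            // Keys with the "fs.pvfs." prefix are forwarded to the REST catalog with the
            // prefix stripped; the exact option keys below are assumptions.
            conf.set("fs.pvfs.uri", "http://localhost:10000");
            conf.set("fs.pvfs.token", "my-token");

            // pvfs://<catalog>/<database>/<table>/<file>; all names here are hypothetical.
            Path file = new Path("pvfs://my_catalog/my_db/my_objects/data/hello.txt");
            FileSystem fs = file.getFileSystem(conf);

            // mkdirs on a table-level path creates the database and an object table.
            fs.mkdirs(new Path("pvfs://my_catalog/my_db/my_objects"));
            try (FSDataOutputStream out = fs.create(file, true)) {
                out.writeBytes("hello");
            }
            try (FSDataInputStream in = fs.open(file)) {
                System.out.println(in.read());
            }
            // Listing a database-level path returns its tables as directories.
            for (FileStatus status : fs.listStatus(new Path("pvfs://my_catalog/my_db"))) {
                System.out.println(status.getPath());
            }
        }
    }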

diff --git a/paimon-vfs/paimon-vfs-common/pom.xml b/paimon-vfs/paimon-vfs-common/pom.xml
new file mode 100644
index 0000000000..5452384e7a
--- /dev/null
+++ b/paimon-vfs/paimon-vfs-common/pom.xml
@@ -0,0 +1,35 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <artifactId>paimon-vfs</artifactId>
+        <groupId>org.apache.paimon</groupId>
+        <version>1.3-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>paimon-vfs-common</artifactId>
+    <name>Paimon : VirtualFileSystem Common</name>
+
+    <packaging>jar</packaging>
+</project>
\ No newline at end of file
diff --git a/paimon-vfs/paimon-vfs-common/src/main/java/org/apache/paimon/vfs/VFSCatalogIdentifier.java b/paimon-vfs/paimon-vfs-common/src/main/java/org/apache/paimon/vfs/VFSCatalogIdentifier.java
new file mode 100644
index 0000000000..3118121b93
--- /dev/null
+++ b/paimon-vfs/paimon-vfs-common/src/main/java/org/apache/paimon/vfs/VFSCatalogIdentifier.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.vfs;
+
+/** Identifier for catalog. */
+public class VFSCatalogIdentifier extends VFSIdentifier {
+
+    public VFSCatalogIdentifier() {
+        super(VFSFileType.CATALOG, null);
+    }
+}
diff --git a/paimon-vfs/paimon-vfs-common/src/main/java/org/apache/paimon/vfs/VFSDataToken.java b/paimon-vfs/paimon-vfs-common/src/main/java/org/apache/paimon/vfs/VFSDataToken.java
new file mode 100644
index 0000000000..730ee591af
--- /dev/null
+++ b/paimon-vfs/paimon-vfs-common/src/main/java/org/apache/paimon/vfs/VFSDataToken.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.vfs;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.Map;
+import java.util.Objects;
+
+/** Data token. */
+public class VFSDataToken implements Serializable {
+    private static final long serialVersionUID = 1L;
+    private final Map<String, String> token;
+    private final long expireAtMillis;
+    @Nullable private Integer hash;
+
+    public VFSDataToken(Map<String, String> token, long expireAtMillis) {
+        this.token = token;
+        this.expireAtMillis = expireAtMillis;
+    }
+
+    public Map<String, String> token() {
+        return this.token;
+    }
+
+    public long expireAtMillis() {
+        return this.expireAtMillis;
+    }
+
+    public boolean equals(Object o) {
+        if (o != null && this.getClass() == o.getClass()) {
+            VFSDataToken token1 = (VFSDataToken) o;
+            return this.expireAtMillis == token1.expireAtMillis
+                    && Objects.equals(this.token, token1.token);
+        } else {
+            return false;
+        }
+    }
+
+    public int hashCode() {
+        if (this.hash == null) {
+            this.hash = Objects.hash(new Object[] {this.token, this.expireAtMillis});
+        }
+
+        return this.hash;
+    }
+}
diff --git a/paimon-vfs/paimon-vfs-common/src/main/java/org/apache/paimon/vfs/VFSDatabaseIdentifier.java b/paimon-vfs/paimon-vfs-common/src/main/java/org/apache/paimon/vfs/VFSDatabaseIdentifier.java
new file mode 100644
index 0000000000..02a7390788
--- /dev/null
+++ b/paimon-vfs/paimon-vfs-common/src/main/java/org/apache/paimon/vfs/VFSDatabaseIdentifier.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.vfs;
+
+/** Identifier for database. */
+public class VFSDatabaseIdentifier extends VFSIdentifier {
+
+    public VFSDatabaseIdentifier(String databaseName) {
+        super(VFSFileType.DATABASE, databaseName);
+    }
+}
diff --git a/paimon-vfs/paimon-vfs-common/src/main/java/org/apache/paimon/vfs/VFSIdentifier.java b/paimon-vfs/paimon-vfs-common/src/main/java/org/apache/paimon/vfs/VFSIdentifier.java
new file mode 100644
index 0000000000..d454cec040
--- /dev/null
+++ b/paimon-vfs/paimon-vfs-common/src/main/java/org/apache/paimon/vfs/VFSIdentifier.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.vfs;
+
+/** Identifier for virtual path. */
+public abstract class VFSIdentifier {
+    enum VFSFileType {
+        CATALOG, // pvfs://catalog/
+        DATABASE, // pvfs://catalog/database/
+        TABLE, // pvfs://catalog/database/table/
+        TABLE_OBJECT // pvfs://catalog/database/table/file.txt
+    }
+
+    protected VFSFileType vfsFileType;
+    protected String databaseName;
+
+    public VFSIdentifier(VFSFileType vfsFileType, String databaseName) {
+        this.vfsFileType = vfsFileType;
+        this.databaseName = databaseName;
+    }
+
+    public VFSFileType getVfsFileType() {
+        return vfsFileType;
+    }
+
+    public String getDatabaseName() {
+        return databaseName;
+    }
+}
diff --git a/paimon-vfs/paimon-vfs-common/src/main/java/org/apache/paimon/vfs/VFSOperations.java b/paimon-vfs/paimon-vfs-common/src/main/java/org/apache/paimon/vfs/VFSOperations.java
new file mode 100644
index 0000000000..ad9cd09122
--- /dev/null
+++ b/paimon-vfs/paimon-vfs-common/src/main/java/org/apache/paimon/vfs/VFSOperations.java
@@ -0,0 +1,339 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.vfs;
+
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.rest.RESTApi;
+import org.apache.paimon.rest.RESTUtil;
+import org.apache.paimon.rest.exceptions.AlreadyExistsException;
+import org.apache.paimon.rest.exceptions.BadRequestException;
+import org.apache.paimon.rest.exceptions.ForbiddenException;
+import org.apache.paimon.rest.exceptions.NoSuchResourceException;
+import org.apache.paimon.rest.exceptions.NotImplementedException;
+import org.apache.paimon.rest.responses.GetDatabaseResponse;
+import org.apache.paimon.rest.responses.GetTableResponse;
+import org.apache.paimon.rest.responses.GetTableTokenResponse;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.utils.IOUtils;
+import org.apache.paimon.utils.ThreadUtils;
+
+import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache;
+import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Caffeine;
+import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Scheduler;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.file.FileAlreadyExistsException;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.paimon.CoreOptions.TYPE;
+import static org.apache.paimon.TableType.OBJECT_TABLE;
+import static org.apache.paimon.options.CatalogOptions.FILE_IO_ALLOW_CACHE;
+import static org.apache.paimon.rest.RESTApi.TOKEN_EXPIRATION_SAFE_TIME_MILLIS;
+
+/** Wrap over RESTCatalog to provide basic operations for virtual path. */
+public class VFSOperations {
+    private static final Logger LOG = LoggerFactory.getLogger(VFSOperations.class);
+
+    private final RESTApi api;
+    private final CatalogContext context;
+
+    // table id -> fileIO
+    private static final Cache<VFSDataToken, FileIO> FILE_IO_CACHE =
+            Caffeine.newBuilder()
+                    .expireAfterAccess(30, TimeUnit.MINUTES)
+                    .maximumSize(1000)
+                    .removalListener(
+                            (ignored, value, cause) -> IOUtils.closeQuietly((FileIO) value))
+                    .scheduler(
+                            Scheduler.forScheduledExecutorService(
+                                    Executors.newSingleThreadScheduledExecutor(
+                                            ThreadUtils.newDaemonThreadFactory(
+                                                    "rest-token-file-io-scheduler"))))
+                    .build();
+
+    private static final Cache<String, VFSDataToken> TOKEN_CACHE =
+            Caffeine.newBuilder().expireAfterAccess(30, TimeUnit.MINUTES).maximumSize(1000).build();
+
+    public VFSOperations(CatalogContext context) {
+        this.context = context;
+        this.api = new RESTApi(context.options());
+    }
+
+    public VFSIdentifier getVFSIdentifier(String virtualPath) throws IOException {
+        if (virtualPath.startsWith("/")) {
+            virtualPath = virtualPath.substring(1);
+        }
+        String[] parts = virtualPath.split("/");
+        if (virtualPath.isEmpty() || parts.length == 0) {
+            return new VFSCatalogIdentifier();
+        } else if (parts.length == 1) {
+            return new VFSDatabaseIdentifier(parts[0]);
+        }
+        // parts.length >= 2: table or table object
+        String databaseName = parts[0];
+        String tableName = parts[1];
+        Identifier identifier = new Identifier(databaseName, tableName);
+        // Get table from REST server
+        GetTableResponse table;
+        try {
+            table = loadTableMetadata(identifier);
+        } catch (FileNotFoundException e) {
+            if (parts.length == 2) {
+                return new VFSTableRootIdentifier(databaseName, tableName);
+            } else {
+                return new VFSTableObjectIdentifier(databaseName, tableName);
+            }
+        }
+        if (table.isExternal()) {
+            throw new IOException("Do not support visiting external table " + identifier);
+        }
+        // Get real path
+        StringBuilder realPath = new StringBuilder(table.getPath());
+        boolean isTableRoot = true;
+        if (parts.length > 2) {
+            isTableRoot = false;
+            if (!table.getPath().endsWith("/")) {
+                realPath.append("/");
+            }
+            for (int i = 2; i < parts.length; i++) {
+                realPath.append(parts[i]);
+                if (i < parts.length - 1) {
+                    realPath.append("/");
+                }
+            }
+        }
+        // Get REST token
+        FileIO fileIO = getFileIO(new Identifier(databaseName, tableName), table);
+
+        if (parts.length == 2) {
+            return new VFSTableRootIdentifier(
+                    table, realPath.toString(), fileIO, databaseName, tableName);
+        } else {
+            return new VFSTableObjectIdentifier(
+                    table, realPath.toString(), fileIO, databaseName, tableName);
+        }
+    }
+
+    public GetDatabaseResponse getDatabase(String databaseName) throws IOException {
+        try {
+            return api.getDatabase(databaseName);
+        } catch (NoSuchResourceException e) {
+            throw new FileNotFoundException("Database " + databaseName + " not found");
+        } catch (ForbiddenException e) {
+            throw new IOException("No permission to access database " + databaseName);
+        }
+    }
+
+    public List<String> listDatabases() {
+        return api.listDatabases();
+    }
+
+    public void createDatabase(String databaseName) throws IOException {
+        try {
+            api.createDatabase(databaseName, Collections.emptyMap());
+        } catch (AlreadyExistsException e) {
+            LOG.info("Database {} already exist, no need to create", databaseName);
+        } catch (ForbiddenException e) {
+            throw new IOException("No permission to create database " + databaseName);
+        } catch (BadRequestException e) {
+            throw new IOException("Bad request when creating database " + databaseName, e);
+        }
+    }
+
+    public void dropDatabase(String databaseName, boolean recursive) throws IOException {
+        try {
+            if (!recursive && !api.listTables(databaseName).isEmpty()) {
+                throw new IOException(
+                        "Database "
+                                + databaseName
+                                + " is not empty, set recursive to true to drop it");
+            }
+            api.dropDatabase(databaseName);
+        } catch (NoSuchResourceException e) {
+            throw new FileNotFoundException("Database " + databaseName + " not found");
+        } catch (ForbiddenException e) {
+            throw new IOException("No permission to drop database " + databaseName);
+        }
+    }
+
+    public List<String> listTables(String databaseName) throws IOException {
+        try {
+            return api.listTables(databaseName);
+        } catch (NoSuchResourceException e) {
+            throw new FileNotFoundException("Database " + databaseName + " not found");
+        } catch (ForbiddenException e) {
+            throw new IOException("No permission to access database " + databaseName);
+        }
+    }
+
+    public void createObjectTable(String databaseName, String tableName) throws IOException {
+        Identifier identifier = Identifier.create(databaseName, tableName);
+        Schema schema = Schema.newBuilder().option(TYPE.key(), OBJECT_TABLE.toString()).build();
+        try {
+            tryCreateObjectTable(identifier, schema);
+        } catch (FileNotFoundException e) {
+            // Database not exist, try to create database and then create table again
+            createDatabase(databaseName);
+            tryCreateObjectTable(identifier, schema);
+        }
+    }
+
+    public void dropTable(String databaseName, String tableName) throws IOException {
+        Identifier identifier = Identifier.create(databaseName, tableName);
+        try {
+            api.dropTable(identifier);
+        } catch (NoSuchResourceException e) {
+            throw new FileNotFoundException("Table " + identifier + " not found");
+        } catch (ForbiddenException e) {
+            throw new IOException("No permission to drop table " + identifier);
+        }
+    }
+
+    public void renameTable(String databaseName, String srcTableName, String dstTableName)
+            throws IOException {
+        Identifier srcIdentifier = Identifier.create(databaseName, srcTableName);
+        Identifier dstIdentifier = Identifier.create(databaseName, dstTableName);
+        try {
+            api.renameTable(srcIdentifier, dstIdentifier);
+        } catch (NoSuchResourceException e) {
+            throw new FileNotFoundException("Source table " + srcIdentifier + " not found");
+        } catch (ForbiddenException e) {
+            throw new IOException(
+                    "No permission to rename table " + srcIdentifier + " to " + dstIdentifier);
+        } catch (AlreadyExistsException e) {
+            throw new FileAlreadyExistsException(
+                    "Target table " + dstIdentifier + " already exist");
+        } catch (BadRequestException e) {
+            throw new IOException(
+                    "Bad request when renaming table " + srcIdentifier + " to " + dstIdentifier, e);
+        }
+    }
+
+    private void tryCreateObjectTable(Identifier identifier, Schema schema) throws IOException {
+        try {
+            api.createTable(identifier, schema);
+        } catch (AlreadyExistsException e) {
+            LOG.info("Table {} already exist, no need to create", identifier);
+        } catch (NotImplementedException e) {
+            throw new IOException("Create object table not implemented");
+        } catch (NoSuchResourceException e) {
+            throw new FileNotFoundException("Database not found");
+        } catch (BadRequestException e) {
+            throw new IOException("Bad request when creating table " + identifier, e);
+        } catch (IllegalArgumentException e) {
+            throw new IOException("Illegal argument when creating table " + identifier, e);
+        } catch (Exception e) {
+            throw new IOException(e);
+        }
+    }
+
+    private FileIO getFileIO(Identifier identifier, GetTableResponse table) throws IOException {
+        VFSDataToken token = TOKEN_CACHE.getIfPresent(table.getId());
+        if (shouldRefresh(token)) {
+            synchronized (TOKEN_CACHE) {
+                token = TOKEN_CACHE.getIfPresent(table.getId());
+                if (shouldRefresh(token)) {
+                    token = refreshToken(identifier);
+                    TOKEN_CACHE.put(table.getId(), token);
+                }
+            }
+        }
+
+        FileIO fileIO = FILE_IO_CACHE.getIfPresent(token);
+        if (fileIO != null) {
+            return fileIO;
+        }
+
+        synchronized (FILE_IO_CACHE) {
+            fileIO = FILE_IO_CACHE.getIfPresent(token);
+            if (fileIO != null) {
+                return fileIO;
+            }
+
+            Options options = context.options();
+            // the original options are not overwritten
+            options = new Options(RESTUtil.merge(token.token(), options.toMap()));
+            options.set(FILE_IO_ALLOW_CACHE, false);
+            CatalogContext fileIOContext = CatalogContext.create(options);
+            fileIO = FileIO.get(new Path(table.getPath()), fileIOContext);
+            FILE_IO_CACHE.put(token, fileIO);
+            return fileIO;
+        }
+    }
+
+    private boolean shouldRefresh(VFSDataToken token) {
+        return token == null
+                || token.expireAtMillis() - System.currentTimeMillis()
+                        < TOKEN_EXPIRATION_SAFE_TIME_MILLIS;
+    }
+
+    private VFSDataToken refreshToken(Identifier identifier) throws IOException {
+        LOG.info("begin refresh data token for identifier [{}]", identifier);
+        GetTableTokenResponse response;
+        try {
+            response = api.loadTableToken(identifier);
+        } catch (NoSuchResourceException e) {
+            throw new FileNotFoundException("Table " + identifier + " not found");
+        } catch (ForbiddenException e) {
+            throw new IOException("No permission to access table " + identifier);
+        }
+
+        LOG.info(
+                "end refresh data token for identifier [{}] expiresAtMillis [{}]",
+                identifier,
+                response.getExpiresAtMillis());
+
+        VFSDataToken token = new VFSDataToken(response.getToken(), response.getExpiresAtMillis());
+        return token;
+    }
+
+    private GetTableResponse loadTableMetadata(Identifier identifier) throws IOException {
+        // if the table is system table, we need to load table metadata from the system table's data
+        // table
+        Identifier loadTableIdentifier =
+                identifier.isSystemTable()
+                        ? new Identifier(
+                                identifier.getDatabaseName(),
+                                identifier.getTableName(),
+                                identifier.getBranchName())
+                        : identifier;
+
+        GetTableResponse response;
+        try {
+            response = api.getTable(loadTableIdentifier);
+        } catch (NoSuchResourceException e) {
+            throw new FileNotFoundException("Table not found");
+        } catch (ForbiddenException e) {
+            throw new IOException("No permission to access table " + identifier);
+        }
+
+        return response;
+    }
+}
diff --git a/paimon-vfs/paimon-vfs-common/src/main/java/org/apache/paimon/vfs/VFSTableIdentifier.java b/paimon-vfs/paimon-vfs-common/src/main/java/org/apache/paimon/vfs/VFSTableIdentifier.java
new file mode 100644
index 0000000000..86f0605322
--- /dev/null
+++ b/paimon-vfs/paimon-vfs-common/src/main/java/org/apache/paimon/vfs/VFSTableIdentifier.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.vfs;
+
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.rest.responses.GetTableResponse;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+
+/** Identifier for table. */
+public abstract class VFSTableIdentifier extends VFSIdentifier {
+    protected Path realPath;
+    protected String scheme;
+    protected URI realUri;
+    protected String tableName;
+    protected GetTableResponse table;
+    protected FileIO fileIO;
+    protected String tableLocation;
+
+    // Constructor for non-exist table
+    public VFSTableIdentifier(VFSFileType vfsFileType, String databaseName, String tableName) {
+        super(vfsFileType, databaseName);
+        this.tableName = tableName;
+    }
+
+    // Constructor for existing table
+    public VFSTableIdentifier(
+            VFSFileType vfsFileType,
+            GetTableResponse table,
+            String realPath,
+            FileIO fileIO,
+            String databaseName,
+            String tableName) {
+        super(vfsFileType, databaseName);
+        this.realPath = new Path(realPath);
+        try {
+            this.realUri = new URI(realPath);
+        } catch (URISyntaxException e) {
+            throw new RuntimeException(e);
+        }
+        this.scheme = realUri.getScheme();
+        this.tableName = tableName;
+        this.table = table;
+        this.fileIO = fileIO;
+        if (table != null) {
+            this.tableLocation = table.getPath();
+        }
+    }
+
+    public Path getRealPath() {
+        return realPath;
+    }
+
+    public URI getRealUri() {
+        return realUri;
+    }
+
+    public String getScheme() {
+        return scheme;
+    }
+
+    public String getTableName() {
+        return tableName;
+    }
+
+    public GetTableResponse getTable() {
+        return table;
+    }
+
+    public FileIO fileIO() {
+        return fileIO;
+    }
+
+    public String getTableLocation() {
+        return tableLocation;
+    }
+
+    public boolean isTableExist() {
+        return table != null;
+    }
+}
diff --git a/paimon-vfs/paimon-vfs-common/src/main/java/org/apache/paimon/vfs/VFSTableObjectIdentifier.java b/paimon-vfs/paimon-vfs-common/src/main/java/org/apache/paimon/vfs/VFSTableObjectIdentifier.java
new file mode 100644
index 0000000000..b0cdfbb1bb
--- /dev/null
+++ b/paimon-vfs/paimon-vfs-common/src/main/java/org/apache/paimon/vfs/VFSTableObjectIdentifier.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.vfs;
+
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.rest.responses.GetTableResponse;
+
+/** Identifier for objects under a table. */
+public class VFSTableObjectIdentifier extends VFSTableIdentifier {
+    public VFSTableObjectIdentifier(String databaseName, String tableName) {
+        super(VFSFileType.TABLE_OBJECT, databaseName, tableName);
+    }
+
+    public VFSTableObjectIdentifier(
+            GetTableResponse table,
+            String realPath,
+            FileIO fileIO,
+            String databaseName,
+            String tableName) {
+        super(VFSFileType.TABLE_OBJECT, table, realPath, fileIO, databaseName, tableName);
+    }
+}
diff --git a/paimon-vfs/paimon-vfs-common/src/main/java/org/apache/paimon/vfs/VFSTableRootIdentifier.java b/paimon-vfs/paimon-vfs-common/src/main/java/org/apache/paimon/vfs/VFSTableRootIdentifier.java
new file mode 100644
index 0000000000..ee398b06f6
--- /dev/null
+++ b/paimon-vfs/paimon-vfs-common/src/main/java/org/apache/paimon/vfs/VFSTableRootIdentifier.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.vfs;
+
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.rest.responses.GetTableResponse;
+
+/** Identifier for the root path of a table. */
+public class VFSTableRootIdentifier extends VFSTableIdentifier {
+    public VFSTableRootIdentifier(String databaseName, String tableName) {
+        super(VFSFileType.TABLE, databaseName, tableName);
+    }
+
+    public VFSTableRootIdentifier(
+            GetTableResponse table,
+            String realPath,
+            FileIO fileIO,
+            String databaseName,
+            String tableName) {
+        super(VFSFileType.TABLE, table, realPath, fileIO, databaseName, tableName);
+    }
+}
diff --git a/paimon-vfs/paimon-vfs-hadoop/pom.xml b/paimon-vfs/paimon-vfs-hadoop/pom.xml
new file mode 100644
index 0000000000..3c53ded410
--- /dev/null
+++ b/paimon-vfs/paimon-vfs-hadoop/pom.xml
@@ -0,0 +1,114 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <artifactId>paimon-vfs</artifactId>
+        <groupId>org.apache.paimon</groupId>
+        <version>1.3-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>paimon-vfs-hadoop</artifactId>
+    <name>Paimon : VirtualFileSystem Hadoop</name>
+
+    <packaging>jar</packaging>
+
+    <properties>
+        <fs.hadoopshaded.version>3.3.4</fs.hadoopshaded.version>
+        <fs.s3.aws.version>1.12.319</fs.s3.aws.version>
+        <commons.beanutils.version>1.9.4</commons.beanutils.version>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.paimon</groupId>
+            <artifactId>paimon-vfs-common</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-client</artifactId>
+            <version>${fs.hadoopshaded.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-aliyun</artifactId>
+            <version>${fs.hadoopshaded.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>com.aliyun.oss</groupId>
+                    <artifactId>aliyun-oss-sdk</artifactId>
+                </exclusion>
+                <exclusion>
+                    <!-- provided by paimon-hadoop-shaded -->
+                    <groupId>org.apache.hadoop</groupId>
+                    <artifactId>hadoop-common</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>ch.qos.reload4j</groupId>
+                    <artifactId>reload4j</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-reload4j</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-jar-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>test-jar</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>shade</goal>
+                        </goals>
+                        <configuration>
+                            <artifactSet>
+                                <includes>
+                                    <include>org.apache.paimon:paimon-vfs-common</include>
+                                </includes>
+                            </artifactSet>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>
\ No newline at end of file
diff --git a/paimon-vfs/paimon-vfs-hadoop/src/main/java/org/apache/paimon/vfs/hadoop/PaimonVirtualFileSystem.java b/paimon-vfs/paimon-vfs-hadoop/src/main/java/org/apache/paimon/vfs/hadoop/PaimonVirtualFileSystem.java
new file mode 100644
index 0000000000..3eb1849b23
--- /dev/null
+++ b/paimon-vfs/paimon-vfs-hadoop/src/main/java/org/apache/paimon/vfs/hadoop/PaimonVirtualFileSystem.java
@@ -0,0 +1,443 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.vfs.hadoop;
+
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.fs.PositionOutputStream;
+import org.apache.paimon.options.CatalogOptions;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.rest.responses.GetDatabaseResponse;
+import org.apache.paimon.rest.responses.GetTableResponse;
+import org.apache.paimon.vfs.VFSCatalogIdentifier;
+import org.apache.paimon.vfs.VFSDatabaseIdentifier;
+import org.apache.paimon.vfs.VFSIdentifier;
+import org.apache.paimon.vfs.VFSOperations;
+import org.apache.paimon.vfs.VFSTableIdentifier;
+import org.apache.paimon.vfs.VFSTableObjectIdentifier;
+import org.apache.paimon.vfs.VFSTableRootIdentifier;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.util.Progressable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.URI;
+import java.util.List;
+
+/** Paimon virtual file system. */
+public class PaimonVirtualFileSystem extends FileSystem {
+    private static final Logger LOG = LoggerFactory.getLogger(PaimonVirtualFileSystem.class);
+
+    private Path workingDirectory;
+    private URI uri;
+    private VFSOperations vfsOperations;
+    private Configuration conf;
+
+    @Override
+    public void initialize(URI uri, Configuration conf) throws IOException {
+        setConf(conf);
+        this.conf = conf;
+        super.initialize(uri, conf);
+
+        this.workingDirectory = new Path(uri);
+        this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority() + "/");
+
+        initVFSOperations();
+    }
+
+    private void initVFSOperations() {
+        Options options = PaimonVirtualFileSystemConfiguration.convertToCatalogOptions(conf);
+        // pvfs://catalog_name/database_name/table_name/file, so uri authority is catalog name
+        options.set(CatalogOptions.WAREHOUSE, uri.getAuthority());
+
+        CatalogContext catalogContext = CatalogContext.create(options);
+        vfsOperations = new VFSOperations(catalogContext);
+    }
+
+    private String getVirtualPath(Path path) {
+        Path qualifiedPath = path.makeQualified(uri, workingDirectory);
+        // convert to absolute path, like /database_name/table_name/file
+        return qualifiedPath.toUri().getPath();
+    }
+
+    @Override
+    public FSDataOutputStream create(
+            Path f,
+            FsPermission permission,
+            boolean overwrite,
+            int bufferSize,
+            short replication,
+            long blockSize,
+            Progressable progress)
+            throws IOException {
+        VFSIdentifier vfsIdentifier = vfsOperations.getVFSIdentifier(getVirtualPath(f));
+        if (vfsIdentifier instanceof VFSCatalogIdentifier) {
+            throw new IOException(
+                    "Cannot create file for virtual path " + f + " which is a catalog");
+        } else if (vfsIdentifier instanceof VFSDatabaseIdentifier) {
+            throw new IOException(
+                    "Cannot create file for virtual path " + f + " which is a database");
+        } else if (vfsIdentifier instanceof VFSTableRootIdentifier) {
+            throw new IOException(
+                    "Cannot create file for table level virtual path " + f + " which is a table");
+        } else {
+            VFSTableObjectIdentifier vfsTableObjectIdentifier =
+                    (VFSTableObjectIdentifier) vfsIdentifier;
+            if (!vfsTableObjectIdentifier.isTableExist()) {
+                throw new IOException(
+                        "Cannot create a file for virtual path "
+                                + f
+                                + " which is not in an existing table");
+            }
+            PositionOutputStream out =
+                    vfsTableObjectIdentifier
+                            .fileIO()
+                            .newOutputStream(vfsTableObjectIdentifier.getRealPath(), overwrite);
+            return new FSDataOutputStream(out, statistics);
+        }
+    }
+
+    @Override
+    public FSDataInputStream open(Path path, int bufferSize) throws IOException {
+        VFSIdentifier vfsIdentifier = vfsOperations.getVFSIdentifier(getVirtualPath(path));
+        if (vfsIdentifier instanceof VFSCatalogIdentifier) {
+            throw new FileNotFoundException(
+                    "Cannot open file for virtual path " + path + " which is a catalog");
+        } else if (vfsIdentifier instanceof VFSDatabaseIdentifier) {
+            throw new FileNotFoundException(
+                    "Cannot open file for virtual path " + path + " which is a database");
+        } else if (vfsIdentifier instanceof VFSTableRootIdentifier) {
+            throw new FileNotFoundException(
+                    "Cannot open file for virtual path " + path + " which is a table");
+        } else {
+            VFSTableObjectIdentifier vfsTableObjectIdentifier =
+                    (VFSTableObjectIdentifier) vfsIdentifier;
+            if (!vfsTableObjectIdentifier.isTableExist()) {
+                throw new IOException(
+                        "Cannot open file for virtual path "
+                                + path
+                                + " which is not in an existing table");
+            }
+            VFSInputStream in =
+                    new VFSInputStream(
+                            vfsTableObjectIdentifier
+                                    .fileIO()
+                                    .newInputStream(vfsTableObjectIdentifier.getRealPath()),
+                            statistics);
+            return new FSDataInputStream(in);
+        }
+    }
+
+    @Override
+    public FSDataOutputStream append(Path f, int bufferSize, Progressable progress)
+            throws IOException {
+        throw new IOException("Append is not supported");
+    }
+
+    @Override
+    public boolean rename(Path src, Path dst) throws IOException {
+        VFSIdentifier srcVfsIdentifier = vfsOperations.getVFSIdentifier(getVirtualPath(src));
+        VFSIdentifier dstVfsIdentifier = vfsOperations.getVFSIdentifier(getVirtualPath(dst));
+        if (srcVfsIdentifier instanceof VFSCatalogIdentifier) {
+            LOG.debug("Rename root path is ignored");
+            return false;
+        } else if (srcVfsIdentifier instanceof VFSDatabaseIdentifier) {
+            throw new IOException(
+                    "Cannot rename from virtual path " + src + " which is a database");
+        } else if (srcVfsIdentifier instanceof VFSTableRootIdentifier) {
+            if (!(dstVfsIdentifier instanceof VFSTableRootIdentifier)) {
+                throw new IOException(
+                        "Cannot rename from table path " + src + " to non-table path " + dst);
+            }
+            return renameTable(
+                    (VFSTableRootIdentifier) srcVfsIdentifier,
+                    (VFSTableRootIdentifier) dstVfsIdentifier);
+        } else {
+            if (!(dstVfsIdentifier instanceof VFSTableIdentifier)) {
+                throw new IOException(
+                        "Cannot rename to virtual path " + dst + " which is not a table");
+            }
+            VFSTableIdentifier srcTableIdentifier = (VFSTableIdentifier) srcVfsIdentifier;
+            VFSTableIdentifier dstTableIdentifier = (VFSTableIdentifier) dstVfsIdentifier;
+            if (!srcTableIdentifier.isTableExist()) {
+                throw new IOException(
+                        "Cannot rename from virtual path "
+                                + src
+                                + " which is not in an existing table");
+            }
+            if (!dstTableIdentifier.isTableExist()) {
+                throw new IOException(
+                        "Cannot rename to virtual path "
+                                + dst
+                                + " which is not in an existing table");
+            }
+            GetTableResponse srcTable = srcTableIdentifier.getTable();
+            GetTableResponse dstTable = dstTableIdentifier.getTable();
+            if (!srcTable.getId().equals(dstTable.getId())) {
+                throw new IOException(
+                        "Cannot rename from virtual path "
+                                + src
+                                + " to virtual path "
+                                + dst
+                                + " which is not in the same table");
+            }
+            return srcTableIdentifier
+                    .fileIO()
+                    .rename(srcTableIdentifier.getRealPath(), dstTableIdentifier.getRealPath());
+        }
+    }
+
+    private boolean renameTable(
+            VFSTableRootIdentifier srcIdentifier, VFSTableRootIdentifier dstIdentifier)
+            throws IOException {
+        if (!srcIdentifier.getDatabaseName().equals(dstIdentifier.getDatabaseName())) {
+            throw new IOException("Do not support rename table with different database");
+        }
+        if (!srcIdentifier.isTableExist()) {
+            // return false if src does not exist
+            LOG.debug(
+                    "Source table not found "
+                            + srcIdentifier.getDatabaseName()
+                            + "."
+                            + srcIdentifier.getTableName());
+            return false;
+        }
+        if (srcIdentifier.getTableName().equals(dstIdentifier.getTableName())) {
+            // src equals to dst, return true
+            return true;
+        }
+        try {
+            vfsOperations.renameTable(
+                    srcIdentifier.getDatabaseName(),
+                    srcIdentifier.getTableName(),
+                    dstIdentifier.getTableName());
+            return true;
+        } catch (FileNotFoundException e) {
+            LOG.debug(
+                    "Source table not found "
+                            + srcIdentifier.getDatabaseName()
+                            + "."
+                            + srcIdentifier.getTableName());
+            return false;
+        }
+    }
+
+    @Override
+    public boolean delete(Path f, boolean recursive) throws IOException {
+        VFSIdentifier vfsIdentifier = vfsOperations.getVFSIdentifier(getVirtualPath(f));
+        if (vfsIdentifier instanceof VFSCatalogIdentifier) {
+            throw new IOException("Cannot delete virtual path " + f + " which is a catalog");
+        } else if (vfsIdentifier instanceof VFSDatabaseIdentifier) {
+            try {
+                vfsOperations.dropDatabase(vfsIdentifier.getDatabaseName(), recursive);
+            } catch (FileNotFoundException e) {
+                LOG.debug("Database not found for deleting path " + f);
+                return false;
+            }
+            return true;
+        } else if (vfsIdentifier instanceof VFSTableRootIdentifier) {
+            VFSTableRootIdentifier vfsTableRootIdentifier = (VFSTableRootIdentifier) vfsIdentifier;
+            try {
+                vfsOperations.dropTable(
+                        vfsTableRootIdentifier.getDatabaseName(),
+                        vfsTableRootIdentifier.getTableName());
+            } catch (FileNotFoundException e) {
+                LOG.debug("Table not found for deleting path " + f);
+                return false;
+            }
+            return true;
+        } else {
+            VFSTableObjectIdentifier vfsTableObjectIdentifier =
+                    (VFSTableObjectIdentifier) vfsIdentifier;
+            if (!vfsTableObjectIdentifier.isTableExist()) {
+                throw new IOException(
+                        "Cannot delete virtual path " + f + " which is not in an existing table");
+            }
+            return vfsTableObjectIdentifier
+                    .fileIO()
+                    .delete(vfsTableObjectIdentifier.getRealPath(), recursive);
+        }
+    }
+
+    @Override
+    public FileStatus getFileStatus(Path f) throws IOException {
+        VFSIdentifier vfsIdentifier = vfsOperations.getVFSIdentifier(getVirtualPath(f));
+        if (vfsIdentifier instanceof VFSCatalogIdentifier) {
+            return new FileStatus(0, true, 1, 1, 0, new Path(this.uri));
+        } else if (vfsIdentifier instanceof VFSDatabaseIdentifier) {
+            GetDatabaseResponse database =
+                    vfsOperations.getDatabase(vfsIdentifier.getDatabaseName());
+            return convertDatabase(database);
+        } else {
+            VFSTableIdentifier vfsTableIdentifier = (VFSTableIdentifier) vfsIdentifier;
+            if (!vfsTableIdentifier.isTableExist()) {
+                throw new FileNotFoundException("Table not found for path " + f);
+            }
+            org.apache.paimon.fs.FileStatus fileStatus =
+                    vfsTableIdentifier.fileIO().getFileStatus(vfsTableIdentifier.getRealPath());
+            return convertFileStatus(vfsTableIdentifier, fileStatus);
+        }
+    }
+
+    private FileStatus convertDatabase(GetDatabaseResponse database) {
+        return new FileStatus(0, true, 1, 1, 0, new Path(new Path(this.uri), database.getName()));
+    }
+
+    private FileStatus convertFileStatus(
+            VFSTableIdentifier vfsIdentifier, org.apache.paimon.fs.FileStatus paimonFileStatus)
+            throws IOException {
+        String realPath = paimonFileStatus.getPath().toString();
+        if (!realPath.startsWith(vfsIdentifier.getTableLocation())) {
+            throw new IOException(
+                    "Result path "
+                            + realPath
+                            + " does not start with table location "
+                            + vfsIdentifier.getTableLocation());
+        }
+        String childPath = realPath.substring(vfsIdentifier.getTableLocation().length());
+        if (!childPath.startsWith("/")) {
+            childPath = "/" + childPath;
+        }
+        Path virtualPath =
+                new Path(
+                        new Path(this.uri),
+                        vfsIdentifier.getDatabaseName()
+                                + "/"
+                                + vfsIdentifier.getTableName()
+                                + childPath);
+        return new FileStatus(
+                paimonFileStatus.getLen(),
+                paimonFileStatus.isDir(),
+                1,
+                1,
+                paimonFileStatus.getModificationTime(),
+                virtualPath);
+    }
+
+    @Override
+    public FileStatus[] listStatus(Path f) throws FileNotFoundException, IOException {
+        VFSIdentifier vfsIdentifier = vfsOperations.getVFSIdentifier(getVirtualPath(f));
+        if (vfsIdentifier instanceof VFSCatalogIdentifier) {
+            List<String> databases = vfsOperations.listDatabases();
+            return convertDatabases(databases);
+        } else if (vfsIdentifier instanceof VFSDatabaseIdentifier) {
+            List<String> tables = vfsOperations.listTables(vfsIdentifier.getDatabaseName());
+            return convertTables(vfsIdentifier.getDatabaseName(), tables);
+        } else {
+            VFSTableIdentifier vfsTableIdentifier = (VFSTableIdentifier) vfsIdentifier;
+            if (!vfsTableIdentifier.isTableExist()) {
+                throw new FileNotFoundException("Table not found for path " + f);
+            }
+            org.apache.paimon.fs.FileStatus[] paimonFileStatuses =
+                    vfsTableIdentifier.fileIO().listStatus(vfsTableIdentifier.getRealPath());
+            return convertFileStatuses(vfsTableIdentifier, paimonFileStatuses);
+        }
+    }
+
+    private FileStatus[] convertDatabases(List<String> databases) {
+        FileStatus[] fileStatuses = new FileStatus[databases.size()];
+        for (int i = 0; i < databases.size(); i++) {
+            String database = databases.get(i);
+            FileStatus fileStatus =
+                    new FileStatus(0, true, 1, 1, 0, new Path(new Path(this.uri), database));
+            fileStatuses[i] = fileStatus;
+        }
+        return fileStatuses;
+    }
+
+    private FileStatus[] convertTables(String database, List<String> tables) {
+        FileStatus[] fileStatuses = new FileStatus[tables.size()];
+        for (int i = 0; i < tables.size(); i++) {
+            String table = tables.get(i);
+            FileStatus fileStatus =
+                    new FileStatus(
+                            0, true, 1, 1, 0, new Path(new Path(this.uri), database + "/" + table));
+            fileStatuses[i] = fileStatus;
+        }
+        return fileStatuses;
+    }
+
+    private FileStatus[] convertFileStatuses(
+            VFSTableIdentifier vfsIdentifier, org.apache.paimon.fs.FileStatus[] paimonFileStatuses)
+            throws IOException {
+        FileStatus[] fileStatuses = new FileStatus[paimonFileStatuses.length];
+        for (int i = 0; i < paimonFileStatuses.length; i++) {
+            fileStatuses[i] = convertFileStatus(vfsIdentifier, paimonFileStatuses[i]);
+        }
+        return fileStatuses;
+    }
+
+    @Override
+    public boolean mkdirs(Path f, FsPermission permission) throws IOException {
+        VFSIdentifier vfsIdentifier = vfsOperations.getVFSIdentifier(getVirtualPath(f));
+        if (vfsIdentifier instanceof VFSCatalogIdentifier) {
+            throw new IOException("Cannot mkdirs for virtual path " + f + " which is a catalog");
+        } else if (vfsIdentifier instanceof VFSDatabaseIdentifier) {
+            vfsOperations.createDatabase(vfsIdentifier.getDatabaseName());
+            return true;
+        } else if (vfsIdentifier instanceof VFSTableRootIdentifier) {
+            VFSTableRootIdentifier vfsTableRootIdentifier = (VFSTableRootIdentifier) vfsIdentifier;
+            if (vfsTableRootIdentifier.isTableExist()) {
+                // Table already exists, no need to execute
+                return true;
+            }
+            vfsOperations.createObjectTable(
+                    vfsIdentifier.getDatabaseName(), 
vfsTableRootIdentifier.getTableName());
+            return true;
+        } else {
+            VFSTableObjectIdentifier vfsTableObjectIdentifier =
+                    (VFSTableObjectIdentifier) vfsIdentifier;
+            if (!vfsTableObjectIdentifier.isTableExist()) {
+                throw new IOException(
+                        "Cannot mkdirs for virtual path "
+                                + f
+                                + " which is not in an existing table");
+            }
+            return 
vfsTableObjectIdentifier.fileIO().mkdirs(vfsTableObjectIdentifier.getRealPath());
+        }
+    }
+
+    @Override
+    public String getScheme() {
+        return "pvfs";
+    }
+
+    @Override
+    public URI getUri() {
+        return uri;
+    }
+
+    @Override
+    public void setWorkingDirectory(Path newDir) {
+        workingDirectory = newDir;
+    }
+
+    @Override
+    public Path getWorkingDirectory() {
+        return workingDirectory;
+    }
+}
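
PaimonVirtualFileSystem exposes a REST catalog as a Hadoop filesystem under the pvfs:// scheme: the root of pvfs://<warehouse>/ lists databases, a database path lists tables, and paths below a table delegate to the table's FileIO. A minimal client sketch follows; it assumes the scheme is registered with Hadoop via fs.pvfs.impl (registration is not part of this diff), that a REST catalog is reachable at the configured fs.pvfs.uri, and that "my_warehouse" exists. The fs.pvfs.* keys mirror the ones used in MockRestVirtualFileSystemTest below.

    import java.net.URI;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class PvfsListingSketch {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            // Assumed registration; this commit does not ship a core-site.xml entry.
            conf.set("fs.pvfs.impl", "org.apache.paimon.vfs.hadoop.PaimonVirtualFileSystem");
            conf.set("fs.pvfs.uri", "http://localhost:10000"); // hypothetical REST catalog endpoint
            conf.set("fs.pvfs.token.provider", "bear");         // identifier per AuthProviderEnum.BEAR
            conf.set("fs.pvfs.token", "init_token");

            FileSystem fs = FileSystem.get(URI.create("pvfs://my_warehouse/"), conf);
            // Root lists databases, a database lists tables, a table lists its files.
            for (FileStatus db : fs.listStatus(new Path("pvfs://my_warehouse/"))) {
                for (FileStatus table : fs.listStatus(db.getPath())) {
                    System.out.println(table.getPath());
                }
            }
        }
    }
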
diff --git a/paimon-vfs/paimon-vfs-hadoop/src/main/java/org/apache/paimon/vfs/hadoop/PaimonVirtualFileSystemConfiguration.java b/paimon-vfs/paimon-vfs-hadoop/src/main/java/org/apache/paimon/vfs/hadoop/PaimonVirtualFileSystemConfiguration.java
new file mode 100644
index 0000000000..9c68976161
--- /dev/null
+++ b/paimon-vfs/paimon-vfs-hadoop/src/main/java/org/apache/paimon/vfs/hadoop/PaimonVirtualFileSystemConfiguration.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.vfs.hadoop;
+
+import org.apache.paimon.options.Options;
+
+import org.apache.hadoop.conf.Configuration;
+
+import java.util.Map;
+
+/** Paimon virtual file system configuration. */
+public class PaimonVirtualFileSystemConfiguration {
+    public static final String PAIMON_VFS_PREFIX = "fs.pvfs.";
+
+    public static Options convertToCatalogOptions(Configuration conf) {
+        Options options = new Options();
+        for (Map.Entry<String, String> entry : conf) {
+            if (entry.getKey().startsWith(PAIMON_VFS_PREFIX)) {
+                String key = entry.getKey().substring(PAIMON_VFS_PREFIX.length());
+                options.set(key, entry.getValue());
+            }
+        }
+        return options;
+    }
+}
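
convertToCatalogOptions keeps only the Hadoop keys that carry the fs.pvfs. prefix, strips that prefix, and hands the remainder to the catalog unchanged. A short sketch of the mapping, using the same key names as the tests in this commit (the values are placeholders):

    import org.apache.hadoop.conf.Configuration;

    import org.apache.paimon.options.Options;
    import org.apache.paimon.vfs.hadoop.PaimonVirtualFileSystemConfiguration;

    public class PvfsOptionsSketch {
        public static void main(String[] args) {
            Configuration conf = new Configuration();
            conf.set("fs.pvfs.uri", "http://localhost:10000"); // -> catalog option "uri"
            conf.set("fs.pvfs.token", "init_token");            // -> catalog option "token"
            conf.set("fs.defaultFS", "hdfs://nn:8020");         // no fs.pvfs. prefix -> ignored

            Options options =
                    PaimonVirtualFileSystemConfiguration.convertToCatalogOptions(conf);
            // Prints {uri=http://localhost:10000, token=init_token}
            System.out.println(options.toMap());
        }
    }
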
diff --git a/paimon-vfs/paimon-vfs-hadoop/src/main/java/org/apache/paimon/vfs/hadoop/Pvfs.java b/paimon-vfs/paimon-vfs-hadoop/src/main/java/org/apache/paimon/vfs/hadoop/Pvfs.java
new file mode 100644
index 0000000000..e878153bed
--- /dev/null
+++ b/paimon-vfs/paimon-vfs-hadoop/src/main/java/org/apache/paimon/vfs/hadoop/Pvfs.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.vfs.hadoop;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.DelegateToFileSystem;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+
+/**
+ * A {@link DelegateToFileSystem} implementation that delegates all operations to a {@link
+ * PaimonVirtualFileSystem}.
+ */
+public class Pvfs extends DelegateToFileSystem {
+    public Pvfs(URI theUri, Configuration conf) throws IOException, URISyntaxException {
+        super(theUri, new PaimonVirtualFileSystem(), conf, theUri.getScheme(), false);
+    }
+}
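
Pvfs is the AbstractFileSystem adapter that makes the same implementation usable through Hadoop's newer FileContext API. A hedged sketch of wiring it up; the property name follows Hadoop's generic fs.AbstractFileSystem.<scheme>.impl convention and is not itself shown anywhere in this commit, so treat the configuration below as illustrative only:

    import java.net.URI;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileContext;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.Path;

    public class PvfsFileContextSketch {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            // Assumed binding for the FileContext API (not part of this diff).
            conf.set("fs.AbstractFileSystem.pvfs.impl", "org.apache.paimon.vfs.hadoop.Pvfs");
            conf.set("fs.pvfs.uri", "http://localhost:10000"); // hypothetical REST catalog endpoint
            conf.set("fs.pvfs.token.provider", "bear");
            conf.set("fs.pvfs.token", "init_token");

            FileContext fc = FileContext.getFileContext(URI.create("pvfs://my_warehouse/"), conf);
            for (FileStatus status : fc.util().listStatus(new Path("pvfs://my_warehouse/my_db"))) {
                System.out.println(status.getPath());
            }
        }
    }
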
diff --git a/paimon-vfs/paimon-vfs-hadoop/src/main/java/org/apache/paimon/vfs/hadoop/VFSInputStream.java b/paimon-vfs/paimon-vfs-hadoop/src/main/java/org/apache/paimon/vfs/hadoop/VFSInputStream.java
new file mode 100644
index 0000000000..020a58d318
--- /dev/null
+++ b/paimon-vfs/paimon-vfs-hadoop/src/main/java/org/apache/paimon/vfs/hadoop/VFSInputStream.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.vfs.hadoop;
+
+import org.apache.paimon.fs.SeekableInputStream;
+
+import org.apache.hadoop.fs.FSInputStream;
+import org.apache.hadoop.fs.FileSystem;
+
+import java.io.IOException;
+
+/**
+ * VFSInputStream wraps a Paimon SeekableInputStream so that it can back a Hadoop
+ * FSDataInputStream. TODO: the SeekableInputStream interface is too simple to fully support all
+ * FSDataInputStream operations: 1. ByteBufferReadable and ByteBufferPositionedReadable should be
+ * implemented for full support. 2. Positioned read is not supported by SeekableInputStream, so by
+ * default it is implemented via sequential read, which is not a good solution.
+ */
+public class VFSInputStream extends FSInputStream {
+    private SeekableInputStream in;
+    private byte[] oneByteBuf = new byte[1];
+    private FileSystem.Statistics statistics;
+
+    public VFSInputStream(SeekableInputStream in, FileSystem.Statistics statistics) {
+        this.in = in;
+        this.statistics = statistics;
+    }
+
+    @Override
+    public void seek(long pos) throws IOException {
+        in.seek(pos);
+    }
+
+    @Override
+    public long getPos() throws IOException {
+        return in.getPos();
+    }
+
+    @Override
+    public boolean seekToNewSource(long var1) throws IOException {
+        return false;
+    }
+
+    @Override
+    public int read(byte[] b, int off, int len) throws IOException {
+        int byteRead = in.read(b, off, len);
+        if (statistics != null && byteRead >= 0) {
+            statistics.incrementBytesRead(byteRead);
+        }
+        return byteRead;
+    }
+
+    @Override
+    public int read() throws IOException {
+        int n;
+        while ((n = read(oneByteBuf, 0, 1)) == 0) {
+            /* no op */
+        }
+        if (statistics != null && n >= 0) {
+            statistics.incrementBytesRead(n);
+        }
+        return (n == -1) ? -1 : oneByteBuf[0] & 0xff;
+    }
+}
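
Because VFSInputStream extends FSInputStream (which is Seekable and PositionedReadable), it can be handed directly to an FSDataInputStream. A small usage sketch; the FileIO and path are caller-supplied, and statistics may be null when no byte accounting is needed:

    import org.apache.hadoop.fs.FSDataInputStream;

    import org.apache.paimon.fs.FileIO;
    import org.apache.paimon.fs.Path;
    import org.apache.paimon.fs.SeekableInputStream;
    import org.apache.paimon.vfs.hadoop.VFSInputStream;

    public class VfsInputStreamSketch {
        static FSDataInputStream openAsHadoopStream(FileIO fileIO, Path realPath) throws Exception {
            // FileIO.newInputStream returns a Paimon SeekableInputStream for the physical file.
            SeekableInputStream in = fileIO.newInputStream(realPath);
            // Wrap it; seek/getPos/read are delegated to the underlying stream.
            return new FSDataInputStream(new VFSInputStream(in, null));
        }
    }
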
diff --git a/paimon-vfs/paimon-vfs-hadoop/src/test/java/org/apache/paimon/vfs/hadoop/MockRestVirtualFileSystemTest.java b/paimon-vfs/paimon-vfs-hadoop/src/test/java/org/apache/paimon/vfs/hadoop/MockRestVirtualFileSystemTest.java
new file mode 100644
index 0000000000..0b3307db96
--- /dev/null
+++ b/paimon-vfs/paimon-vfs-hadoop/src/test/java/org/apache/paimon/vfs/hadoop/MockRestVirtualFileSystemTest.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.vfs.hadoop;
+
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.options.CatalogOptions;
+import org.apache.paimon.rest.RESTCatalog;
+import org.apache.paimon.rest.RESTCatalogInternalOptions;
+import org.apache.paimon.rest.RESTCatalogOptions;
+import org.apache.paimon.rest.RESTCatalogServer;
+import org.apache.paimon.rest.RESTFileIOTestLoader;
+import org.apache.paimon.rest.RESTTestFileIO;
+import org.apache.paimon.rest.RESTTokenFileIO;
+import org.apache.paimon.rest.auth.AuthProvider;
+import org.apache.paimon.rest.auth.AuthProviderEnum;
+import org.apache.paimon.rest.auth.BearTokenAuthProvider;
+import org.apache.paimon.rest.responses.ConfigResponse;
+
+import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableMap;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.UUID;
+
+/** Test for {@link PaimonVirtualFileSystem} with Mock Rest Server. */
+public class MockRestVirtualFileSystemTest extends VirtualFileSystemTest {
+    private RESTCatalogServer restCatalogServer;
+    private final String serverDefineHeaderName = "test-header";
+    private final String serverDefineHeaderValue = "test-value";
+    private String dataPath;
+    private AuthProvider authProvider;
+    private Map<String, String> authMap;
+    private String initToken = "init_token";
+    private String restWarehouse;
+
+    @BeforeEach
+    @Override
+    public void setUp() throws Exception {
+        super.setUp();
+        dataPath = warehouse;
+        this.authProvider = new BearTokenAuthProvider(initToken);
+        this.authMap =
+                ImmutableMap.of(
+                        RESTCatalogOptions.TOKEN.key(),
+                        initToken,
+                        RESTCatalogOptions.TOKEN_PROVIDER.key(),
+                        AuthProviderEnum.BEAR.identifier());
+        this.restWarehouse = UUID.randomUUID().toString();
+        this.catalog = initCatalog(false);
+
+        // test retry commit
+        RESTCatalogServer.commitSuccessThrowException = true;
+
+        initFs();
+    }
+
+    @AfterEach
+    public void tearDown() throws Exception {
+        restCatalogServer.shutdown();
+    }
+
+    private RESTCatalog initCatalog(boolean enableDataToken) throws IOException {
+        this.config =
+                new ConfigResponse(
+                        ImmutableMap.of(
+                                RESTCatalogInternalOptions.PREFIX.key(),
+                                "paimon",
+                                "header." + serverDefineHeaderName,
+                                serverDefineHeaderValue,
+                                RESTTokenFileIO.DATA_TOKEN_ENABLED.key(),
+                                enableDataToken + "",
+                                CatalogOptions.WAREHOUSE.key(),
+                                restWarehouse),
+                        ImmutableMap.of());
+        restCatalogServer =
+                new RESTCatalogServer(dataPath, this.authProvider, this.config, restWarehouse);
+        restCatalogServer.start();
+        for (Map.Entry<String, String> entry : this.authMap.entrySet()) {
+            options.set(entry.getKey(), entry.getValue());
+        }
+        options.set(CatalogOptions.WAREHOUSE.key(), restWarehouse);
+        options.set(RESTCatalogOptions.URI, restCatalogServer.getUrl());
+        String path =
+                enableDataToken
+                        ? dataPath.replaceFirst("file", RESTFileIOTestLoader.SCHEME)
+                        : dataPath;
+        options.set(RESTTestFileIO.DATA_PATH_CONF_KEY, path);
+        return new RESTCatalog(CatalogContext.create(options));
+    }
+
+    private void initFs() throws Exception {
+        Configuration conf = new Configuration();
+        conf.set("fs.pvfs.uri", restCatalogServer.getUrl());
+        conf.set("fs.pvfs.token.provider", AuthProviderEnum.BEAR.identifier());
+        conf.set("fs.pvfs.token", initToken);
+        this.vfs = new PaimonVirtualFileSystem();
+        this.vfsRoot = new Path("pvfs://" + restWarehouse + "/");
+        this.vfs.initialize(vfsRoot.toUri(), conf);
+    }
+}
diff --git a/paimon-vfs/paimon-vfs-hadoop/src/test/java/org/apache/paimon/vfs/hadoop/VirtualFileSystemTest.java b/paimon-vfs/paimon-vfs-hadoop/src/test/java/org/apache/paimon/vfs/hadoop/VirtualFileSystemTest.java
new file mode 100644
index 0000000000..8b63cc187d
--- /dev/null
+++ b/paimon-vfs/paimon-vfs-hadoop/src/test/java/org/apache/paimon/vfs/hadoop/VirtualFileSystemTest.java
@@ -0,0 +1,457 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.vfs.hadoop;
+
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.catalog.Database;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.ResolvingFileIO;
+import org.apache.paimon.options.CatalogOptions;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.rest.RESTCatalog;
+import org.apache.paimon.rest.responses.ConfigResponse;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.table.object.ObjectTable;
+import org.apache.paimon.types.DataTypes;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.junit.Assert;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.IOException;
+import java.util.List;
+
+import static org.apache.paimon.CoreOptions.TYPE;
+import static org.apache.paimon.TableType.OBJECT_TABLE;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link PaimonVirtualFileSystem}. */
+public abstract class VirtualFileSystemTest {
+    @TempDir java.nio.file.Path tempFile;
+    protected String warehouse;
+    protected FileIO fileIO;
+    protected RESTCatalog catalog;
+    protected ConfigResponse config;
+    protected Options options = new Options();
+    protected FileSystem vfs;
+    protected Path vfsRoot;
+
+    @BeforeEach
+    public void setUp() throws Exception {
+        warehouse = tempFile.toUri().toString();
+        Options catalogOptions = new Options();
+        catalogOptions.set(CatalogOptions.WAREHOUSE, warehouse);
+        CatalogContext catalogContext = CatalogContext.create(catalogOptions);
+        fileIO = new ResolvingFileIO();
+        fileIO.configure(catalogContext);
+    }
+
+    @AfterEach
+    void tearDown() throws Exception {
+        if (catalog != null) {
+            List<String> dbs = catalog.listDatabases();
+            for (String db : dbs) {
+                try {
+                    catalog.dropDatabase(db, true, true);
+                } catch (Exception ignored) {
+                }
+            }
+            catalog.close();
+        }
+    }
+
+    protected void createObjectTable(String databaseName, String tableName) throws Exception {
+        catalog.createDatabase(databaseName, true);
+        Identifier identifier = Identifier.create(databaseName, tableName);
+        Schema schema = Schema.newBuilder().option(TYPE.key(), OBJECT_TABLE.toString()).build();
+        catalog.createTable(identifier, schema, false);
+        Table table = catalog.getTable(identifier);
+        assertThat(table).isInstanceOf(ObjectTable.class);
+    }
+
+    protected void createNormalTable(String databaseName, String tableName) throws Exception {
+        catalog.createDatabase(databaseName, true);
+        Identifier identifier = Identifier.create(databaseName, tableName);
+        Schema schema =
+                Schema.newBuilder()
+                        .column("id", DataTypes.INT())
+                        .column("name", DataTypes.STRING())
+                        .build();
+        catalog.createTable(identifier, schema, false);
+        Table table = catalog.getTable(identifier);
+        assertThat(table).isInstanceOf(FileStoreTable.class);
+    }
+
+    protected void createDatabase(String databaseName) throws Exception {
+        catalog.createDatabase(databaseName, true);
+    }
+
+    protected void checkTableExist(String databaseName, String tableName, boolean expect)
+            throws Exception {
+        try {
+            Table table = catalog.getTable(Identifier.create(databaseName, tableName));
+            Assert.assertEquals(expect, true);
+        } catch (Catalog.TableNotExistException e) {
+            Assert.assertEquals(expect, false);
+        }
+    }
+
+    @Test
+    public void testMkdir() throws Exception {
+        String databaseName = "test_db";
+        String tableName = "object_table";
+        createObjectTable(databaseName, tableName);
+
+        Path vfsPath = new Path(vfsRoot, databaseName + "/" + tableName + "/test_dir");
+        vfs.mkdirs(vfsPath);
+        FileStatus fileStatus = vfs.getFileStatus(vfsPath);
+        Assert.assertEquals(vfsPath.toString(), fileStatus.getPath().toString());
+        Assert.assertTrue(fileStatus.isDirectory());
+
+        // Mkdir in non-existing table
+        tableName = "object_table2";
+        vfsPath = new Path(vfsRoot, databaseName + "/" + tableName + "/test_dir");
+        try {
+            vfs.mkdirs(vfsPath);
+            Assert.fail();
+        } catch (IOException e) {
+        }
+    }
+
+    @Test
+    public void testVirtualMkdir() throws Exception {
+        String databaseName = "test_db";
+        String tableName = "object_table";
+
+        // Mkdir for root is not supported
+        try {
+            vfs.mkdirs(vfsRoot);
+            Assert.fail();
+        } catch (IOException e) {
+        }
+
+        // Create database in virtual file system
+        Path vfsPath = new Path(vfsRoot, databaseName);
+        Assert.assertTrue(vfs.mkdirs(vfsPath));
+        Database database = catalog.getDatabase(databaseName);
+        Assert.assertEquals(databaseName, database.name());
+
+        // Create object table in virtual file system
+        vfsPath = new Path(vfsRoot, databaseName + "/" + tableName);
+        Assert.assertTrue(vfs.mkdirs(vfsPath));
+        Table table = catalog.getTable(new Identifier(databaseName, tableName));
+        assertThat(table).isInstanceOf(ObjectTable.class);
+
+        // Create object table with database recursively created
+        databaseName = "test_db2";
+        vfsPath = new Path(vfsRoot, databaseName + "/" + tableName);
+        Assert.assertTrue(vfs.mkdirs(vfsPath));
+        table = catalog.getTable(new Identifier(databaseName, tableName));
+        assertThat(table).isInstanceOf(ObjectTable.class);
+    }
+
+    @Test
+    public void testCreate() throws Exception {
+        String databaseName = "test_db";
+        String tableName = "object_table";
+        createObjectTable(databaseName, tableName);
+
+        Path vfsPath = new Path(vfsRoot, databaseName + "/" + tableName + "/test_dir/file.txt");
+        FSDataOutputStream out = vfs.create(vfsPath);
+        out.write("hello".getBytes());
+        out.close();
+
+        FileStatus fileStatus = vfs.getFileStatus(vfsPath);
+        Assert.assertEquals(vfsPath.toString(), fileStatus.getPath().toString());
+        Assert.assertTrue(fileStatus.isFile());
+        Assert.assertEquals(5, fileStatus.getLen());
+
+        FSDataInputStream in = vfs.open(vfsPath);
+        byte[] buffer = new byte[5];
+        in.read(buffer);
+        in.close();
+        Assert.assertArrayEquals("hello".getBytes(), buffer);
+
+        // Create file in non-existing table
+        tableName = "object_table2";
+        vfsPath = new Path(vfsRoot, databaseName + "/" + tableName + "/test_dir/file.txt");
+        try {
+            vfs.create(vfsPath);
+            Assert.fail();
+        } catch (IOException e) {
+        }
+    }
+
+    @Test
+    public void testVirtualCreate() throws Exception {
+        String databaseName = "test_db";
+        String tableName = "object_table";
+        // Create root
+        try {
+            vfs.create(vfsRoot);
+            Assert.fail();
+        } catch (IOException e) {
+        }
+
+        // Create file for database
+        try {
+            vfs.create(new Path(vfsRoot, databaseName));
+            Assert.fail();
+        } catch (IOException e) {
+        }
+
+        // Create file for table
+        try {
+            vfs.create(new Path(vfsRoot, databaseName + "/" + tableName));
+            Assert.fail();
+        } catch (IOException e) {
+        }
+    }
+
+    @Test
+    public void testListStatus() throws Exception {
+        String databaseName = "test_db";
+        String tableName = "object_table";
+        createObjectTable(databaseName, tableName);
+
+        Path vfsPath1 = new Path(vfsRoot, databaseName + "/" + tableName + "/test_dir");
+        vfs.mkdirs(vfsPath1);
+        Path vfsPath2 = new Path(vfsRoot, databaseName + "/" + tableName + "/file.txt");
+        FSDataOutputStream out = vfs.create(vfsPath2);
+        out.write("hello".getBytes());
+        out.close();
+
+        FileStatus[] fileStatuses =
+                vfs.listStatus(new Path(vfsRoot, databaseName + "/" + tableName));
+        Assert.assertEquals(2, fileStatuses.length);
+        for (FileStatus fileStatus : fileStatuses) {
+            if (fileStatus.getPath().toString().equals(vfsPath1.toString())) {
+                Assert.assertTrue(fileStatus.isDirectory());
+            } else if (fileStatus.getPath().toString().equals(vfsPath2.toString())) {
+                Assert.assertTrue(fileStatus.isFile());
+                Assert.assertEquals(5, fileStatus.getLen());
+            }
+        }
+
+        // List in non-existing table
+        tableName = "object_table2";
+        try {
+            vfs.listStatus(new Path(vfsRoot, databaseName + "/" + tableName));
+            Assert.fail();
+        } catch (IOException e) {
+        }
+    }
+
+    @Test
+    public void testVirtualListStatus() throws Exception {
+        String databaseName = "test_db";
+        String tableName = "object_table";
+        createObjectTable(databaseName, tableName);
+
+        // List root
+        FileStatus[] fileStatuses = vfs.listStatus(vfsRoot);
+        Assert.assertEquals(1, fileStatuses.length);
+        Assert.assertEquals(
+                new Path(vfsRoot, databaseName).toString(), fileStatuses[0].getPath().toString());
+        Assert.assertTrue(fileStatuses[0].isDirectory());
+
+        // List database
+        fileStatuses = vfs.listStatus(new Path(vfsRoot, databaseName));
+        Assert.assertEquals(1, fileStatuses.length);
+        Assert.assertEquals(
+                new Path(vfsRoot, databaseName + "/" + tableName).toString(),
+                fileStatuses[0].getPath().toString());
+        Assert.assertTrue(fileStatuses[0].isDirectory());
+    }
+
+    @Test
+    public void testRename() throws Exception {
+        String databaseName = "test_db";
+        String tableName = "object_table";
+        createObjectTable(databaseName, tableName);
+
+        Path vfsPath = new Path(vfsRoot, databaseName + "/" + tableName + "/test_dir/file.txt");
+        FSDataOutputStream out = vfs.create(vfsPath);
+        out.write("hello".getBytes());
+        out.close();
+
+        Path vfsPath2 = new Path(vfsRoot, databaseName + "/" + tableName + "/test_dir/file2.txt");
+        Assert.assertTrue(vfs.rename(vfsPath, vfsPath2));
+
+        Assert.assertFalse(vfs.exists(vfsPath));
+        FileStatus fileStatus = vfs.getFileStatus(vfsPath2);
+        Assert.assertEquals(vfsPath2.toString(), fileStatus.getPath().toString());
+        Assert.assertTrue(fileStatus.isFile());
+        Assert.assertEquals(5, fileStatus.getLen());
+
+        // Rename in non-existing table
+        String tableName2 = "object_table2";
+        vfsPath = new Path(vfsRoot, databaseName + "/" + tableName2 + "/test_dir/file.txt");
+        vfsPath2 = new Path(vfsRoot, databaseName + "/" + tableName2 + "/test_dir/file2.txt");
+        try {
+            vfs.rename(vfsPath, vfsPath2);
+            Assert.fail();
+        } catch (IOException e) {
+        }
+
+        // Rename cross table is not supported
+        createObjectTable(databaseName, tableName2);
+        vfsPath = new Path(vfsRoot, databaseName + "/" + tableName + "/test_dir/file2.txt");
+        vfsPath2 = new Path(vfsRoot, databaseName + "/" + tableName2 + "/test_dir/file2.txt");
+        try {
+            vfs.rename(vfsPath, vfsPath2);
+            Assert.fail();
+        } catch (IOException e) {
+        }
+    }
+
+    @Test
+    public void testVirtualRename() throws Exception {
+        String databaseName = "test_db";
+        String tableName = "object_table";
+        createNormalTable(databaseName, tableName);
+        Assert.assertTrue(vfs.exists(new Path(vfsRoot, databaseName + "/" + tableName)));
+        // Rename root
+        Path vfsPath = vfsRoot;
+        Path vfsPath2 = new Path(vfsRoot, databaseName + "/" + tableName + "/test_dir/file2.txt");
+        Assert.assertFalse(vfs.rename(vfsPath, vfsPath2));
+        Assert.assertTrue(vfs.exists(new Path(vfsRoot, databaseName + "/" + tableName)));
+
+        // Rename database is not supported
+        vfsPath = new Path(vfsRoot, databaseName);
+        vfsPath2 = new Path(vfsRoot, databaseName + "_2");
+        try {
+            vfs.rename(vfsPath, vfsPath2);
+            Assert.fail();
+        } catch (IOException e) {
+        }
+        Assert.assertTrue(vfs.exists(new Path(vfsRoot, databaseName + "/" + tableName)));
+
+        // Rename table
+        // 1. table -> table_2
+        vfsPath = new Path(vfsRoot, databaseName + "/" + tableName);
+        vfsPath2 = new Path(vfsRoot, databaseName + "/" + tableName + "_2");
+        Assert.assertTrue(vfs.rename(vfsPath, vfsPath2));
+        checkTableExist(databaseName, tableName, false);
+        checkTableExist(databaseName, tableName + "_2", true);
+        // 2. table not exists
+        Assert.assertFalse(vfs.rename(vfsPath, vfsPath2));
+        Assert.assertFalse(vfs.rename(vfsPath, vfsPath));
+        // 3. src = dst
+        Assert.assertTrue(vfs.rename(vfsPath2, vfsPath2));
+        // 4. rename across database
+        try {
+            vfs.rename(vfsPath2, new Path(vfsRoot, databaseName + "_2/" + tableName));
+            Assert.fail();
+        } catch (IOException e) {
+        }
+    }
+
+    @Test
+    public void testDelete() throws Exception {
+        String databaseName = "test_db";
+        String tableName = "object_table";
+        createObjectTable(databaseName, tableName);
+
+        Path vfsPath = new Path(vfsRoot, databaseName + "/" + tableName + "/test_dir/file.txt");
+        FSDataOutputStream out = vfs.create(vfsPath);
+        out.write("hello".getBytes());
+        out.close();
+        Assert.assertTrue(vfs.delete(vfsPath, false));
+        Assert.assertFalse(vfs.exists(vfsPath));
+
+        // Delete in non-existing table
+        tableName = "object_table2";
+        vfsPath = new Path(vfsRoot, databaseName + "/" + tableName + "/test_dir/file.txt");
+        try {
+            vfs.delete(vfsPath, false);
+            Assert.fail();
+        } catch (IOException e) {
+        }
+    }
+
+    @Test
+    public void testVirtualDelete() throws Exception {
+        String databaseName = "test_db";
+        String tableName = "object_table";
+        createObjectTable(databaseName, tableName);
+        // Delete root is not supported
+        Path vfsPath = vfsRoot;
+        try {
+            vfs.delete(vfsPath, false);
+            Assert.fail();
+        } catch (IOException e) {
+        }
+
+        // Delete database
+        // 1. recursive = false
+        vfsPath = new Path(vfsRoot, databaseName);
+        try {
+            vfs.delete(vfsPath, false);
+            Assert.fail();
+        } catch (IOException e) {
+        }
+        // 2. recursive = true
+        Assert.assertTrue(vfs.delete(vfsPath, true));
+        Assert.assertFalse(vfs.exists(vfsPath));
+        // 3. non-exist database, return false
+        Assert.assertFalse(vfs.delete(new Path(vfsRoot, databaseName + "_2"), true));
+
+        // Delete table
+        // 1. existing table
+        createObjectTable(databaseName, tableName);
+        vfsPath = new Path(vfsRoot, databaseName + "/" + tableName);
+        Assert.assertTrue(vfs.delete(vfsPath, false));
+        Assert.assertFalse(vfs.exists(vfsPath));
+
+        // 2. non-exist table, return false
+        Assert.assertFalse(
+                vfs.delete(new Path(vfsRoot, databaseName + "/" + tableName + "_2"), true));
+        Assert.assertFalse(
+                vfs.delete(new Path(vfsRoot, databaseName + "_2/" + tableName + "_2"), true));
+    }
+
+    @Test
+    public void testVisitNormalTable() throws Exception {
+        String databaseName = "test_db";
+        String tableName = "normal_table";
+        createNormalTable(databaseName, tableName);
+        Path vfsPath = new Path(vfsRoot, databaseName + "/" + tableName);
+        // List normal table, directory schema should exist
+        FileStatus fileStatus = vfs.getFileStatus(vfsPath);
+        Assert.assertTrue(fileStatus.isDirectory());
+        Assert.assertEquals(vfsPath.toString(), fileStatus.getPath().toString());
+        FileStatus[] fileStatuses = vfs.listStatus(vfsPath);
+        Assert.assertEquals(1, fileStatuses.length);
+        Assert.assertTrue(fileStatuses[0].isDirectory());
+        Assert.assertEquals(
+                new Path(vfsPath, "schema").toString(), fileStatuses[0].getPath().toString());
+    }
+}
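
The tests above also document the expected client workflow against an object table: create a file under pvfs://<warehouse>/<db>/<table>/..., then stat and read it back through the ordinary Hadoop API. A condensed sketch of that round trip; warehouse, database, table, and endpoint names are placeholders, the table is assumed to already exist as an object table, and fs.pvfs.impl registration is assumed as noted earlier:

    import java.net.URI;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class PvfsRoundTripSketch {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            conf.set("fs.pvfs.impl", "org.apache.paimon.vfs.hadoop.PaimonVirtualFileSystem");
            conf.set("fs.pvfs.uri", "http://localhost:10000");
            conf.set("fs.pvfs.token.provider", "bear");
            conf.set("fs.pvfs.token", "init_token");

            FileSystem fs = FileSystem.get(URI.create("pvfs://my_warehouse/"), conf);
            Path file = new Path("pvfs://my_warehouse/test_db/object_table/test_dir/file.txt");

            // Write a small object into the table's storage.
            try (FSDataOutputStream out = fs.create(file)) {
                out.write("hello".getBytes());
            }
            // Read it back and check its length via the virtual path.
            try (FSDataInputStream in = fs.open(file)) {
                byte[] buffer = new byte[5];
                in.readFully(buffer); // "hello"
            }
            System.out.println(fs.getFileStatus(file).getLen()); // 5
        }
    }
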
diff --git a/paimon-vfs/pom.xml b/paimon-vfs/pom.xml
new file mode 100644
index 0000000000..b2430d8cb0
--- /dev/null
+++ b/paimon-vfs/pom.xml
@@ -0,0 +1,105 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0";
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <artifactId>paimon-parent</artifactId>
+        <groupId>org.apache.paimon</groupId>
+        <version>1.3-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>paimon-vfs</artifactId>
+    <name>Paimon : VirtualFileSystem</name>
+
+    <packaging>pom</packaging>
+
+    <modules>
+        <module>paimon-vfs-common</module>
+        <module>paimon-vfs-hadoop</module>
+    </modules>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.paimon</groupId>
+            <artifactId>paimon-common</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.paimon</groupId>
+            <artifactId>paimon-core</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.paimon</groupId>
+            <artifactId>paimon-core</artifactId>
+            <classifier>tests</classifier>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.junit.jupiter</groupId>
+            <artifactId>junit-jupiter</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.assertj</groupId>
+            <artifactId>assertj-core</artifactId>
+            <version>${assertj.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <version>${junit4.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.paimon</groupId>
+            <artifactId>paimon-test-utils</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.paimon</groupId>
+            <artifactId>paimon-format</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>com.squareup.okhttp3</groupId>
+            <artifactId>mockwebserver</artifactId>
+            <version>${okhttp.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>com.squareup.okhttp3</groupId>
+            <artifactId>okhttp</artifactId>
+            <version>${okhttp.version}</version>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+</project>
diff --git a/pom.xml b/pom.xml
index 73d01c7118..57f9c6fb80 100644
--- a/pom.xml
+++ b/pom.xml
@@ -63,6 +63,7 @@ under the License.
         <module>paimon-filesystems</module>
         <module>paimon-format</module>
         <module>paimon-lance</module>
+        <module>paimon-vfs</module>
         <module>paimon-bundle</module>
         <module>paimon-hive</module>
         <module>paimon-spark</module>

