This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 969c53e2ad [vfs] Implement Paimon Virtual Filesystem (#5916)
969c53e2ad is described below
commit 969c53e2adc904f691b20f373726abe4096442b1
Author: timmyyao <[email protected]>
AuthorDate: Wed Jul 23 10:28:17 2025 +0800
[vfs] Implement Paimon Virtual Filesystem (#5916)
---
paimon-vfs/paimon-vfs-common/pom.xml | 35 ++
.../apache/paimon/vfs/VFSCatalogIdentifier.java | 27 ++
.../java/org/apache/paimon/vfs/VFSDataToken.java | 64 +++
.../apache/paimon/vfs/VFSDatabaseIdentifier.java | 27 ++
.../java/org/apache/paimon/vfs/VFSIdentifier.java | 45 ++
.../java/org/apache/paimon/vfs/VFSOperations.java | 339 +++++++++++++++
.../org/apache/paimon/vfs/VFSTableIdentifier.java | 99 +++++
.../paimon/vfs/VFSTableObjectIdentifier.java | 38 ++
.../apache/paimon/vfs/VFSTableRootIdentifier.java | 38 ++
paimon-vfs/paimon-vfs-hadoop/pom.xml | 114 +++++
.../paimon/vfs/hadoop/PaimonVirtualFileSystem.java | 443 ++++++++++++++++++++
.../PaimonVirtualFileSystemConfiguration.java | 41 ++
.../java/org/apache/paimon/vfs/hadoop/Pvfs.java | 36 ++
.../apache/paimon/vfs/hadoop/VFSInputStream.java | 80 ++++
.../vfs/hadoop/MockRestVirtualFileSystemTest.java | 121 ++++++
.../paimon/vfs/hadoop/VirtualFileSystemTest.java | 457 +++++++++++++++++++++
paimon-vfs/pom.xml | 105 +++++
pom.xml | 1 +
18 files changed, 2110 insertions(+)
diff --git a/paimon-vfs/paimon-vfs-common/pom.xml
b/paimon-vfs/paimon-vfs-common/pom.xml
new file mode 100644
index 0000000000..5452384e7a
--- /dev/null
+++ b/paimon-vfs/paimon-vfs-common/pom.xml
@@ -0,0 +1,35 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <artifactId>paimon-vfs</artifactId>
+ <groupId>org.apache.paimon</groupId>
+ <version>1.3-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>paimon-vfs-common</artifactId>
+ <name>Paimon : VirtualFileSystem Common</name>
+
+ <packaging>jar</packaging>
+</project>
\ No newline at end of file
diff --git
a/paimon-vfs/paimon-vfs-common/src/main/java/org/apache/paimon/vfs/VFSCatalogIdentifier.java
b/paimon-vfs/paimon-vfs-common/src/main/java/org/apache/paimon/vfs/VFSCatalogIdentifier.java
new file mode 100644
index 0000000000..3118121b93
--- /dev/null
+++
b/paimon-vfs/paimon-vfs-common/src/main/java/org/apache/paimon/vfs/VFSCatalogIdentifier.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.vfs;
+
/**
 * Identifier for the catalog root of the virtual filesystem, i.e. the path
 * {@code pvfs://catalog/}.
 */
public class VFSCatalogIdentifier extends VFSIdentifier {

    // The catalog root is not scoped to any database, hence a null database name.
    public VFSCatalogIdentifier() {
        super(VFSFileType.CATALOG, null);
    }
}
diff --git
a/paimon-vfs/paimon-vfs-common/src/main/java/org/apache/paimon/vfs/VFSDataToken.java
b/paimon-vfs/paimon-vfs-common/src/main/java/org/apache/paimon/vfs/VFSDataToken.java
new file mode 100644
index 0000000000..730ee591af
--- /dev/null
+++
b/paimon-vfs/paimon-vfs-common/src/main/java/org/apache/paimon/vfs/VFSDataToken.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.vfs;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.Map;
+import java.util.Objects;
+
+/** Data token. */
+public class VFSDataToken implements Serializable {
+ private static final long serialVersionUID = 1L;
+ private final Map<String, String> token;
+ private final long expireAtMillis;
+ @Nullable private Integer hash;
+
+ public VFSDataToken(Map<String, String> token, long expireAtMillis) {
+ this.token = token;
+ this.expireAtMillis = expireAtMillis;
+ }
+
+ public Map<String, String> token() {
+ return this.token;
+ }
+
+ public long expireAtMillis() {
+ return this.expireAtMillis;
+ }
+
+ public boolean equals(Object o) {
+ if (o != null && this.getClass() == o.getClass()) {
+ VFSDataToken token1 = (VFSDataToken) o;
+ return this.expireAtMillis == token1.expireAtMillis
+ && Objects.equals(this.token, token1.token);
+ } else {
+ return false;
+ }
+ }
+
+ public int hashCode() {
+ if (this.hash == null) {
+ this.hash = Objects.hash(new Object[] {this.token,
this.expireAtMillis});
+ }
+
+ return this.hash;
+ }
+}
diff --git
a/paimon-vfs/paimon-vfs-common/src/main/java/org/apache/paimon/vfs/VFSDatabaseIdentifier.java
b/paimon-vfs/paimon-vfs-common/src/main/java/org/apache/paimon/vfs/VFSDatabaseIdentifier.java
new file mode 100644
index 0000000000..02a7390788
--- /dev/null
+++
b/paimon-vfs/paimon-vfs-common/src/main/java/org/apache/paimon/vfs/VFSDatabaseIdentifier.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.vfs;
+
/**
 * Identifier for a database-level virtual path, i.e. {@code pvfs://catalog/database/}.
 */
public class VFSDatabaseIdentifier extends VFSIdentifier {

    /** @param databaseName name of the database this path points at */
    public VFSDatabaseIdentifier(String databaseName) {
        super(VFSFileType.DATABASE, databaseName);
    }
}
diff --git
a/paimon-vfs/paimon-vfs-common/src/main/java/org/apache/paimon/vfs/VFSIdentifier.java
b/paimon-vfs/paimon-vfs-common/src/main/java/org/apache/paimon/vfs/VFSIdentifier.java
new file mode 100644
index 0000000000..d454cec040
--- /dev/null
+++
b/paimon-vfs/paimon-vfs-common/src/main/java/org/apache/paimon/vfs/VFSIdentifier.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.vfs;
+
/**
 * Base identifier for a resolved virtual path. A virtual path points at exactly one of: the
 * catalog root, a database, a table root, or an object below a table (see {@link VFSFileType}).
 *
 * <p>Identifiers are immutable value holders created during path resolution.
 */
public abstract class VFSIdentifier {
    /** The kind of object a virtual path resolves to. */
    enum VFSFileType {
        CATALOG, // pvfs://catalog/
        DATABASE, // pvfs://catalog/database/
        TABLE, // pvfs://catalog/database/table/
        TABLE_OBJECT // pvfs://catalog/database/table/file.txt
    }

    // Final: identifiers describe a resolved path and are never mutated after construction.
    protected final VFSFileType vfsFileType;
    // Database the path belongs to; null for catalog-level paths.
    protected final String databaseName;

    /**
     * @param vfsFileType the kind of object this identifier points at
     * @param databaseName owning database, or null for the catalog root
     */
    public VFSIdentifier(VFSFileType vfsFileType, String databaseName) {
        this.vfsFileType = vfsFileType;
        this.databaseName = databaseName;
    }

    public VFSFileType getVfsFileType() {
        return vfsFileType;
    }

    public String getDatabaseName() {
        return databaseName;
    }
}
diff --git
a/paimon-vfs/paimon-vfs-common/src/main/java/org/apache/paimon/vfs/VFSOperations.java
b/paimon-vfs/paimon-vfs-common/src/main/java/org/apache/paimon/vfs/VFSOperations.java
new file mode 100644
index 0000000000..ad9cd09122
--- /dev/null
+++
b/paimon-vfs/paimon-vfs-common/src/main/java/org/apache/paimon/vfs/VFSOperations.java
@@ -0,0 +1,339 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.vfs;
+
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.rest.RESTApi;
+import org.apache.paimon.rest.RESTUtil;
+import org.apache.paimon.rest.exceptions.AlreadyExistsException;
+import org.apache.paimon.rest.exceptions.BadRequestException;
+import org.apache.paimon.rest.exceptions.ForbiddenException;
+import org.apache.paimon.rest.exceptions.NoSuchResourceException;
+import org.apache.paimon.rest.exceptions.NotImplementedException;
+import org.apache.paimon.rest.responses.GetDatabaseResponse;
+import org.apache.paimon.rest.responses.GetTableResponse;
+import org.apache.paimon.rest.responses.GetTableTokenResponse;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.utils.IOUtils;
+import org.apache.paimon.utils.ThreadUtils;
+
+import
org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache;
+import
org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Caffeine;
+import
org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Scheduler;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.file.FileAlreadyExistsException;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.paimon.CoreOptions.TYPE;
+import static org.apache.paimon.TableType.OBJECT_TABLE;
+import static org.apache.paimon.options.CatalogOptions.FILE_IO_ALLOW_CACHE;
+import static org.apache.paimon.rest.RESTApi.TOKEN_EXPIRATION_SAFE_TIME_MILLIS;
+
/** Wrap over RESTCatalog to provide basic operations for virtual path. */
public class VFSOperations {
    private static final Logger LOG = LoggerFactory.getLogger(VFSOperations.class);

    // REST client used for all catalog metadata operations (databases, tables, tokens).
    private final RESTApi api;
    // Base catalog options; also the starting point when building per-token FileIO options.
    private final CatalogContext context;

    // data token -> FileIO built from that token. NOTE(review): despite the original
    // "table id -> fileIO" wording, the key is the VFSDataToken, so a refreshed token
    // yields a new FileIO. Evicted FileIO instances are closed quietly by the removal
    // listener, driven by a single daemon scheduler thread.
    private static final Cache<VFSDataToken, FileIO> FILE_IO_CACHE =
            Caffeine.newBuilder()
                    .expireAfterAccess(30, TimeUnit.MINUTES)
                    .maximumSize(1000)
                    .removalListener(
                            (ignored, value, cause) -> IOUtils.closeQuietly((FileIO) value))
                    .scheduler(
                            Scheduler.forScheduledExecutorService(
                                    Executors.newSingleThreadScheduledExecutor(
                                            ThreadUtils.newDaemonThreadFactory(
                                                    "rest-token-file-io-scheduler"))))
                    .build();

    // table id -> its current data token; entries are refreshed once they approach
    // expiration (see shouldRefresh).
    private static final Cache<String, VFSDataToken> TOKEN_CACHE =
            Caffeine.newBuilder().expireAfterAccess(30, TimeUnit.MINUTES).maximumSize(1000).build();

    public VFSOperations(CatalogContext context) {
        this.context = context;
        this.api = new RESTApi(context.options());
    }

    /**
     * Resolves a virtual path into a typed {@link VFSIdentifier}: catalog root, database, table
     * root, or an object below a table.
     *
     * <p>For table-level paths, table metadata is loaded from the REST server; if the table does
     * not exist, an identifier without metadata/FileIO is returned so callers can decide whether
     * to create it.
     *
     * @param virtualPath path relative to the catalog root, e.g. "db/table/part-0.txt"
     * @return the resolved identifier, never null
     * @throws IOException if the table is external or the REST call fails
     */
    public VFSIdentifier getVFSIdentifier(String virtualPath) throws IOException {
        if (virtualPath.startsWith("/")) {
            virtualPath = virtualPath.substring(1);
        }
        String[] parts = virtualPath.split("/");
        if (virtualPath.isEmpty() || parts.length == 0) {
            return new VFSCatalogIdentifier();
        } else if (parts.length == 1) {
            return new VFSDatabaseIdentifier(parts[0]);
        }
        // parts.length >= 2: table or table object
        String databaseName = parts[0];
        String tableName = parts[1];
        Identifier identifier = new Identifier(databaseName, tableName);
        // Get table from REST server
        GetTableResponse table;
        try {
            table = loadTableMetadata(identifier);
        } catch (FileNotFoundException e) {
            // Table (or its database) is missing: return an identifier with no metadata.
            if (parts.length == 2) {
                return new VFSTableRootIdentifier(databaseName, tableName);
            } else {
                return new VFSTableObjectIdentifier(databaseName, tableName);
            }
        }
        if (table.isExternal()) {
            throw new IOException("Do not support visiting external table " + identifier);
        }
        // Map the virtual path onto real storage by appending the remaining path
        // components to the table location.
        StringBuilder realPath = new StringBuilder(table.getPath());
        boolean isTableRoot = true; // NOTE(review): assigned but never read afterwards
        if (parts.length > 2) {
            isTableRoot = false;
            if (!table.getPath().endsWith("/")) {
                realPath.append("/");
            }
            for (int i = 2; i < parts.length; i++) {
                realPath.append(parts[i]);
                if (i < parts.length - 1) {
                    realPath.append("/");
                }
            }
        }
        // Obtain a FileIO backed by a (possibly cached) REST data token for this table.
        FileIO fileIO = getFileIO(new Identifier(databaseName, tableName), table);

        if (parts.length == 2) {
            return new VFSTableRootIdentifier(
                    table, realPath.toString(), fileIO, databaseName, tableName);
        } else {
            return new VFSTableObjectIdentifier(
                    table, realPath.toString(), fileIO, databaseName, tableName);
        }
    }

    /**
     * Loads database metadata from the REST server.
     *
     * @throws FileNotFoundException if the database does not exist
     * @throws IOException if access is forbidden
     */
    public GetDatabaseResponse getDatabase(String databaseName) throws IOException {
        try {
            return api.getDatabase(databaseName);
        } catch (NoSuchResourceException e) {
            throw new FileNotFoundException("Database " + databaseName + " not found");
        } catch (ForbiddenException e) {
            throw new IOException("No permission to access database " + databaseName);
        }
    }

    /** Lists all database names in the catalog. */
    public List<String> listDatabases() {
        return api.listDatabases();
    }

    /**
     * Creates a database. Creating a database that already exists is treated as success
     * (logged, not thrown), which makes the operation idempotent.
     *
     * @throws IOException if access is forbidden or the request is invalid
     */
    public void createDatabase(String databaseName) throws IOException {
        try {
            api.createDatabase(databaseName, Collections.emptyMap());
        } catch (AlreadyExistsException e) {
            LOG.info("Database {} already exist, no need to create", databaseName);
        } catch (ForbiddenException e) {
            throw new IOException("No permission to create database " + databaseName);
        } catch (BadRequestException e) {
            throw new IOException("Bad request when creating database " + databaseName, e);
        }
    }

    /**
     * Drops a database. When {@code recursive} is false the database must contain no tables.
     *
     * @throws FileNotFoundException if the database does not exist
     * @throws IOException if the database is non-empty (non-recursive) or access is forbidden
     */
    public void dropDatabase(String databaseName, boolean recursive) throws IOException {
        try {
            if (!recursive && !api.listTables(databaseName).isEmpty()) {
                throw new IOException(
                        "Database "
                                + databaseName
                                + " is not empty, set recursive to true to drop it");
            }
            api.dropDatabase(databaseName);
        } catch (NoSuchResourceException e) {
            throw new FileNotFoundException("Database " + databaseName + " not found");
        } catch (ForbiddenException e) {
            throw new IOException("No permission to drop database " + databaseName);
        }
    }

    /**
     * Lists table names in a database.
     *
     * @throws FileNotFoundException if the database does not exist
     * @throws IOException if access is forbidden
     */
    public List<String> listTables(String databaseName) throws IOException {
        try {
            return api.listTables(databaseName);
        } catch (NoSuchResourceException e) {
            throw new FileNotFoundException("Database " + databaseName + " not found");
        } catch (ForbiddenException e) {
            throw new IOException("No permission to access database " + databaseName);
        }
    }

    /**
     * Creates an object table, creating the parent database first if it does not exist yet.
     * Idempotent: an already-existing table is treated as success.
     */
    public void createObjectTable(String databaseName, String tableName) throws IOException {
        Identifier identifier = Identifier.create(databaseName, tableName);
        Schema schema = Schema.newBuilder().option(TYPE.key(), OBJECT_TABLE.toString()).build();
        try {
            tryCreateObjectTable(identifier, schema);
        } catch (FileNotFoundException e) {
            // Database not exist, try to create database and then create table again
            createDatabase(databaseName);
            tryCreateObjectTable(identifier, schema);
        }
    }

    /**
     * Drops a table.
     *
     * @throws FileNotFoundException if the table does not exist
     * @throws IOException if access is forbidden
     */
    public void dropTable(String databaseName, String tableName) throws IOException {
        Identifier identifier = Identifier.create(databaseName, tableName);
        try {
            api.dropTable(identifier);
        } catch (NoSuchResourceException e) {
            throw new FileNotFoundException("Table " + identifier + " not found");
        } catch (ForbiddenException e) {
            throw new IOException("No permission to drop table " + identifier);
        }
    }

    /**
     * Renames a table within the same database.
     *
     * @throws FileNotFoundException if the source table does not exist
     * @throws FileAlreadyExistsException if the target table already exists
     * @throws IOException if access is forbidden or the request is invalid
     */
    public void renameTable(String databaseName, String srcTableName, String dstTableName)
            throws IOException {
        Identifier srcIdentifier = Identifier.create(databaseName, srcTableName);
        Identifier dstIdentifier = Identifier.create(databaseName, dstTableName);
        try {
            api.renameTable(srcIdentifier, dstIdentifier);
        } catch (NoSuchResourceException e) {
            throw new FileNotFoundException("Source table " + srcIdentifier + " not found");
        } catch (ForbiddenException e) {
            throw new IOException(
                    "No permission to rename table " + srcIdentifier + " to " + dstIdentifier);
        } catch (AlreadyExistsException e) {
            throw new FileAlreadyExistsException(
                    "Target table " + dstIdentifier + " already exist");
        } catch (BadRequestException e) {
            throw new IOException(
                    "Bad request when renaming table " + srcIdentifier + " to " + dstIdentifier, e);
        }
    }

    // Creates the table, treating "already exists" as success; maps REST errors to
    // IOException subtypes (FileNotFoundException signals a missing database to the caller).
    private void tryCreateObjectTable(Identifier identifier, Schema schema) throws IOException {
        try {
            api.createTable(identifier, schema);
        } catch (AlreadyExistsException e) {
            LOG.info("Table {} already exist, no need to create", identifier);
        } catch (NotImplementedException e) {
            throw new IOException("Create object table not implemented");
        } catch (NoSuchResourceException e) {
            throw new FileNotFoundException("Database not found");
        } catch (BadRequestException e) {
            throw new IOException("Bad request when creating table " + identifier, e);
        } catch (IllegalArgumentException e) {
            throw new IOException("Illegal argument when creating table " + identifier, e);
        } catch (Exception e) {
            throw new IOException(e);
        }
    }

    // Returns a FileIO for the table. Two-level, double-checked caching: first ensure a
    // sufficiently fresh data token for the table id, then reuse (or build and cache) the
    // FileIO bound to that token. Both caches are static, so they are shared across
    // VFSOperations instances in the JVM.
    private FileIO getFileIO(Identifier identifier, GetTableResponse table) throws IOException {
        VFSDataToken token = TOKEN_CACHE.getIfPresent(table.getId());
        if (shouldRefresh(token)) {
            synchronized (TOKEN_CACHE) {
                // Re-check under the lock so only one thread refreshes per table.
                token = TOKEN_CACHE.getIfPresent(table.getId());
                if (shouldRefresh(token)) {
                    token = refreshToken(identifier);
                    TOKEN_CACHE.put(table.getId(), token);
                }
            }
        }

        FileIO fileIO = FILE_IO_CACHE.getIfPresent(token);
        if (fileIO != null) {
            return fileIO;
        }

        synchronized (FILE_IO_CACHE) {
            fileIO = FILE_IO_CACHE.getIfPresent(token);
            if (fileIO != null) {
                return fileIO;
            }

            Options options = context.options();
            // the original options are not overwritten
            options = new Options(RESTUtil.merge(token.token(), options.toMap()));
            // Disable FileIO-level caching: lifetime is managed by FILE_IO_CACHE instead.
            options.set(FILE_IO_ALLOW_CACHE, false);
            CatalogContext fileIOContext = CatalogContext.create(options);
            fileIO = FileIO.get(new Path(table.getPath()), fileIOContext);
            FILE_IO_CACHE.put(token, fileIO);
            return fileIO;
        }
    }

    // A token needs refreshing when absent or within the safety window of its expiration.
    private boolean shouldRefresh(VFSDataToken token) {
        return token == null
                || token.expireAtMillis() - System.currentTimeMillis()
                        < TOKEN_EXPIRATION_SAFE_TIME_MILLIS;
    }

    // Fetches a fresh data token for the table from the REST server.
    private VFSDataToken refreshToken(Identifier identifier) throws IOException {
        LOG.info("begin refresh data token for identifier [{}]", identifier);
        GetTableTokenResponse response;
        try {
            response = api.loadTableToken(identifier);
        } catch (NoSuchResourceException e) {
            throw new FileNotFoundException("Table " + identifier + " not found");
        } catch (ForbiddenException e) {
            throw new IOException("No permission to access table " + identifier);
        }

        LOG.info(
                "end refresh data token for identifier [{}] expiresAtMillis [{}]",
                identifier,
                response.getExpiresAtMillis());

        VFSDataToken token = new VFSDataToken(response.getToken(), response.getExpiresAtMillis());
        return token;
    }

    private GetTableResponse loadTableMetadata(Identifier identifier) throws IOException {
        // if the table is system table, we need to load table metadata from the system table's data
        // table
        // NOTE(review): the branch builds Identifier(db, tableName, branchName) with the SAME
        // table name — presumably Identifier's 3-arg form resolves to the data table; confirm
        // against Identifier's constructor semantics.
        Identifier loadTableIdentifier =
                identifier.isSystemTable()
                        ? new Identifier(
                                identifier.getDatabaseName(),
                                identifier.getTableName(),
                                identifier.getBranchName())
                        : identifier;

        GetTableResponse response;
        try {
            response = api.getTable(loadTableIdentifier);
        } catch (NoSuchResourceException e) {
            throw new FileNotFoundException("Table not found");
        } catch (ForbiddenException e) {
            throw new IOException("No permission to access table " + identifier);
        }

        return response;
    }
}
diff --git
a/paimon-vfs/paimon-vfs-common/src/main/java/org/apache/paimon/vfs/VFSTableIdentifier.java
b/paimon-vfs/paimon-vfs-common/src/main/java/org/apache/paimon/vfs/VFSTableIdentifier.java
new file mode 100644
index 0000000000..86f0605322
--- /dev/null
+++
b/paimon-vfs/paimon-vfs-common/src/main/java/org/apache/paimon/vfs/VFSTableIdentifier.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.vfs;
+
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.rest.responses.GetTableResponse;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+
/**
 * Base identifier for virtual paths at or below a table: the table root itself or an object
 * under it. Carries the table metadata, the resolved real storage path, and the {@code FileIO}
 * used to access it.
 *
 * <p>Two states exist: for a non-existent table only {@code tableName} (and the inherited
 * database name) is set; for an existing table all fields are populated. Use
 * {@link #isTableExist()} to distinguish — the path/URI/FileIO accessors return null in the
 * non-existent case.
 */
public abstract class VFSTableIdentifier extends VFSIdentifier {
    // Resolved real storage path (table location plus any trailing object path); null if the
    // table does not exist.
    protected Path realPath;
    // Scheme of realUri (e.g. "oss", "s3"); null if the table does not exist.
    protected String scheme;
    // realPath parsed as a URI; null if the table does not exist.
    protected URI realUri;
    protected String tableName;
    // Table metadata from the REST server; null means the table does not exist.
    protected GetTableResponse table;
    // FileIO bound to the table's data token; null if the table does not exist.
    protected FileIO fileIO;
    // The table's root location (table.getPath()); null if the table does not exist.
    protected String tableLocation;

    // Constructor for non-exist table
    public VFSTableIdentifier(VFSFileType vfsFileType, String databaseName, String tableName) {
        super(vfsFileType, databaseName);
        this.tableName = tableName;
    }

    // Constructor for existing table
    public VFSTableIdentifier(
            VFSFileType vfsFileType,
            GetTableResponse table,
            String realPath,
            FileIO fileIO,
            String databaseName,
            String tableName) {
        super(vfsFileType, databaseName);
        this.realPath = new Path(realPath);
        try {
            // NOTE(review): realPath must be a valid URI; characters illegal in URIs (e.g.
            // spaces) would surface here as a RuntimeException.
            this.realUri = new URI(realPath);
        } catch (URISyntaxException e) {
            throw new RuntimeException(e);
        }
        this.scheme = realUri.getScheme();
        this.tableName = tableName;
        this.table = table;
        this.fileIO = fileIO;
        if (table != null) {
            this.tableLocation = table.getPath();
        }
    }

    public Path getRealPath() {
        return realPath;
    }

    public URI getRealUri() {
        return realUri;
    }

    public String getScheme() {
        return scheme;
    }

    public String getTableName() {
        return tableName;
    }

    public GetTableResponse getTable() {
        return table;
    }

    public FileIO fileIO() {
        return fileIO;
    }

    public String getTableLocation() {
        return tableLocation;
    }

    /** Whether the table this path refers to actually exists (metadata was loaded). */
    public boolean isTableExist() {
        return table != null;
    }
}
diff --git
a/paimon-vfs/paimon-vfs-common/src/main/java/org/apache/paimon/vfs/VFSTableObjectIdentifier.java
b/paimon-vfs/paimon-vfs-common/src/main/java/org/apache/paimon/vfs/VFSTableObjectIdentifier.java
new file mode 100644
index 0000000000..b0cdfbb1bb
--- /dev/null
+++
b/paimon-vfs/paimon-vfs-common/src/main/java/org/apache/paimon/vfs/VFSTableObjectIdentifier.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.vfs;
+
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.rest.responses.GetTableResponse;
+
/** Identifier for objects under a table, e.g. {@code pvfs://catalog/database/table/file.txt}. */
public class VFSTableObjectIdentifier extends VFSTableIdentifier {
    // Constructor for an object under a table that does not exist (no metadata/FileIO).
    public VFSTableObjectIdentifier(String databaseName, String tableName) {
        super(VFSFileType.TABLE_OBJECT, databaseName, tableName);
    }

    // Constructor for an object under an existing table; realPath is the object's resolved
    // storage location and fileIO is bound to the table's data token.
    public VFSTableObjectIdentifier(
            GetTableResponse table,
            String realPath,
            FileIO fileIO,
            String databaseName,
            String tableName) {
        super(VFSFileType.TABLE_OBJECT, table, realPath, fileIO, databaseName, tableName);
    }
}
diff --git
a/paimon-vfs/paimon-vfs-common/src/main/java/org/apache/paimon/vfs/VFSTableRootIdentifier.java
b/paimon-vfs/paimon-vfs-common/src/main/java/org/apache/paimon/vfs/VFSTableRootIdentifier.java
new file mode 100644
index 0000000000..ee398b06f6
--- /dev/null
+++
b/paimon-vfs/paimon-vfs-common/src/main/java/org/apache/paimon/vfs/VFSTableRootIdentifier.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.vfs;
+
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.rest.responses.GetTableResponse;
+
/**
 * Identifier for the root directory of a table, i.e. {@code pvfs://catalog/database/table/}.
 * (The original javadoc said "objects under a table", copy-pasted from
 * VFSTableObjectIdentifier; this class is the TABLE-level identifier.)
 */
public class VFSTableRootIdentifier extends VFSTableIdentifier {
    // Constructor for a table that does not exist (no metadata/FileIO).
    public VFSTableRootIdentifier(String databaseName, String tableName) {
        super(VFSFileType.TABLE, databaseName, tableName);
    }

    // Constructor for an existing table; realPath is the table's resolved storage location
    // and fileIO is bound to the table's data token.
    public VFSTableRootIdentifier(
            GetTableResponse table,
            String realPath,
            FileIO fileIO,
            String databaseName,
            String tableName) {
        super(VFSFileType.TABLE, table, realPath, fileIO, databaseName, tableName);
    }
}
diff --git a/paimon-vfs/paimon-vfs-hadoop/pom.xml
b/paimon-vfs/paimon-vfs-hadoop/pom.xml
new file mode 100644
index 0000000000..3c53ded410
--- /dev/null
+++ b/paimon-vfs/paimon-vfs-hadoop/pom.xml
@@ -0,0 +1,114 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <artifactId>paimon-vfs</artifactId>
+ <groupId>org.apache.paimon</groupId>
+ <version>1.3-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>paimon-vfs-hadoop</artifactId>
+ <name>Paimon : VirtualFileSystem Hadoop</name>
+
+ <packaging>jar</packaging>
+
+ <properties>
+ <fs.hadoopshaded.version>3.3.4</fs.hadoopshaded.version>
+ <fs.s3.aws.version>1.12.319</fs.s3.aws.version>
+ <commons.beanutils.version>1.9.4</commons.beanutils.version>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.paimon</groupId>
+ <artifactId>paimon-vfs-common</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-client</artifactId>
+ <version>${fs.hadoopshaded.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-aliyun</artifactId>
+ <version>${fs.hadoopshaded.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>com.aliyun.oss</groupId>
+ <artifactId>aliyun-oss-sdk</artifactId>
+ </exclusion>
+ <exclusion>
+ <!-- provided by paimon-hadoop-shaded -->
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>ch.qos.reload4j</groupId>
+ <artifactId>reload4j</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-reload4j</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <executions>
+ <execution>
+ <goals>
+ <goal>test-jar</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <configuration>
+ <artifactSet>
+ <includes>
+
<include>org.apache.paimon:paimon-vfs-common</include>
+ </includes>
+ </artifactSet>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project>
\ No newline at end of file
diff --git
a/paimon-vfs/paimon-vfs-hadoop/src/main/java/org/apache/paimon/vfs/hadoop/PaimonVirtualFileSystem.java
b/paimon-vfs/paimon-vfs-hadoop/src/main/java/org/apache/paimon/vfs/hadoop/PaimonVirtualFileSystem.java
new file mode 100644
index 0000000000..3eb1849b23
--- /dev/null
+++
b/paimon-vfs/paimon-vfs-hadoop/src/main/java/org/apache/paimon/vfs/hadoop/PaimonVirtualFileSystem.java
@@ -0,0 +1,443 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.vfs.hadoop;
+
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.fs.PositionOutputStream;
+import org.apache.paimon.options.CatalogOptions;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.rest.responses.GetDatabaseResponse;
+import org.apache.paimon.rest.responses.GetTableResponse;
+import org.apache.paimon.vfs.VFSCatalogIdentifier;
+import org.apache.paimon.vfs.VFSDatabaseIdentifier;
+import org.apache.paimon.vfs.VFSIdentifier;
+import org.apache.paimon.vfs.VFSOperations;
+import org.apache.paimon.vfs.VFSTableIdentifier;
+import org.apache.paimon.vfs.VFSTableObjectIdentifier;
+import org.apache.paimon.vfs.VFSTableRootIdentifier;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.util.Progressable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.URI;
+import java.util.List;
+
+/** Paimon virtual file system. */
+public class PaimonVirtualFileSystem extends FileSystem {
+ private static final Logger LOG =
LoggerFactory.getLogger(PaimonVirtualFileSystem.class);
+
+ private Path workingDirectory;
+ private URI uri;
+ private VFSOperations vfsOperations;
+ private Configuration conf;
+
+ @Override
+ public void initialize(URI uri, Configuration conf) throws IOException {
+ setConf(conf);
+ this.conf = conf;
+ super.initialize(uri, conf);
+
+ this.workingDirectory = new Path(uri);
+ this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority() +
"/");
+
+ initVFSOperations();
+ }
+
+ private void initVFSOperations() {
+ Options options =
PaimonVirtualFileSystemConfiguration.convertToCatalogOptions(conf);
+ // pvfs://catalog_name/database_name/table_name/file, so uri authority
is catalog name
+ options.set(CatalogOptions.WAREHOUSE, uri.getAuthority());
+
+ CatalogContext catalogContext = CatalogContext.create(options);
+ vfsOperations = new VFSOperations(catalogContext);
+ }
+
+ private String getVirtualPath(Path path) {
+ Path qualifiedPath = path.makeQualified(uri, workingDirectory);
+ // convert to absolute path, like /database_name/table_name/file
+ return qualifiedPath.toUri().getPath();
+ }
+
+ @Override
+ public FSDataOutputStream create(
+ Path f,
+ FsPermission permission,
+ boolean overwrite,
+ int bufferSize,
+ short replication,
+ long blockSize,
+ Progressable progress)
+ throws IOException {
+ VFSIdentifier vfsIdentifier =
vfsOperations.getVFSIdentifier(getVirtualPath(f));
+ if (vfsIdentifier instanceof VFSCatalogIdentifier) {
+ throw new IOException(
+ "Cannot create file for virtual path " + f + " which is a
catalog");
+ } else if (vfsIdentifier instanceof VFSDatabaseIdentifier) {
+ throw new IOException(
+ "Cannot create file for virtual path " + f + " which is a
database");
+ } else if (vfsIdentifier instanceof VFSTableRootIdentifier) {
+ throw new IOException(
+ "Cannot create file for table level virtual path " + f + "
which is a table");
+ } else {
+ VFSTableObjectIdentifier vfsTableObjectIdentifier =
+ (VFSTableObjectIdentifier) vfsIdentifier;
+ if (!vfsTableObjectIdentifier.isTableExist()) {
+ throw new IOException(
+ "Cannot create a file for virtual path "
+ + f
+ + " which is not in an existing table");
+ }
+ PositionOutputStream out =
+ vfsTableObjectIdentifier
+ .fileIO()
+
.newOutputStream(vfsTableObjectIdentifier.getRealPath(), overwrite);
+ return new FSDataOutputStream(out, statistics);
+ }
+ }
+
+ @Override
+ public FSDataInputStream open(Path path, int bufferSize) throws
IOException {
+ VFSIdentifier vfsIdentifier =
vfsOperations.getVFSIdentifier(getVirtualPath(path));
+ if (vfsIdentifier instanceof VFSCatalogIdentifier) {
+ throw new FileNotFoundException(
+ "Cannot open file for virtual path " + path + " which is a
catalog");
+ } else if (vfsIdentifier instanceof VFSDatabaseIdentifier) {
+ throw new FileNotFoundException(
+ "Cannot open file for virtual path " + path + " which is a
database");
+ } else if (vfsIdentifier instanceof VFSTableRootIdentifier) {
+ throw new FileNotFoundException(
+ "Cannot open file for virtual path " + path + " which is a
table");
+ } else {
+ VFSTableObjectIdentifier vfsTableObjectIdentifier =
+ (VFSTableObjectIdentifier) vfsIdentifier;
+ if (!vfsTableObjectIdentifier.isTableExist()) {
+ throw new IOException(
+ "Cannot open file for virtual path "
+ + path
+ + " which is not in an existing table");
+ }
+ VFSInputStream in =
+ new VFSInputStream(
+ vfsTableObjectIdentifier
+ .fileIO()
+
.newInputStream(vfsTableObjectIdentifier.getRealPath()),
+ statistics);
+ return new FSDataInputStream(in);
+ }
+ }
+
+ @Override
+ public FSDataOutputStream append(Path f, int bufferSize, Progressable
progress)
+ throws IOException {
+ throw new IOException("Append is not supported");
+ }
+
+ @Override
+ public boolean rename(Path src, Path dst) throws IOException {
+ VFSIdentifier srcVfsIdentifier =
vfsOperations.getVFSIdentifier(getVirtualPath(src));
+ VFSIdentifier dstVfsIdentifier =
vfsOperations.getVFSIdentifier(getVirtualPath(dst));
+ if (srcVfsIdentifier instanceof VFSCatalogIdentifier) {
+ LOG.debug("Rename root path is ignored");
+ return false;
+ } else if (srcVfsIdentifier instanceof VFSDatabaseIdentifier) {
+ throw new IOException(
+ "Cannot rename from virtual path " + src + " which is a
database");
+ } else if (srcVfsIdentifier instanceof VFSTableRootIdentifier) {
+ if (!(dstVfsIdentifier instanceof VFSTableRootIdentifier)) {
+ throw new IOException(
+ "Cannot rename from table path " + src + " to
non-table path " + dst);
+ }
+ return renameTable(
+ (VFSTableRootIdentifier) srcVfsIdentifier,
+ (VFSTableRootIdentifier) dstVfsIdentifier);
+ } else {
+ if (!(dstVfsIdentifier instanceof VFSTableIdentifier)) {
+ throw new IOException(
+ "Cannot rename to virtual path " + dst + " which is
not a table");
+ }
+ VFSTableIdentifier srcTableIdentifier = (VFSTableIdentifier)
srcVfsIdentifier;
+ VFSTableIdentifier dstTableIdentifier = (VFSTableIdentifier)
dstVfsIdentifier;
+ if (!srcTableIdentifier.isTableExist()) {
+ throw new IOException(
+ "Cannot rename from virtual path "
+ + src
+ + " which is not in an existing table");
+ }
+ if (!dstTableIdentifier.isTableExist()) {
+ throw new IOException(
+ "Cannot rename to virtual path "
+ + dst
+ + " which is not in an existing table");
+ }
+ GetTableResponse srcTable = srcTableIdentifier.getTable();
+ GetTableResponse dstTable = dstTableIdentifier.getTable();
+ if (!srcTable.getId().equals(dstTable.getId())) {
+ throw new IOException(
+ "Cannot rename from virtual path "
+ + src
+ + " to virtual path "
+ + dst
+ + " which is not in the same table");
+ }
+ return srcTableIdentifier
+ .fileIO()
+ .rename(srcTableIdentifier.getRealPath(),
dstTableIdentifier.getRealPath());
+ }
+ }
+
+ private boolean renameTable(
+ VFSTableRootIdentifier srcIdentifier, VFSTableRootIdentifier
dstIdentifier)
+ throws IOException {
+ if
(!srcIdentifier.getDatabaseName().equals(dstIdentifier.getDatabaseName())) {
+ throw new IOException("Do not support rename table with different
database");
+ }
+ if (!srcIdentifier.isTableExist()) {
+ // return false if src does not exist
+ LOG.debug(
+ "Source table not found "
+ + srcIdentifier.getDatabaseName()
+ + "."
+ + srcIdentifier.getTableName());
+ return false;
+ }
+ if (srcIdentifier.getTableName().equals(dstIdentifier.getTableName()))
{
+ // src equals to dst, return true
+ return true;
+ }
+ try {
+ vfsOperations.renameTable(
+ srcIdentifier.getDatabaseName(),
+ srcIdentifier.getTableName(),
+ dstIdentifier.getTableName());
+ return true;
+ } catch (FileNotFoundException e) {
+ LOG.debug(
+ "Source table not found "
+ + srcIdentifier.getDatabaseName()
+ + "."
+ + srcIdentifier.getTableName());
+ return false;
+ }
+ }
+
+ @Override
+ public boolean delete(Path f, boolean recursive) throws IOException {
+ VFSIdentifier vfsIdentifier =
vfsOperations.getVFSIdentifier(getVirtualPath(f));
+ if (vfsIdentifier instanceof VFSCatalogIdentifier) {
+ throw new IOException("Cannot delete virtual path " + f + " which
is a catalog");
+ } else if (vfsIdentifier instanceof VFSDatabaseIdentifier) {
+ try {
+ vfsOperations.dropDatabase(vfsIdentifier.getDatabaseName(),
recursive);
+ } catch (FileNotFoundException e) {
+ LOG.debug("Database not found for deleting path " + f);
+ return false;
+ }
+ return true;
+ } else if (vfsIdentifier instanceof VFSTableRootIdentifier) {
+ VFSTableRootIdentifier vfsTableRootIdentifier =
(VFSTableRootIdentifier) vfsIdentifier;
+ try {
+ vfsOperations.dropTable(
+ vfsTableRootIdentifier.getDatabaseName(),
+ vfsTableRootIdentifier.getTableName());
+ } catch (FileNotFoundException e) {
+ LOG.debug("Table not found for deleting path " + f);
+ return false;
+ }
+ return true;
+ } else {
+ VFSTableObjectIdentifier vfsTableObjectIdentifier =
+ (VFSTableObjectIdentifier) vfsIdentifier;
+ if (!vfsTableObjectIdentifier.isTableExist()) {
+ throw new IOException(
+ "Cannot delete virtual path " + f + " which is not in
an existing table");
+ }
+ return vfsTableObjectIdentifier
+ .fileIO()
+ .delete(vfsTableObjectIdentifier.getRealPath(), recursive);
+ }
+ }
+
+ @Override
+ public FileStatus getFileStatus(Path f) throws IOException {
+ VFSIdentifier vfsIdentifier =
vfsOperations.getVFSIdentifier(getVirtualPath(f));
+ if (vfsIdentifier instanceof VFSCatalogIdentifier) {
+ return new FileStatus(0, true, 1, 1, 0, new Path(this.uri));
+ } else if (vfsIdentifier instanceof VFSDatabaseIdentifier) {
+ GetDatabaseResponse database =
+ vfsOperations.getDatabase(vfsIdentifier.getDatabaseName());
+ return convertDatabase(database);
+ } else {
+ VFSTableIdentifier vfsTableIdentifier = (VFSTableIdentifier)
vfsIdentifier;
+ if (!vfsTableIdentifier.isTableExist()) {
+ throw new FileNotFoundException("Table not found for path " +
f);
+ }
+ org.apache.paimon.fs.FileStatus fileStatus =
+
vfsTableIdentifier.fileIO().getFileStatus(vfsTableIdentifier.getRealPath());
+ return convertFileStatus(vfsTableIdentifier, fileStatus);
+ }
+ }
+
+ private FileStatus convertDatabase(GetDatabaseResponse database) {
+ return new FileStatus(0, true, 1, 1, 0, new Path(new Path(this.uri),
database.getName()));
+ }
+
+ private FileStatus convertFileStatus(
+ VFSTableIdentifier vfsIdentifier, org.apache.paimon.fs.FileStatus
paimonFileStatus)
+ throws IOException {
+ String realPath = paimonFileStatus.getPath().toString();
+ if (!realPath.startsWith(vfsIdentifier.getTableLocation())) {
+ throw new IOException(
+ "Result path "
+ + realPath
+ + " does not start with table location "
+ + vfsIdentifier.getTableLocation());
+ }
+ String childPath =
realPath.substring(vfsIdentifier.getTableLocation().length());
+ if (!childPath.startsWith("/")) {
+ childPath = "/" + childPath;
+ }
+ Path virtualPath =
+ new Path(
+ new Path(this.uri),
+ vfsIdentifier.getDatabaseName()
+ + "/"
+ + vfsIdentifier.getTableName()
+ + childPath);
+ return new FileStatus(
+ paimonFileStatus.getLen(),
+ paimonFileStatus.isDir(),
+ 1,
+ 1,
+ paimonFileStatus.getModificationTime(),
+ virtualPath);
+ }
+
+ @Override
+ public FileStatus[] listStatus(Path f) throws FileNotFoundException,
IOException {
+ VFSIdentifier vfsIdentifier =
vfsOperations.getVFSIdentifier(getVirtualPath(f));
+ if (vfsIdentifier instanceof VFSCatalogIdentifier) {
+ List<String> databases = vfsOperations.listDatabases();
+ return convertDatabases(databases);
+ } else if (vfsIdentifier instanceof VFSDatabaseIdentifier) {
+ List<String> tables =
vfsOperations.listTables(vfsIdentifier.getDatabaseName());
+ return convertTables(vfsIdentifier.getDatabaseName(), tables);
+ } else {
+ VFSTableIdentifier vfsTableIdentifier = (VFSTableIdentifier)
vfsIdentifier;
+ if (!vfsTableIdentifier.isTableExist()) {
+ throw new FileNotFoundException("Table not found for path " +
f);
+ }
+ org.apache.paimon.fs.FileStatus[] paimonFileStatuses =
+
vfsTableIdentifier.fileIO().listStatus(vfsTableIdentifier.getRealPath());
+ return convertFileStatuses(vfsTableIdentifier, paimonFileStatuses);
+ }
+ }
+
+ private FileStatus[] convertDatabases(List<String> databases) {
+ FileStatus[] fileStatuses = new FileStatus[databases.size()];
+ for (int i = 0; i < databases.size(); i++) {
+ String database = databases.get(i);
+ FileStatus fileStatus =
+ new FileStatus(0, true, 1, 1, 0, new Path(new
Path(this.uri), database));
+ fileStatuses[i] = fileStatus;
+ }
+ return fileStatuses;
+ }
+
+ private FileStatus[] convertTables(String database, List<String> tables) {
+ FileStatus[] fileStatuses = new FileStatus[tables.size()];
+ for (int i = 0; i < tables.size(); i++) {
+ String table = tables.get(i);
+ FileStatus fileStatus =
+ new FileStatus(
+ 0, true, 1, 1, 0, new Path(new Path(this.uri),
database + "/" + table));
+ fileStatuses[i] = fileStatus;
+ }
+ return fileStatuses;
+ }
+
+ private FileStatus[] convertFileStatuses(
+ VFSTableIdentifier vfsIdentifier,
org.apache.paimon.fs.FileStatus[] paimonFileStatuses)
+ throws IOException {
+ FileStatus[] fileStatuses = new FileStatus[paimonFileStatuses.length];
+ for (int i = 0; i < paimonFileStatuses.length; i++) {
+ fileStatuses[i] = convertFileStatus(vfsIdentifier,
paimonFileStatuses[i]);
+ }
+ return fileStatuses;
+ }
+
+ @Override
+ public boolean mkdirs(Path f, FsPermission permission) throws IOException {
+ VFSIdentifier vfsIdentifier =
vfsOperations.getVFSIdentifier(getVirtualPath(f));
+ if (vfsIdentifier instanceof VFSCatalogIdentifier) {
+ throw new IOException("Cannot mkdirs for virtual path " + f + "
which is a catalog");
+ } else if (vfsIdentifier instanceof VFSDatabaseIdentifier) {
+ vfsOperations.createDatabase(vfsIdentifier.getDatabaseName());
+ return true;
+ } else if (vfsIdentifier instanceof VFSTableRootIdentifier) {
+ VFSTableRootIdentifier vfsTableRootIdentifier =
(VFSTableRootIdentifier) vfsIdentifier;
+ if (vfsTableRootIdentifier.isTableExist()) {
+ // Table already exists, no need to execute
+ return true;
+ }
+ vfsOperations.createObjectTable(
+ vfsIdentifier.getDatabaseName(),
vfsTableRootIdentifier.getTableName());
+ return true;
+ } else {
+ VFSTableObjectIdentifier vfsTableObjectIdentifier =
+ (VFSTableObjectIdentifier) vfsIdentifier;
+ if (!vfsTableObjectIdentifier.isTableExist()) {
+ throw new IOException(
+ "Cannot mkdirs for virtual path "
+ + f
+ + " which is not in an existing table");
+ }
+ return
vfsTableObjectIdentifier.fileIO().mkdirs(vfsTableObjectIdentifier.getRealPath());
+ }
+ }
+
+ @Override
+ public String getScheme() {
+ return "pvfs";
+ }
+
+ @Override
+ public URI getUri() {
+ return uri;
+ }
+
+ @Override
+ public void setWorkingDirectory(Path newDir) {
+ workingDirectory = newDir;
+ }
+
+ @Override
+ public Path getWorkingDirectory() {
+ return workingDirectory;
+ }
+}
diff --git
a/paimon-vfs/paimon-vfs-hadoop/src/main/java/org/apache/paimon/vfs/hadoop/PaimonVirtualFileSystemConfiguration.java
b/paimon-vfs/paimon-vfs-hadoop/src/main/java/org/apache/paimon/vfs/hadoop/PaimonVirtualFileSystemConfiguration.java
new file mode 100644
index 0000000000..9c68976161
--- /dev/null
+++
b/paimon-vfs/paimon-vfs-hadoop/src/main/java/org/apache/paimon/vfs/hadoop/PaimonVirtualFileSystemConfiguration.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.vfs.hadoop;
+
+import org.apache.paimon.options.Options;
+
+import org.apache.hadoop.conf.Configuration;
+
+import java.util.Map;
+
+/** Paimon virtual file system configuration. */
+public class PaimonVirtualFileSystemConfiguration {
+ public static final String PAIMON_VFS_PREFIX = "fs.pvfs.";
+
+ public static Options convertToCatalogOptions(Configuration conf) {
+ Options options = new Options();
+ for (Map.Entry<String, String> entry : conf) {
+ if (entry.getKey().startsWith(PAIMON_VFS_PREFIX)) {
+ String key =
entry.getKey().substring(PAIMON_VFS_PREFIX.length());
+ options.set(key, entry.getValue());
+ }
+ }
+ return options;
+ }
+}
diff --git
a/paimon-vfs/paimon-vfs-hadoop/src/main/java/org/apache/paimon/vfs/hadoop/Pvfs.java
b/paimon-vfs/paimon-vfs-hadoop/src/main/java/org/apache/paimon/vfs/hadoop/Pvfs.java
new file mode 100644
index 0000000000..e878153bed
--- /dev/null
+++
b/paimon-vfs/paimon-vfs-hadoop/src/main/java/org/apache/paimon/vfs/hadoop/Pvfs.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.vfs.hadoop;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.DelegateToFileSystem;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+
/**
 * A {@link DelegateToFileSystem} implementation that delegates all operations to a {@link
 * PaimonVirtualFileSystem}.
 *
 * <p>This is the {@code AbstractFileSystem} bridge required so the "pvfs" scheme also works through
 * the Hadoop {@code FileContext} API, not only through {@code FileSystem}.
 */
public class Pvfs extends DelegateToFileSystem {
    /**
     * Creates the bridge for the given URI.
     *
     * @param theUri the pvfs URI of this file system instance
     * @param conf Hadoop configuration forwarded to the delegate
     * @throws IOException if the delegate file system fails to initialize
     * @throws URISyntaxException if {@code theUri} is malformed
     */
    public Pvfs(URI theUri, Configuration conf) throws IOException, URISyntaxException {
        // Last argument is DelegateToFileSystem's authorityNeeded flag, passed as false —
        // presumably because the delegate validates the URI itself; TODO confirm.
        super(theUri, new PaimonVirtualFileSystem(), conf, theUri.getScheme(), false);
    }
}
diff --git
a/paimon-vfs/paimon-vfs-hadoop/src/main/java/org/apache/paimon/vfs/hadoop/VFSInputStream.java
b/paimon-vfs/paimon-vfs-hadoop/src/main/java/org/apache/paimon/vfs/hadoop/VFSInputStream.java
new file mode 100644
index 0000000000..020a58d318
--- /dev/null
+++
b/paimon-vfs/paimon-vfs-hadoop/src/main/java/org/apache/paimon/vfs/hadoop/VFSInputStream.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.vfs.hadoop;
+
+import org.apache.paimon.fs.SeekableInputStream;
+
+import org.apache.hadoop.fs.FSInputStream;
+import org.apache.hadoop.fs.FileSystem;
+
+import java.io.IOException;
+
+/**
+ * VFSInputStream wrap over paimon SeekableInputStream to support hadoop
FSDataInputStream. TODO:
+ * SeekableInputStream interface is too simple to fully support all
FSDataInputStream operations: 1.
+ * ByteBufferReadable and ByteBufferPositionedReadable should be implemented
for full support. 2.
+ * Positioned read is not supported in SeekableInputStream, so it is by
default implemented by
+ * sequence read, which is not a good solution.
+ */
+public class VFSInputStream extends FSInputStream {
+ private SeekableInputStream in;
+ private byte[] oneByteBuf = new byte[1];
+ private FileSystem.Statistics statistics;
+
+ public VFSInputStream(SeekableInputStream in, FileSystem.Statistics
statistics) {
+ this.in = in;
+ this.statistics = statistics;
+ }
+
+ @Override
+ public void seek(long pos) throws IOException {
+ in.seek(pos);
+ }
+
+ @Override
+ public long getPos() throws IOException {
+ return in.getPos();
+ }
+
+ @Override
+ public boolean seekToNewSource(long var1) throws IOException {
+ return false;
+ }
+
+ @Override
+ public int read(byte[] b, int off, int len) throws IOException {
+ int byteRead = in.read(b, off, len);
+ if (statistics != null && byteRead >= 0) {
+ statistics.incrementBytesRead(byteRead);
+ }
+ return byteRead;
+ }
+
+ @Override
+ public int read() throws IOException {
+ int n;
+ while ((n = read(oneByteBuf, 0, 1)) == 0) {
+ /* no op */
+ }
+ if (statistics != null && n >= 0) {
+ statistics.incrementBytesRead(n);
+ }
+ return (n == -1) ? -1 : oneByteBuf[0] & 0xff;
+ }
+}
diff --git
a/paimon-vfs/paimon-vfs-hadoop/src/test/java/org/apache/paimon/vfs/hadoop/MockRestVirtualFileSystemTest.java
b/paimon-vfs/paimon-vfs-hadoop/src/test/java/org/apache/paimon/vfs/hadoop/MockRestVirtualFileSystemTest.java
new file mode 100644
index 0000000000..0b3307db96
--- /dev/null
+++
b/paimon-vfs/paimon-vfs-hadoop/src/test/java/org/apache/paimon/vfs/hadoop/MockRestVirtualFileSystemTest.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.vfs.hadoop;
+
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.options.CatalogOptions;
+import org.apache.paimon.rest.RESTCatalog;
+import org.apache.paimon.rest.RESTCatalogInternalOptions;
+import org.apache.paimon.rest.RESTCatalogOptions;
+import org.apache.paimon.rest.RESTCatalogServer;
+import org.apache.paimon.rest.RESTFileIOTestLoader;
+import org.apache.paimon.rest.RESTTestFileIO;
+import org.apache.paimon.rest.RESTTokenFileIO;
+import org.apache.paimon.rest.auth.AuthProvider;
+import org.apache.paimon.rest.auth.AuthProviderEnum;
+import org.apache.paimon.rest.auth.BearTokenAuthProvider;
+import org.apache.paimon.rest.responses.ConfigResponse;
+
+import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableMap;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.UUID;
+
/** Test for {@link PaimonVirtualFileSystem} with Mock Rest Server. */
public class MockRestVirtualFileSystemTest extends VirtualFileSystemTest {
    // In-process mock REST catalog server that backs the file system under test.
    private RESTCatalogServer restCatalogServer;
    // Custom header the mock server is configured with (wired into ConfigResponse below).
    private final String serverDefineHeaderName = "test-header";
    private final String serverDefineHeaderValue = "test-value";
    // Storage root for the mock server's data; set to the superclass warehouse directory.
    private String dataPath;
    // Bearer-token auth used by both server and client.
    private AuthProvider authProvider;
    // Client-side auth options (token + provider) merged into the catalog options.
    private Map<String, String> authMap;
    private String initToken = "init_token";
    // Random warehouse name so each test run gets an isolated catalog.
    private String restWarehouse;

    @BeforeEach
    @Override
    public void setUp() throws Exception {
        super.setUp();
        // NOTE(review): `warehouse`, `catalog`, `options`, `config`, `vfs` and `vfsRoot`
        // appear to be inherited from VirtualFileSystemTest (declared outside this file).
        dataPath = warehouse;
        this.authProvider = new BearTokenAuthProvider(initToken);
        this.authMap =
                ImmutableMap.of(
                        RESTCatalogOptions.TOKEN.key(),
                        initToken,
                        RESTCatalogOptions.TOKEN_PROVIDER.key(),
                        AuthProviderEnum.BEAR.identifier());
        this.restWarehouse = UUID.randomUUID().toString();
        this.catalog = initCatalog(false);

        // test retry commit
        RESTCatalogServer.commitSuccessThrowException = true;

        initFs();
    }

    @AfterEach
    public void tearDown() throws Exception {
        restCatalogServer.shutdown();
    }

    /**
     * Starts the mock REST server and returns a REST catalog client pointed at it.
     *
     * @param enableDataToken whether data-token-based file IO is enabled on the server
     */
    private RESTCatalog initCatalog(boolean enableDataToken) throws IOException {
        this.config =
                new ConfigResponse(
                        ImmutableMap.of(
                                RESTCatalogInternalOptions.PREFIX.key(),
                                "paimon",
                                "header." + serverDefineHeaderName,
                                serverDefineHeaderValue,
                                RESTTokenFileIO.DATA_TOKEN_ENABLED.key(),
                                enableDataToken + "",
                                CatalogOptions.WAREHOUSE.key(),
                                restWarehouse),
                        ImmutableMap.of());
        restCatalogServer =
                new RESTCatalogServer(dataPath, this.authProvider, this.config, restWarehouse);
        restCatalogServer.start();
        // Merge the auth options into the (inherited) catalog options.
        for (Map.Entry<String, String> entry : this.authMap.entrySet()) {
            options.set(entry.getKey(), entry.getValue());
        }
        options.set(CatalogOptions.WAREHOUSE.key(), restWarehouse);
        options.set(RESTCatalogOptions.URI, restCatalogServer.getUrl());
        // When data tokens are enabled, swap the "file" scheme for the test FileIO scheme
        // so reads/writes go through the token-aware test loader.
        String path =
                enableDataToken
                        ? dataPath.replaceFirst("file", RESTFileIOTestLoader.SCHEME)
                        : dataPath;
        options.set(RESTTestFileIO.DATA_PATH_CONF_KEY, path);
        return new RESTCatalog(CatalogContext.create(options));
    }

    /** Initializes the virtual file system under test against the running mock server. */
    private void initFs() throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.pvfs.uri", restCatalogServer.getUrl());
        conf.set("fs.pvfs.token.provider", AuthProviderEnum.BEAR.identifier());
        conf.set("fs.pvfs.token", initToken);
        this.vfs = new PaimonVirtualFileSystem();
        this.vfsRoot = new Path("pvfs://" + restWarehouse + "/");
        this.vfs.initialize(vfsRoot.toUri(), conf);
    }
}
diff --git
a/paimon-vfs/paimon-vfs-hadoop/src/test/java/org/apache/paimon/vfs/hadoop/VirtualFileSystemTest.java
b/paimon-vfs/paimon-vfs-hadoop/src/test/java/org/apache/paimon/vfs/hadoop/VirtualFileSystemTest.java
new file mode 100644
index 0000000000..8b63cc187d
--- /dev/null
+++
b/paimon-vfs/paimon-vfs-hadoop/src/test/java/org/apache/paimon/vfs/hadoop/VirtualFileSystemTest.java
@@ -0,0 +1,457 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.vfs.hadoop;
+
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.catalog.Database;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.ResolvingFileIO;
+import org.apache.paimon.options.CatalogOptions;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.rest.RESTCatalog;
+import org.apache.paimon.rest.responses.ConfigResponse;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.table.object.ObjectTable;
+import org.apache.paimon.types.DataTypes;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.junit.Assert;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.IOException;
+import java.util.List;
+
+import static org.apache.paimon.CoreOptions.TYPE;
+import static org.apache.paimon.TableType.OBJECT_TABLE;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link PaimonVirtualFileSystem}. */
+public abstract class VirtualFileSystemTest {
+ @TempDir java.nio.file.Path tempFile;
+ protected String warehouse;
+ protected FileIO fileIO;
+ protected RESTCatalog catalog;
+ protected ConfigResponse config;
+ protected Options options = new Options();
+ protected FileSystem vfs;
+ protected Path vfsRoot;
+
+ @BeforeEach
+ public void setUp() throws Exception {
+ // Use the per-test temp dir as the warehouse root and configure a
+ // ResolvingFileIO against it; subclasses create the catalog and vfs.
+ warehouse = tempFile.toUri().toString();
+ Options catalogOptions = new Options();
+ catalogOptions.set(CatalogOptions.WAREHOUSE, warehouse);
+ CatalogContext catalogContext = CatalogContext.create(catalogOptions);
+ fileIO = new ResolvingFileIO();
+ fileIO.configure(catalogContext);
+ }
+
+ @AfterEach
+ void tearDown() throws Exception {
+ // Best-effort cleanup: drop every database created by the test, then
+ // close the catalog. Drop failures are deliberately ignored so one bad
+ // database cannot prevent the rest of the cleanup.
+ if (catalog != null) {
+ List<String> dbs = catalog.listDatabases();
+ for (String db : dbs) {
+ try {
+ catalog.dropDatabase(db, true, true);
+ } catch (Exception ignored) {
+ // ignore: cleanup is best-effort
+ }
+ }
+ catalog.close();
+ }
+ }
+
+    /** Creates {@code databaseName.tableName} as an object table and verifies its type. */
+    protected void createObjectTable(String databaseName, String tableName) throws Exception {
+        catalog.createDatabase(databaseName, true);
+        Identifier id = Identifier.create(databaseName, tableName);
+        Schema objectSchema =
+                Schema.newBuilder().option(TYPE.key(), OBJECT_TABLE.toString()).build();
+        catalog.createTable(id, objectSchema, false);
+        assertThat(catalog.getTable(id)).isInstanceOf(ObjectTable.class);
+    }
+
+    /** Creates {@code databaseName.tableName} as a regular file-store table. */
+    protected void createNormalTable(String databaseName, String tableName) throws Exception {
+        catalog.createDatabase(databaseName, true);
+        Identifier id = Identifier.create(databaseName, tableName);
+        Schema twoColumnSchema =
+                Schema.newBuilder()
+                        .column("id", DataTypes.INT())
+                        .column("name", DataTypes.STRING())
+                        .build();
+        catalog.createTable(id, twoColumnSchema, false);
+        assertThat(catalog.getTable(id)).isInstanceOf(FileStoreTable.class);
+    }
+
+ /** Creates the database if it does not already exist (ignoreIfExists = true). */
+ protected void createDatabase(String databaseName) throws Exception {
+ catalog.createDatabase(databaseName, true);
+ }
+
+    /**
+     * Asserts that the table {@code databaseName.tableName} does (or does not) exist
+     * in the catalog.
+     *
+     * @param expect true if the table is expected to exist
+     */
+    protected void checkTableExist(String databaseName, String tableName, boolean expect)
+            throws Exception {
+        try {
+            // Lookup result itself is irrelevant; only existence matters here.
+            catalog.getTable(Identifier.create(databaseName, tableName));
+            Assert.assertTrue(
+                    "Table unexpectedly exists: " + databaseName + "." + tableName, expect);
+        } catch (Catalog.TableNotExistException e) {
+            Assert.assertFalse(
+                    "Table unexpectedly missing: " + databaseName + "." + tableName, expect);
+        }
+    }
+
+ /** mkdirs inside an object table creates a real directory; it must fail for a missing table. */
+ @Test
+ public void testMkdir() throws Exception {
+ String databaseName = "test_db";
+ String tableName = "object_table";
+ createObjectTable(databaseName, tableName);
+
+ Path vfsPath = new Path(vfsRoot, databaseName + "/" + tableName +
"/test_dir");
+ vfs.mkdirs(vfsPath);
+ FileStatus fileStatus = vfs.getFileStatus(vfsPath);
+ Assert.assertEquals(vfsPath.toString(),
fileStatus.getPath().toString());
+ Assert.assertTrue(fileStatus.isDirectory());
+
+ // Mkdir in non-existing table
+ tableName = "object_table2";
+ vfsPath = new Path(vfsRoot, databaseName + "/" + tableName +
"/test_dir");
+ try {
+ vfs.mkdirs(vfsPath);
+ Assert.fail();
+ } catch (IOException e) {
+ // expected: mkdirs must fail when the target table does not exist
+ }
+ }
+
+ /**
+  * mkdirs on virtual paths: root is rejected; a database-level path creates the
+  * database; a table-level path creates an object table (recursively creating the
+  * database when needed).
+  */
+ @Test
+ public void testVirtualMkdir() throws Exception {
+ String databaseName = "test_db";
+ String tableName = "object_table";
+
+ // Mkdir for root is not supported
+ try {
+ vfs.mkdirs(vfsRoot);
+ Assert.fail();
+ } catch (IOException e) {
+ // expected: the warehouse root cannot be created via mkdirs
+ }
+
+ // Create database in virtual file system
+ Path vfsPath = new Path(vfsRoot, databaseName);
+ Assert.assertTrue(vfs.mkdirs(vfsPath));
+ Database database = catalog.getDatabase(databaseName);
+ Assert.assertEquals(databaseName, database.name());
+
+ // Create object table in virtual file system
+ vfsPath = new Path(vfsRoot, databaseName + "/" + tableName);
+ Assert.assertTrue(vfs.mkdirs(vfsPath));
+ Table table = catalog.getTable(new Identifier(databaseName,
tableName));
+ assertThat(table).isInstanceOf(ObjectTable.class);
+
+ // Create object table with database recursively created
+ databaseName = "test_db2";
+ vfsPath = new Path(vfsRoot, databaseName + "/" + tableName);
+ Assert.assertTrue(vfs.mkdirs(vfsPath));
+ table = catalog.getTable(new Identifier(databaseName, tableName));
+ assertThat(table).isInstanceOf(ObjectTable.class);
+ }
+
+    /**
+     * Round-trips a small file through create/write/open/read inside an object table,
+     * and verifies create() fails for a non-existing table.
+     */
+    @Test
+    public void testCreate() throws Exception {
+        String databaseName = "test_db";
+        String tableName = "object_table";
+        createObjectTable(databaseName, tableName);
+
+        Path vfsPath = new Path(vfsRoot, databaseName + "/" + tableName + "/test_dir/file.txt");
+        // try-with-resources: the stream is closed even if an assertion below throws
+        try (FSDataOutputStream out = vfs.create(vfsPath)) {
+            out.write("hello".getBytes());
+        }
+
+        FileStatus fileStatus = vfs.getFileStatus(vfsPath);
+        Assert.assertEquals(vfsPath.toString(), fileStatus.getPath().toString());
+        Assert.assertTrue(fileStatus.isFile());
+        Assert.assertEquals(5, fileStatus.getLen());
+
+        byte[] buffer = new byte[5];
+        try (FSDataInputStream in = vfs.open(vfsPath)) {
+            // readFully guards against short reads; a bare read() may return fewer bytes
+            in.readFully(0, buffer);
+        }
+        Assert.assertArrayEquals("hello".getBytes(), buffer);
+
+        // Create file in non-existing table
+        tableName = "object_table2";
+        vfsPath = new Path(vfsRoot, databaseName + "/" + tableName + "/test_dir/file.txt");
+        try {
+            vfs.create(vfsPath);
+            Assert.fail("create() should fail for a non-existing table");
+        } catch (IOException e) {
+            // expected
+        }
+    }
+
+ /** create() must be rejected on virtual paths: the root, a database, and a table. */
+ @Test
+ public void testVirtualCreate() throws Exception {
+ String databaseName = "test_db";
+ String tableName = "object_table";
+ // Create root
+ try {
+ vfs.create(vfsRoot);
+ Assert.fail();
+ } catch (IOException e) {
+ // expected: cannot create a file at the warehouse root
+ }
+
+ // Create file for database
+ try {
+ vfs.create(new Path(vfsRoot, databaseName));
+ Assert.fail();
+ } catch (IOException e) {
+ // expected: a database path cannot be a file
+ }
+
+ // Create file for table
+ try {
+ vfs.create(new Path(vfsRoot, databaseName + "/" + tableName));
+ Assert.fail();
+ } catch (IOException e) {
+ // expected: a table path cannot be a file
+ }
+ }
+
+    /**
+     * listStatus of a table directory returns exactly the created dir and file, and
+     * fails for a non-existing table.
+     */
+    @Test
+    public void testListStatus() throws Exception {
+        String databaseName = "test_db";
+        String tableName = "object_table";
+        createObjectTable(databaseName, tableName);
+
+        Path vfsPath1 = new Path(vfsRoot, databaseName + "/" + tableName + "/test_dir");
+        vfs.mkdirs(vfsPath1);
+        Path vfsPath2 = new Path(vfsRoot, databaseName + "/" + tableName + "/file.txt");
+        // try-with-resources: the stream is closed even if an assertion below throws
+        try (FSDataOutputStream out = vfs.create(vfsPath2)) {
+            out.write("hello".getBytes());
+        }
+
+        FileStatus[] fileStatuses =
+                vfs.listStatus(new Path(vfsRoot, databaseName + "/" + tableName));
+        Assert.assertEquals(2, fileStatuses.length);
+        for (FileStatus fileStatus : fileStatuses) {
+            if (fileStatus.getPath().toString().equals(vfsPath1.toString())) {
+                Assert.assertTrue(fileStatus.isDirectory());
+            } else if (fileStatus.getPath().toString().equals(vfsPath2.toString())) {
+                Assert.assertTrue(fileStatus.isFile());
+                Assert.assertEquals(5, fileStatus.getLen());
+            } else {
+                // Previously an unmatched entry was silently ignored; fail loudly instead.
+                Assert.fail("Unexpected listing entry: " + fileStatus.getPath());
+            }
+        }
+
+        // List in non-existing table
+        tableName = "object_table2";
+        try {
+            vfs.listStatus(new Path(vfsRoot, databaseName + "/" + tableName));
+            Assert.fail("listStatus() should fail for a non-existing table");
+        } catch (IOException e) {
+            // expected
+        }
+    }
+
+ /** Listing the root shows databases; listing a database shows its tables, as directories. */
+ @Test
+ public void testVirtualListStatus() throws Exception {
+ String databaseName = "test_db";
+ String tableName = "object_table";
+ createObjectTable(databaseName, tableName);
+
+ // List root
+ FileStatus[] fileStatuses = vfs.listStatus(vfsRoot);
+ Assert.assertEquals(1, fileStatuses.length);
+ Assert.assertEquals(
+ new Path(vfsRoot, databaseName).toString(),
fileStatuses[0].getPath().toString());
+ Assert.assertTrue(fileStatuses[0].isDirectory());
+
+ // List database
+ fileStatuses = vfs.listStatus(new Path(vfsRoot, databaseName));
+ Assert.assertEquals(1, fileStatuses.length);
+ Assert.assertEquals(
+ new Path(vfsRoot, databaseName + "/" + tableName).toString(),
+ fileStatuses[0].getPath().toString());
+ Assert.assertTrue(fileStatuses[0].isDirectory());
+ }
+
+ /**
+  * rename within one table moves the file; renaming inside a missing table or across
+  * tables must fail.
+  */
+ @Test
+ public void testRename() throws Exception {
+ String databaseName = "test_db";
+ String tableName = "object_table";
+ createObjectTable(databaseName, tableName);
+
+ Path vfsPath = new Path(vfsRoot, databaseName + "/" + tableName +
"/test_dir/file.txt");
+ FSDataOutputStream out = vfs.create(vfsPath);
+ out.write("hello".getBytes());
+ out.close();
+
+ Path vfsPath2 = new Path(vfsRoot, databaseName + "/" + tableName +
"/test_dir/file2.txt");
+ Assert.assertTrue(vfs.rename(vfsPath, vfsPath2));
+
+ // Source gone, destination carries the same content length
+ Assert.assertFalse(vfs.exists(vfsPath));
+ FileStatus fileStatus = vfs.getFileStatus(vfsPath2);
+ Assert.assertEquals(vfsPath2.toString(),
fileStatus.getPath().toString());
+ Assert.assertTrue(fileStatus.isFile());
+ Assert.assertEquals(5, fileStatus.getLen());
+
+ // Rename in non-existing table
+ String tableName2 = "object_table2";
+ vfsPath = new Path(vfsRoot, databaseName + "/" + tableName2 +
"/test_dir/file.txt");
+ vfsPath2 = new Path(vfsRoot, databaseName + "/" + tableName2 +
"/test_dir/file2.txt");
+ try {
+ vfs.rename(vfsPath, vfsPath2);
+ Assert.fail();
+ } catch (IOException e) {
+ // expected: both paths are inside a table that does not exist
+ }
+
+ // Rename cross table is not supported
+ createObjectTable(databaseName, tableName2);
+ vfsPath = new Path(vfsRoot, databaseName + "/" + tableName +
"/test_dir/file2.txt");
+ vfsPath2 = new Path(vfsRoot, databaseName + "/" + tableName2 +
"/test_dir/file2.txt");
+ try {
+ vfs.rename(vfsPath, vfsPath2);
+ Assert.fail();
+ } catch (IOException e) {
+ // expected: rename across different tables is rejected
+ }
+ }
+
+ /**
+  * rename on virtual paths: root rename returns false, database rename is rejected,
+  * table rename maps to a catalog rename (same-name rename succeeds; cross-database
+  * rename is rejected).
+  */
+ @Test
+ public void testVirtualRename() throws Exception {
+ String databaseName = "test_db";
+ String tableName = "object_table";
+ createNormalTable(databaseName, tableName);
+ Assert.assertTrue(vfs.exists(new Path(vfsRoot, databaseName + "/" +
tableName)));
+ // Rename root
+ Path vfsPath = vfsRoot;
+ Path vfsPath2 = new Path(vfsRoot, databaseName + "/" + tableName +
"/test_dir/file2.txt");
+ Assert.assertFalse(vfs.rename(vfsPath, vfsPath2));
+ Assert.assertTrue(vfs.exists(new Path(vfsRoot, databaseName + "/" +
tableName)));
+
+ // Rename database is not supported
+ vfsPath = new Path(vfsRoot, databaseName);
+ vfsPath2 = new Path(vfsRoot, databaseName + "_2");
+ try {
+ vfs.rename(vfsPath, vfsPath2);
+ Assert.fail();
+ } catch (IOException e) {
+ // expected: databases cannot be renamed through the filesystem
+ }
+ Assert.assertTrue(vfs.exists(new Path(vfsRoot, databaseName + "/" +
tableName)));
+
+ // Rename table
+ // 1. table -> table_2
+ vfsPath = new Path(vfsRoot, databaseName + "/" + tableName);
+ vfsPath2 = new Path(vfsRoot, databaseName + "/" + tableName + "_2");
+ Assert.assertTrue(vfs.rename(vfsPath, vfsPath2));
+ checkTableExist(databaseName, tableName, false);
+ checkTableExist(databaseName, tableName + "_2", true);
+ // 2. table not exists
+ Assert.assertFalse(vfs.rename(vfsPath, vfsPath2));
+ Assert.assertFalse(vfs.rename(vfsPath, vfsPath));
+ // 3. src = dst
+ Assert.assertTrue(vfs.rename(vfsPath2, vfsPath2));
+ // 4. rename across database
+ try {
+ vfs.rename(vfsPath2, new Path(vfsRoot, databaseName + "_2/" +
tableName));
+ Assert.fail();
+ } catch (IOException e) {
+ // expected: moving a table to another database is rejected
+ }
+ }
+
+ /** delete of a file inside an object table works; delete inside a missing table fails. */
+ @Test
+ public void testDelete() throws Exception {
+ String databaseName = "test_db";
+ String tableName = "object_table";
+ createObjectTable(databaseName, tableName);
+
+ Path vfsPath = new Path(vfsRoot, databaseName + "/" + tableName +
"/test_dir/file.txt");
+ FSDataOutputStream out = vfs.create(vfsPath);
+ out.write("hello".getBytes());
+ out.close();
+ Assert.assertTrue(vfs.delete(vfsPath, false));
+ Assert.assertFalse(vfs.exists(vfsPath));
+
+ // Delete in non-existing table
+ tableName = "object_table2";
+ vfsPath = new Path(vfsRoot, databaseName + "/" + tableName +
"/test_dir/file.txt");
+ try {
+ vfs.delete(vfsPath, false);
+ Assert.fail();
+ } catch (IOException e) {
+ // expected: delete must fail when the target table does not exist
+ }
+ }
+
+ /**
+  * delete on virtual paths: root delete is rejected; database delete needs
+  * recursive=true and maps to a catalog drop; table delete drops the table;
+  * deleting non-existing database/table returns false.
+  */
+ @Test
+ public void testVirtualDelete() throws Exception {
+ String databaseName = "test_db";
+ String tableName = "object_table";
+ createObjectTable(databaseName, tableName);
+ // Delete root is not supported
+ Path vfsPath = vfsRoot;
+ try {
+ vfs.delete(vfsPath, false);
+ Assert.fail();
+ } catch (IOException e) {
+ // expected: the warehouse root cannot be deleted
+ }
+
+ // Delete database
+ // 1. recursive = false
+ vfsPath = new Path(vfsRoot, databaseName);
+ try {
+ vfs.delete(vfsPath, false);
+ Assert.fail();
+ } catch (IOException e) {
+ // expected: a non-empty database requires recursive = true
+ }
+ // 2. recursive = true
+ Assert.assertTrue(vfs.delete(vfsPath, true));
+ Assert.assertFalse(vfs.exists(vfsPath));
+ // 3. non-exist database, return false
+ Assert.assertFalse(vfs.delete(new Path(vfsRoot, databaseName + "_2"),
true));
+
+ // Delete table
+ // 1. existing table
+ createObjectTable(databaseName, tableName);
+ vfsPath = new Path(vfsRoot, databaseName + "/" + tableName);
+ Assert.assertTrue(vfs.delete(vfsPath, false));
+ Assert.assertFalse(vfs.exists(vfsPath));
+
+ // 2. non-exist table, return false
+ Assert.assertFalse(
+ vfs.delete(new Path(vfsRoot, databaseName + "/" + tableName +
"_2"), true));
+ Assert.assertFalse(
+ vfs.delete(new Path(vfsRoot, databaseName + "_2/" + tableName
+ "_2"), true));
+ }
+
+ /**
+  * A normal (non-object) table is visible as a directory whose listing contains the
+  * "schema" subdirectory.
+  */
+ @Test
+ public void testVisitNormalTable() throws Exception {
+ String databaseName = "test_db";
+ String tableName = "normal_table";
+ createNormalTable(databaseName, tableName);
+ Path vfsPath = new Path(vfsRoot, databaseName + "/" + tableName);
+ // List normal table, directory schema should exist
+ FileStatus fileStatus = vfs.getFileStatus(vfsPath);
+ Assert.assertTrue(fileStatus.isDirectory());
+ Assert.assertEquals(vfsPath.toString(),
fileStatus.getPath().toString());
+ FileStatus[] fileStatuses = vfs.listStatus(vfsPath);
+ Assert.assertEquals(1, fileStatuses.length);
+ Assert.assertTrue(fileStatuses[0].isDirectory());
+ Assert.assertEquals(
+ new Path(vfsPath, "schema").toString(),
fileStatuses[0].getPath().toString());
+ }
+}
diff --git a/paimon-vfs/pom.xml b/paimon-vfs/pom.xml
new file mode 100644
index 0000000000..b2430d8cb0
--- /dev/null
+++ b/paimon-vfs/pom.xml
@@ -0,0 +1,105 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <artifactId>paimon-parent</artifactId>
+ <groupId>org.apache.paimon</groupId>
+ <version>1.3-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>paimon-vfs</artifactId>
+ <name>Paimon : VirtualFileSystem</name>
+
+ <packaging>pom</packaging>
+
+ <modules>
+ <module>paimon-vfs-common</module>
+ <module>paimon-vfs-hadoop</module>
+ </modules>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.paimon</groupId>
+ <artifactId>paimon-common</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.paimon</groupId>
+ <artifactId>paimon-core</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.paimon</groupId>
+ <artifactId>paimon-core</artifactId>
+ <classifier>tests</classifier>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.junit.jupiter</groupId>
+ <artifactId>junit-jupiter</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.assertj</groupId>
+ <artifactId>assertj-core</artifactId>
+ <version>${assertj.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>${junit4.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.paimon</groupId>
+ <artifactId>paimon-test-utils</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.paimon</groupId>
+ <artifactId>paimon-format</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>com.squareup.okhttp3</groupId>
+ <artifactId>mockwebserver</artifactId>
+ <version>${okhttp.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>com.squareup.okhttp3</groupId>
+ <artifactId>okhttp</artifactId>
+ <version>${okhttp.version}</version>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+</project>
diff --git a/pom.xml b/pom.xml
index 73d01c7118..57f9c6fb80 100644
--- a/pom.xml
+++ b/pom.xml
@@ -63,6 +63,7 @@ under the License.
<module>paimon-filesystems</module>
<module>paimon-format</module>
<module>paimon-lance</module>
+ <module>paimon-vfs</module>
<module>paimon-bundle</module>
<module>paimon-hive</module>
<module>paimon-spark</module>