This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 9605b4531f [rest] Refactor RESTTokenFileIO so it can be reused in vfs
(#5939)
9605b4531f is described below
commit 9605b4531fff1537bff00259430c61b75c94df3d
Author: Jingsong Lee <[email protected]>
AuthorDate: Wed Jul 23 13:01:16 2025 +0800
[rest] Refactor RESTTokenFileIO so it can be reused in vfs (#5939)
---
.../java/org/apache/paimon/rest/RESTToken.java | 0
.../org/apache/paimon/rest/RESTTokenFileIO.java | 43 +++------
.../java/org/apache/paimon/rest/RESTCatalog.java | 2 +-
.../java/org/apache/paimon/vfs/VFSDataToken.java | 64 -------------
.../java/org/apache/paimon/vfs/VFSOperations.java | 103 ++-------------------
.../paimon/vfs/hadoop/PaimonVirtualFileSystem.java | 5 +-
6 files changed, 24 insertions(+), 193 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/RESTToken.java
b/paimon-api/src/main/java/org/apache/paimon/rest/RESTToken.java
similarity index 100%
rename from paimon-core/src/main/java/org/apache/paimon/rest/RESTToken.java
rename to paimon-api/src/main/java/org/apache/paimon/rest/RESTToken.java
diff --git
a/paimon-core/src/main/java/org/apache/paimon/rest/RESTTokenFileIO.java
b/paimon-common/src/main/java/org/apache/paimon/rest/RESTTokenFileIO.java
similarity index 84%
rename from
paimon-core/src/main/java/org/apache/paimon/rest/RESTTokenFileIO.java
rename to
paimon-common/src/main/java/org/apache/paimon/rest/RESTTokenFileIO.java
index cbfb28d6e3..c34e62123d 100644
--- a/paimon-core/src/main/java/org/apache/paimon/rest/RESTTokenFileIO.java
+++ b/paimon-common/src/main/java/org/apache/paimon/rest/RESTTokenFileIO.java
@@ -18,7 +18,6 @@
package org.apache.paimon.rest;
-import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.fs.FileIO;
@@ -51,7 +50,7 @@ import static
org.apache.paimon.rest.RESTApi.TOKEN_EXPIRATION_SAFE_TIME_MILLIS;
/** A {@link FileIO} to support getting token from REST Server. */
public class RESTTokenFileIO implements FileIO {
- private static final long serialVersionUID = 1L;
+ private static final long serialVersionUID = 2L;
public static final ConfigOption<Boolean> DATA_TOKEN_ENABLED =
ConfigOptions.key("data-token.enabled")
@@ -74,25 +73,22 @@ public class RESTTokenFileIO implements FileIO {
private static final Logger LOG =
LoggerFactory.getLogger(RESTTokenFileIO.class);
- private final RESTCatalogLoader catalogLoader;
+ private final CatalogContext catalogContext;
private final Identifier identifier;
private final Path path;
- // catalog instance before serialization, it will become null after
serialization, then we
- // should create catalog from catalog loader
- private final transient RESTCatalog catalogInstance;
+ // Api instance before serialization, it will become null after
serialization, then we should
+ // create RESTApi from catalogContext
+ private transient volatile RESTApi apiInstance;
// the latest token from REST Server, serializable in order to avoid
loading token from the REST
// Server again after serialization
private volatile RESTToken token;
public RESTTokenFileIO(
- RESTCatalogLoader catalogLoader,
- RESTCatalog catalogInstance,
- Identifier identifier,
- Path path) {
- this.catalogLoader = catalogLoader;
- this.catalogInstance = catalogInstance;
+ CatalogContext catalogContext, RESTApi apiInstance, Identifier
identifier, Path path) {
+ this.catalogContext = catalogContext;
+ this.apiInstance = apiInstance;
this.identifier = identifier;
this.path = path;
}
@@ -165,12 +161,13 @@ public class RESTTokenFileIO implements FileIO {
return fileIO;
}
- CatalogContext context = catalogLoader.context();
- Options options = context.options();
+ Options options = catalogContext.options();
// the original options are not overwritten
options = new Options(RESTUtil.merge(token.token(),
options.toMap()));
options.set(FILE_IO_ALLOW_CACHE, false);
- context = CatalogContext.create(options, context.preferIO(),
context.fallbackIO());
+ CatalogContext context =
+ CatalogContext.create(
+ options, catalogContext.preferIO(),
catalogContext.fallbackIO());
try {
fileIO = FileIO.get(path, context);
} catch (IOException e) {
@@ -199,20 +196,10 @@ public class RESTTokenFileIO implements FileIO {
private void refreshToken() {
LOG.info("begin refresh data token for identifier [{}]", identifier);
- GetTableTokenResponse response;
- if (catalogInstance != null) {
- try {
- response = catalogInstance.loadTableToken(identifier);
- } catch (Catalog.TableNotExistException e) {
- throw new RuntimeException(e);
- }
- } else {
- try (RESTCatalog catalog = catalogLoader.load()) {
- response = catalog.loadTableToken(identifier);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
+ if (apiInstance == null) {
+ apiInstance = new RESTApi(catalogContext.options(), false);
}
+ GetTableTokenResponse response =
apiInstance.loadTableToken(identifier);
LOG.info(
"end refresh data token for identifier [{}] expiresAtMillis
[{}]",
identifier,
diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
index 6f574a1ea5..fcf2d0525f 100644
--- a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
@@ -971,7 +971,7 @@ public class RESTCatalog implements Catalog {
private FileIO fileIOForData(Path path, Identifier identifier) {
return dataTokenEnabled
- ? new RESTTokenFileIO(catalogLoader(), this, identifier, path)
+ ? new RESTTokenFileIO(context, api, identifier, path)
: fileIOFromOptions(path);
}
diff --git
a/paimon-vfs/paimon-vfs-common/src/main/java/org/apache/paimon/vfs/VFSDataToken.java
b/paimon-vfs/paimon-vfs-common/src/main/java/org/apache/paimon/vfs/VFSDataToken.java
deleted file mode 100644
index 730ee591af..0000000000
---
a/paimon-vfs/paimon-vfs-common/src/main/java/org/apache/paimon/vfs/VFSDataToken.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.vfs;
-
-import javax.annotation.Nullable;
-
-import java.io.Serializable;
-import java.util.Map;
-import java.util.Objects;
-
-/** Data token. */
-public class VFSDataToken implements Serializable {
- private static final long serialVersionUID = 1L;
- private final Map<String, String> token;
- private final long expireAtMillis;
- @Nullable private Integer hash;
-
- public VFSDataToken(Map<String, String> token, long expireAtMillis) {
- this.token = token;
- this.expireAtMillis = expireAtMillis;
- }
-
- public Map<String, String> token() {
- return this.token;
- }
-
- public long expireAtMillis() {
- return this.expireAtMillis;
- }
-
- public boolean equals(Object o) {
- if (o != null && this.getClass() == o.getClass()) {
- VFSDataToken token1 = (VFSDataToken) o;
- return this.expireAtMillis == token1.expireAtMillis
- && Objects.equals(this.token, token1.token);
- } else {
- return false;
- }
- }
-
- public int hashCode() {
- if (this.hash == null) {
- this.hash = Objects.hash(new Object[] {this.token,
this.expireAtMillis});
- }
-
- return this.hash;
- }
-}
diff --git
a/paimon-vfs/paimon-vfs-common/src/main/java/org/apache/paimon/vfs/VFSOperations.java
b/paimon-vfs/paimon-vfs-common/src/main/java/org/apache/paimon/vfs/VFSOperations.java
index ad9cd09122..6ca09dc739 100644
---
a/paimon-vfs/paimon-vfs-common/src/main/java/org/apache/paimon/vfs/VFSOperations.java
+++
b/paimon-vfs/paimon-vfs-common/src/main/java/org/apache/paimon/vfs/VFSOperations.java
@@ -24,7 +24,7 @@ import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.options.Options;
import org.apache.paimon.rest.RESTApi;
-import org.apache.paimon.rest.RESTUtil;
+import org.apache.paimon.rest.RESTTokenFileIO;
import org.apache.paimon.rest.exceptions.AlreadyExistsException;
import org.apache.paimon.rest.exceptions.BadRequestException;
import org.apache.paimon.rest.exceptions.ForbiddenException;
@@ -32,14 +32,7 @@ import
org.apache.paimon.rest.exceptions.NoSuchResourceException;
import org.apache.paimon.rest.exceptions.NotImplementedException;
import org.apache.paimon.rest.responses.GetDatabaseResponse;
import org.apache.paimon.rest.responses.GetTableResponse;
-import org.apache.paimon.rest.responses.GetTableTokenResponse;
import org.apache.paimon.schema.Schema;
-import org.apache.paimon.utils.IOUtils;
-import org.apache.paimon.utils.ThreadUtils;
-
-import
org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache;
-import
org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Caffeine;
-import
org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Scheduler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -49,41 +42,22 @@ import java.io.IOException;
import java.nio.file.FileAlreadyExistsException;
import java.util.Collections;
import java.util.List;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
import static org.apache.paimon.CoreOptions.TYPE;
import static org.apache.paimon.TableType.OBJECT_TABLE;
-import static org.apache.paimon.options.CatalogOptions.FILE_IO_ALLOW_CACHE;
-import static org.apache.paimon.rest.RESTApi.TOKEN_EXPIRATION_SAFE_TIME_MILLIS;
/** Wrap over RESTCatalog to provide basic operations for virtual path. */
public class VFSOperations {
+
private static final Logger LOG =
LoggerFactory.getLogger(VFSOperations.class);
private final RESTApi api;
private final CatalogContext context;
- // table id -> fileIO
- private static final Cache<VFSDataToken, FileIO> FILE_IO_CACHE =
- Caffeine.newBuilder()
- .expireAfterAccess(30, TimeUnit.MINUTES)
- .maximumSize(1000)
- .removalListener(
- (ignored, value, cause) ->
IOUtils.closeQuietly((FileIO) value))
- .scheduler(
- Scheduler.forScheduledExecutorService(
- Executors.newSingleThreadScheduledExecutor(
- ThreadUtils.newDaemonThreadFactory(
-
"rest-token-file-io-scheduler"))))
- .build();
-
- private static final Cache<String, VFSDataToken> TOKEN_CACHE =
- Caffeine.newBuilder().expireAfterAccess(30,
TimeUnit.MINUTES).maximumSize(1000).build();
-
- public VFSOperations(CatalogContext context) {
- this.context = context;
- this.api = new RESTApi(context.options());
+ public VFSOperations(Options options) {
+ this.api = new RESTApi(options);
+ // Get the configured options which has been merged from REST Server
+ this.context = CatalogContext.create(api.options());
}
public VFSIdentifier getVFSIdentifier(String virtualPath) throws
IOException {
@@ -116,9 +90,7 @@ public class VFSOperations {
}
// Get real path
StringBuilder realPath = new StringBuilder(table.getPath());
- boolean isTableRoot = true;
if (parts.length > 2) {
- isTableRoot = false;
if (!table.getPath().endsWith("/")) {
realPath.append("/");
}
@@ -129,9 +101,8 @@ public class VFSOperations {
}
}
}
- // Get REST token
- FileIO fileIO = getFileIO(new Identifier(databaseName, tableName),
table);
+ FileIO fileIO = new RESTTokenFileIO(context, api, identifier, new
Path(table.getPath()));
if (parts.length == 2) {
return new VFSTableRootIdentifier(
table, realPath.toString(), fileIO, databaseName,
tableName);
@@ -254,66 +225,6 @@ public class VFSOperations {
}
}
- private FileIO getFileIO(Identifier identifier, GetTableResponse table)
throws IOException {
- VFSDataToken token = TOKEN_CACHE.getIfPresent(table.getId());
- if (shouldRefresh(token)) {
- synchronized (TOKEN_CACHE) {
- token = TOKEN_CACHE.getIfPresent(table.getId());
- if (shouldRefresh(token)) {
- token = refreshToken(identifier);
- TOKEN_CACHE.put(table.getId(), token);
- }
- }
- }
-
- FileIO fileIO = FILE_IO_CACHE.getIfPresent(token);
- if (fileIO != null) {
- return fileIO;
- }
-
- synchronized (FILE_IO_CACHE) {
- fileIO = FILE_IO_CACHE.getIfPresent(token);
- if (fileIO != null) {
- return fileIO;
- }
-
- Options options = context.options();
- // the original options are not overwritten
- options = new Options(RESTUtil.merge(token.token(),
options.toMap()));
- options.set(FILE_IO_ALLOW_CACHE, false);
- CatalogContext fileIOContext = CatalogContext.create(options);
- fileIO = FileIO.get(new Path(table.getPath()), fileIOContext);
- FILE_IO_CACHE.put(token, fileIO);
- return fileIO;
- }
- }
-
- private boolean shouldRefresh(VFSDataToken token) {
- return token == null
- || token.expireAtMillis() - System.currentTimeMillis()
- < TOKEN_EXPIRATION_SAFE_TIME_MILLIS;
- }
-
- private VFSDataToken refreshToken(Identifier identifier) throws
IOException {
- LOG.info("begin refresh data token for identifier [{}]", identifier);
- GetTableTokenResponse response;
- try {
- response = api.loadTableToken(identifier);
- } catch (NoSuchResourceException e) {
- throw new FileNotFoundException("Table " + identifier + " not
found");
- } catch (ForbiddenException e) {
- throw new IOException("No permission to access table " +
identifier);
- }
-
- LOG.info(
- "end refresh data token for identifier [{}] expiresAtMillis
[{}]",
- identifier,
- response.getExpiresAtMillis());
-
- VFSDataToken token = new VFSDataToken(response.getToken(),
response.getExpiresAtMillis());
- return token;
- }
-
private GetTableResponse loadTableMetadata(Identifier identifier) throws
IOException {
// if the table is system table, we need to load table metadata from
the system table's data
// table
diff --git
a/paimon-vfs/paimon-vfs-hadoop/src/main/java/org/apache/paimon/vfs/hadoop/PaimonVirtualFileSystem.java
b/paimon-vfs/paimon-vfs-hadoop/src/main/java/org/apache/paimon/vfs/hadoop/PaimonVirtualFileSystem.java
index 3eb1849b23..1c0e8ceb39 100644
---
a/paimon-vfs/paimon-vfs-hadoop/src/main/java/org/apache/paimon/vfs/hadoop/PaimonVirtualFileSystem.java
+++
b/paimon-vfs/paimon-vfs-hadoop/src/main/java/org/apache/paimon/vfs/hadoop/PaimonVirtualFileSystem.java
@@ -18,7 +18,6 @@
package org.apache.paimon.vfs.hadoop;
-import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.fs.PositionOutputStream;
import org.apache.paimon.options.CatalogOptions;
import org.apache.paimon.options.Options;
@@ -73,9 +72,7 @@ public class PaimonVirtualFileSystem extends FileSystem {
Options options =
PaimonVirtualFileSystemConfiguration.convertToCatalogOptions(conf);
// pvfs://catalog_name/database_name/table_name/file, so uri authority
is catalog name
options.set(CatalogOptions.WAREHOUSE, uri.getAuthority());
-
- CatalogContext catalogContext = CatalogContext.create(options);
- vfsOperations = new VFSOperations(catalogContext);
+ vfsOperations = new VFSOperations(options);
}
private String getVirtualPath(Path path) {