This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new d6da6a808f [hotfix] Minor refactor for paged methods in RESTCatalog
d6da6a808f is described below
commit d6da6a808f7972131152d91b39c46ed45450c88d
Author: JingsongLi <[email protected]>
AuthorDate: Mon Mar 10 15:21:18 2025 +0800
[hotfix] Minor refactor for paged methods in RESTCatalog
---
.../java/org/apache/paimon/rest/RESTCatalog.java | 228 ++++++++-------------
1 file changed, 89 insertions(+), 139 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
index fb80ff37cf..d28a6cd670 100644
--- a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
@@ -20,6 +20,7 @@ package org.apache.paimon.rest;
import org.apache.paimon.PagedList;
import org.apache.paimon.Snapshot;
+import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.catalog.CatalogUtils;
@@ -106,6 +107,7 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Function;
import java.util.stream.Collectors;
+import static java.util.Collections.emptyList;
import static org.apache.paimon.catalog.CatalogUtils.checkNotBranch;
import static org.apache.paimon.catalog.CatalogUtils.checkNotSystemDatabase;
import static org.apache.paimon.catalog.CatalogUtils.checkNotSystemTable;
@@ -224,7 +226,7 @@ public class RESTCatalog implements Catalog,
SupportsSnapshots, SupportsBranches
GetDatabaseResponse.class,
restAuthFunction);
return new Database.DatabaseImpl(
- name, response.options(), response.comment().orElseGet(()
-> null));
+ name, response.options(), response.comment().orElse(null));
} catch (NoSuchResourceException e) {
throw new DatabaseNotExistException(name);
} catch (ForbiddenException e) {
@@ -279,13 +281,12 @@ public class RESTCatalog implements Catalog,
SupportsSnapshots, SupportsBranches
public List<String> listTables(String databaseName) throws
DatabaseNotExistException {
try {
return listDataFromPageApi(
- queryParams -> {
- return client.get(
- resourcePaths.tables(databaseName),
- queryParams,
- ListTablesResponse.class,
- restAuthFunction);
- });
+ queryParams ->
+ client.get(
+ resourcePaths.tables(databaseName),
+ queryParams,
+ ListTablesResponse.class,
+ restAuthFunction));
} catch (NoSuchResourceException e) {
throw new DatabaseNotExistException(databaseName);
}
@@ -293,26 +294,20 @@ public class RESTCatalog implements Catalog,
SupportsSnapshots, SupportsBranches
@Override
public PagedList<String> listTablesPaged(
- String databaseName, Integer maxResults, String pageToken)
+ String databaseName, @Nullable Integer maxResults, @Nullable
String pageToken)
throws DatabaseNotExistException {
- Map<String, String> queryParams = buildPagedQueryParams(maxResults,
pageToken);
try {
ListTablesResponse response =
client.get(
resourcePaths.tables(databaseName),
- queryParams,
+ buildPagedQueryParams(maxResults, pageToken),
ListTablesResponse.class,
restAuthFunction);
- if (Objects.nonNull(response.getTables())) {
- return new PagedList<>(response.getTables(),
response.getNextPageToken());
- } else {
- LOG.warn(
- "response of listTablesPaged for {} is null with
maxResults {} and pageToken {}",
- databaseName,
- maxResults,
- pageToken);
- return new PagedList<>(Collections.emptyList(), null);
+ List<String> tables = response.getTables();
+ if (tables == null) {
+ return new PagedList<>(emptyList(), null);
}
+ return new PagedList<>(tables, response.getNextPageToken());
} catch (NoSuchResourceException e) {
throw new DatabaseNotExistException(databaseName);
}
@@ -320,46 +315,24 @@ public class RESTCatalog implements Catalog,
SupportsSnapshots, SupportsBranches
@Override
public PagedList<Table> listTableDetailsPaged(
- String databaseName, Integer maxResults, String pageToken)
+ String db, @Nullable Integer maxResults, @Nullable String
pageToken)
throws DatabaseNotExistException {
- Map<String, String> queryParams = buildPagedQueryParams(maxResults,
pageToken);
try {
ListTableDetailsResponse response =
client.get(
- resourcePaths.tableDetails(databaseName),
- queryParams,
+ resourcePaths.tableDetails(db),
+ buildPagedQueryParams(maxResults, pageToken),
ListTableDetailsResponse.class,
restAuthFunction);
- if (Objects.nonNull(response.getTableDetails())) {
- return new PagedList<>(
- response.getTableDetails().stream()
- .map(
- getTableResponse -> {
- try {
- return loadTableByResponse(
- getTableResponse,
databaseName);
- } catch (TableNotExistException e)
{
- LOG.error(
- "load table {}.{} by
response {} failed",
- databaseName,
-
getTableResponse.getName(),
- getTableResponse,
- e);
- throw new RuntimeException(e);
- }
- })
- .collect(Collectors.toList()),
- response.getNextPageToken());
- } else {
- LOG.warn(
- "response of listTableDetailsPaged for {} is null with
maxResults {} and pageToken {}",
- databaseName,
- maxResults,
- pageToken);
- return new PagedList<>(Collections.emptyList(), null);
+ List<GetTableResponse> tables = response.getTableDetails();
+ if (tables == null) {
+ return new PagedList<>(emptyList(), null);
}
+ return new PagedList<>(
+ tables.stream().map(t -> toTable(db,
t)).collect(Collectors.toList()),
+ response.getNextPageToken());
} catch (NoSuchResourceException e) {
- throw new DatabaseNotExistException(databaseName);
+ throw new DatabaseNotExistException(db);
}
}
@@ -445,10 +418,30 @@ public class RESTCatalog implements Catalog,
SupportsSnapshots, SupportsBranches
throw new TableNoPermissionException(identifier, e);
}
+ return toTableMetadata(response);
+ }
+
+ private TableMetadata toTableMetadata(GetTableResponse response) {
TableSchema schema = TableSchema.create(response.getSchemaId(),
response.getSchema());
return new TableMetadata(schema, response.isExternal(),
response.getId());
}
+ private Table toTable(String db, GetTableResponse response) {
+ Identifier identifier = Identifier.create(db, response.getName());
+ try {
+ return CatalogUtils.loadTable(
+ this,
+ identifier,
+ path -> fileIOForData(path, identifier),
+ this::fileIOFromOptions,
+ i -> toTableMetadata(response),
+ null,
+ null);
+ } catch (TableNotExistException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
@Override
public void createTable(Identifier identifier, Schema schema, boolean
ignoreIfExists)
throws TableAlreadyExistException, DatabaseNotExistException {
@@ -710,7 +703,7 @@ public class RESTCatalog implements Catalog,
SupportsSnapshots, SupportsBranches
ListBranchesResponse.class,
restAuthFunction);
if (response.branches() == null) {
- return Collections.emptyList();
+ return emptyList();
}
return response.branches();
} catch (NoSuchResourceException e) {
@@ -722,22 +715,21 @@ public class RESTCatalog implements Catalog,
SupportsSnapshots, SupportsBranches
@Override
public PagedList<Partition> listPartitionsPaged(
- Identifier identifier, Integer maxResults, String pageToken)
+ Identifier identifier, @Nullable Integer maxResults, @Nullable
String pageToken)
throws TableNotExistException {
- Map<String, String> queryParams = buildPagedQueryParams(maxResults,
pageToken);
try {
ListPartitionsResponse response =
client.get(
resourcePaths.partitions(
identifier.getDatabaseName(),
identifier.getObjectName()),
- queryParams,
+ buildPagedQueryParams(maxResults, pageToken),
ListPartitionsResponse.class,
restAuthFunction);
- if (Objects.nonNull(response.getPartitions())) {
- return new PagedList<>(response.getPartitions(),
response.getNextPageToken());
- } else {
- return new PagedList<>(Collections.emptyList(), null);
+ List<Partition> partitions = response.getPartitions();
+ if (partitions == null) {
+ return new PagedList<>(emptyList(), null);
}
+ return new PagedList<>(partitions, response.getNextPageToken());
} catch (NoSuchResourceException e) {
throw new TableNotExistException(identifier);
} catch (ForbiddenException e) {
@@ -757,14 +749,7 @@ public class RESTCatalog implements Catalog,
SupportsSnapshots, SupportsBranches
identifier.getDatabaseName(),
identifier.getObjectName()),
GetViewResponse.class,
restAuthFunction);
- ViewSchema schema = response.getSchema();
- return new ViewImpl(
- identifier,
- schema.fields(),
- schema.query(),
- schema.dialects(),
- schema.comment(),
- schema.options());
+ return toView(identifier.getDatabaseName(), response);
} catch (NoSuchResourceException e) {
throw new ViewNotExistException(identifier);
}
@@ -798,7 +783,6 @@ public class RESTCatalog implements Catalog,
SupportsSnapshots, SupportsBranches
CreateViewRequest request = new CreateViewRequest(identifier,
schema);
client.post(
resourcePaths.views(identifier.getDatabaseName()),
request, restAuthFunction);
-
} catch (NoSuchResourceException e) {
throw new DatabaseNotExistException(identifier.getDatabaseName());
} catch (AlreadyExistsException e) {
@@ -812,13 +796,12 @@ public class RESTCatalog implements Catalog,
SupportsSnapshots, SupportsBranches
public List<String> listViews(String databaseName) throws
DatabaseNotExistException {
try {
return listDataFromPageApi(
- queryParams -> {
- return client.get(
- resourcePaths.views(databaseName),
- queryParams,
- ListViewsResponse.class,
- restAuthFunction);
- });
+ queryParams ->
+ client.get(
+ resourcePaths.views(databaseName),
+ queryParams,
+ ListViewsResponse.class,
+ restAuthFunction));
} catch (NoSuchResourceException e) {
throw new DatabaseNotExistException(databaseName);
}
@@ -826,26 +809,20 @@ public class RESTCatalog implements Catalog,
SupportsSnapshots, SupportsBranches
@Override
public PagedList<String> listViewsPaged(
- String databaseName, Integer maxResults, String pageToken)
+ String databaseName, @Nullable Integer maxResults, @Nullable
String pageToken)
throws DatabaseNotExistException {
- Map<String, String> queryParams = buildPagedQueryParams(maxResults,
pageToken);
try {
ListViewsResponse response =
client.get(
resourcePaths.views(databaseName),
- queryParams,
+ buildPagedQueryParams(maxResults, pageToken),
ListViewsResponse.class,
restAuthFunction);
- if (Objects.nonNull(response.getViews())) {
- return new PagedList<>(response.getViews(),
response.getNextPageToken());
- } else {
- LOG.warn(
- "response of listViewsPaged for {} is null with
maxResults {} and pageToken {}",
- databaseName,
- maxResults,
- pageToken);
- return new PagedList<>(Collections.emptyList(), null);
+ List<String> views = response.getViews();
+ if (views == null) {
+ return new PagedList<>(emptyList(), null);
}
+ return new PagedList<>(views, response.getNextPageToken());
} catch (NoSuchResourceException e) {
throw new DatabaseNotExistException(databaseName);
}
@@ -853,46 +830,38 @@ public class RESTCatalog implements Catalog,
SupportsSnapshots, SupportsBranches
@Override
public PagedList<View> listViewDetailsPaged(
- String databaseName, Integer maxResults, String pageToken)
+ String db, @Nullable Integer maxResults, @Nullable String
pageToken)
throws DatabaseNotExistException {
- Map<String, String> queryParams = buildPagedQueryParams(maxResults,
pageToken);
try {
ListViewDetailsResponse response =
client.get(
- resourcePaths.viewDetails(databaseName),
- queryParams,
+ resourcePaths.viewDetails(db),
+ buildPagedQueryParams(maxResults, pageToken),
ListViewDetailsResponse.class,
restAuthFunction);
- if (Objects.nonNull(response.getViewDetails())) {
- return new PagedList<>(
- response.getViewDetails().stream()
- .map(
- viewResponse -> {
- ViewSchema schema =
viewResponse.getSchema();
- return new ViewImpl(
- Identifier.create(
- databaseName,
viewResponse.getName()),
- schema.fields(),
- schema.query(),
- schema.dialects(),
- schema.comment(),
- schema.options());
- })
- .collect(Collectors.toList()),
- response.getNextPageToken());
- } else {
- LOG.warn(
- "response of listViewDetailsPaged for {} is null with
maxResults {} and pageToken {}",
- databaseName,
- maxResults,
- pageToken);
- return new PagedList<>(Collections.emptyList(), null);
+ List<GetViewResponse> views = response.getViewDetails();
+ if (views == null) {
+ return new PagedList<>(emptyList(), null);
}
+ return new PagedList<>(
+ views.stream().map(v -> toView(db,
v)).collect(Collectors.toList()),
+ response.getNextPageToken());
} catch (NoSuchResourceException e) {
- throw new DatabaseNotExistException(databaseName);
+ throw new DatabaseNotExistException(db);
}
}
+ private ViewImpl toView(String db, GetViewResponse response) {
+ ViewSchema schema = response.getSchema();
+ return new ViewImpl(
+ Identifier.create(db, response.getName()),
+ schema.fields(),
+ schema.query(),
+ schema.dialects(),
+ schema.comment(),
+ schema.options());
+ }
+
@Override
public void renameView(Identifier fromView, Identifier toView, boolean
ignoreIfNotExists)
throws ViewNotExistException, ViewAlreadyExistException {
@@ -927,6 +896,7 @@ public class RESTCatalog implements Catalog,
SupportsSnapshots, SupportsBranches
}
}
+ @VisibleForTesting
Map<String, String> headers(RESTAuthParameter restAuthParameter) {
return restAuthFunction.apply(restAuthParameter);
}
@@ -996,28 +966,8 @@ public class RESTCatalog implements Catalog,
SupportsSnapshots, SupportsBranches
}
}
- private Table loadTableByResponse(GetTableResponse getTableResponse,
String databaseName)
- throws TableNotExistException {
- Identifier identifier = Identifier.create(databaseName,
getTableResponse.getName());
- TableMetadata.Loader tableMetaDataLoader =
- tableIdentifier -> {
- TableSchema schema =
- TableSchema.create(
- getTableResponse.getSchemaId(),
getTableResponse.getSchema());
- return new TableMetadata(
- schema, getTableResponse.isExternal(),
getTableResponse.getId());
- };
- return CatalogUtils.loadTable(
- this,
- identifier,
- path -> fileIOForData(path, identifier),
- this::fileIOFromOptions,
- tableMetaDataLoader,
- null,
- null);
- }
-
- private Map<String, String> buildPagedQueryParams(Integer maxResults,
String pageToken) {
+ private Map<String, String> buildPagedQueryParams(
+ @Nullable Integer maxResults, @Nullable String pageToken) {
Map<String, String> queryParams = Maps.newHashMap();
if (Objects.nonNull(maxResults) && maxResults > 0) {
queryParams.put(MAX_RESULTS, maxResults.toString());