This is an automated email from the ASF dual-hosted git repository.
yufei pushed a commit to branch release/1.0.x
in repository https://gitbox.apache.org/repos/asf/polaris.git
The following commit(s) were added to refs/heads/release/1.0.x by this push:
new a701f105c Fix Pagination for Catalog Federation (#1849)
a701f105c is described below
commit a701f105c5d44565ac0ea86db45edbcebdbed718
Author: Rulin Xing <[email protected]>
AuthorDate: Wed Jun 25 09:18:28 2025 -0700
Fix Pagination for Catalog Federation (#1849)
Details can be found in this issue:
https://github.com/apache/polaris/issues/1848
---
.../catalog/iceberg/CatalogHandlerUtils.java | 24 +-
.../catalog/iceberg/IcebergCatalogAdapter.java | 5 +-
.../catalog/iceberg/IcebergCatalogHandler.java | 9 +-
.../catalog/iceberg/IcebergCatalogAdapterTest.java | 245 +++++++++++++++++++++
.../org/apache/polaris/service/TestServices.java | 9 +-
5 files changed, 272 insertions(+), 20 deletions(-)
diff --git a/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/CatalogHandlerUtils.java b/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/CatalogHandlerUtils.java
index ae879ea5f..480aafe7a 100644
--- a/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/CatalogHandlerUtils.java
+++ b/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/CatalogHandlerUtils.java
@@ -26,6 +26,7 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
+import jakarta.annotation.Nullable;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import java.lang.reflect.Field;
@@ -164,12 +165,19 @@ public class CatalogHandlerUtils {
}
}
- private <T> Pair<List<T>, String> paginate(List<T> list, String pageToken,
int pageSize) {
+ private <T> Pair<List<T>, String> paginate(
+ List<T> list, @Nullable String pageToken, @Nullable Integer pageSize) {
+ if (pageToken == null) {
+ return Pair.of(list, null);
+ }
+
int pageStart = INITIAL_PAGE_TOKEN.equals(pageToken) ? 0 :
Integer.parseInt(pageToken);
if (pageStart >= list.size()) {
return Pair.of(Collections.emptyList(), null);
}
+ // if pageSize is null, return the rest of the list
+ pageSize = pageSize == null ? list.size() : pageSize;
int end = Math.min(pageStart + pageSize, list.size());
List<T> subList = list.subList(pageStart, end);
String nextPageToken = end >= list.size() ? null : String.valueOf(end);
@@ -189,7 +197,7 @@ public class CatalogHandlerUtils {
}
public ListNamespacesResponse listNamespaces(
- SupportsNamespaces catalog, Namespace parent, String pageToken, String
pageSize) {
+ SupportsNamespaces catalog, Namespace parent, String pageToken, Integer
pageSize) {
List<Namespace> results;
if (parent.isEmpty()) {
@@ -198,7 +206,7 @@ public class CatalogHandlerUtils {
results = catalog.listNamespaces(parent);
}
- Pair<List<Namespace>, String> page = paginate(results, pageToken,
Integer.parseInt(pageSize));
+ Pair<List<Namespace>, String> page = paginate(results, pageToken,
pageSize);
return ListNamespacesResponse.builder()
.addAll(page.first())
@@ -269,11 +277,10 @@ public class CatalogHandlerUtils {
}
public ListTablesResponse listTables(
- Catalog catalog, Namespace namespace, String pageToken, String pageSize)
{
+ Catalog catalog, Namespace namespace, String pageToken, Integer
pageSize) {
List<TableIdentifier> results = catalog.listTables(namespace);
- Pair<List<TableIdentifier>, String> page =
- paginate(results, pageToken, Integer.parseInt(pageSize));
+ Pair<List<TableIdentifier>, String> page = paginate(results, pageToken,
pageSize);
return
ListTablesResponse.builder().addAll(page.first()).nextPageToken(page.second()).build();
}
@@ -725,11 +732,10 @@ public class CatalogHandlerUtils {
}
public ListTablesResponse listViews(
- ViewCatalog catalog, Namespace namespace, String pageToken, String
pageSize) {
+ ViewCatalog catalog, Namespace namespace, String pageToken, Integer
pageSize) {
List<TableIdentifier> results = catalog.listViews(namespace);
- Pair<List<TableIdentifier>, String> page =
- paginate(results, pageToken, Integer.parseInt(pageSize));
+ Pair<List<TableIdentifier>, String> page = paginate(results, pageToken,
pageSize);
return
ListTablesResponse.builder().addAll(page.first()).nextPageToken(page.second()).build();
}
diff --git a/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogAdapter.java b/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogAdapter.java
index c28fd491a..7218da0f3 100644
--- a/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogAdapter.java
+++ b/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogAdapter.java
@@ -21,6 +21,7 @@ package org.apache.polaris.service.catalog.iceberg;
import static
org.apache.polaris.service.catalog.AccessDelegationMode.VENDED_CREDENTIALS;
import static
org.apache.polaris.service.catalog.validation.IcebergPropertiesValidation.validateIcebergProperties;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
@@ -189,8 +190,8 @@ public class IcebergCatalogAdapter
}
}
- private IcebergCatalogHandler newHandlerWrapper(
- SecurityContext securityContext, String catalogName) {
+ @VisibleForTesting
+ IcebergCatalogHandler newHandlerWrapper(SecurityContext securityContext,
String catalogName) {
validatePrincipal(securityContext);
return new IcebergCatalogHandler(
diff --git a/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java b/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java
index c06e9d98d..b39e24822 100644
--- a/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java
+++ b/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java
@@ -190,8 +190,7 @@ public class IcebergCatalogHandler extends CatalogHandler implements AutoCloseab
.nextPageToken(results.pageToken.toTokenString())
.build();
} else {
- return catalogHandlerUtils.listNamespaces(
- namespaceCatalog, parent, pageToken, String.valueOf(pageSize));
+ return catalogHandlerUtils.listNamespaces(namespaceCatalog, parent,
pageToken, pageSize);
}
}
@@ -351,8 +350,7 @@ public class IcebergCatalogHandler extends CatalogHandler implements AutoCloseab
.nextPageToken(results.pageToken.toTokenString())
.build();
} else {
- return catalogHandlerUtils.listTables(
- baseCatalog, namespace, pageToken, String.valueOf(pageSize));
+ return catalogHandlerUtils.listTables(baseCatalog, namespace, pageToken,
pageSize);
}
}
@@ -1017,8 +1015,7 @@ public class IcebergCatalogHandler extends CatalogHandler implements AutoCloseab
.nextPageToken(results.pageToken.toTokenString())
.build();
} else if (baseCatalog instanceof ViewCatalog viewCatalog) {
- return catalogHandlerUtils.listViews(
- viewCatalog, namespace, pageToken, String.valueOf(pageSize));
+ return catalogHandlerUtils.listViews(viewCatalog, namespace, pageToken,
pageSize);
} else {
throw new BadRequestException(
"Unsupported operation: listViews with baseCatalog type: %s",
diff --git a/service/common/src/test/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogAdapterTest.java b/service/common/src/test/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogAdapterTest.java
new file mode 100644
index 000000000..f8f948a66
--- /dev/null
+++ b/service/common/src/test/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogAdapterTest.java
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.polaris.service.catalog.iceberg;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Stream;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.inmemory.InMemoryCatalog;
+import org.apache.iceberg.rest.responses.ListNamespacesResponse;
+import org.apache.iceberg.rest.responses.ListTablesResponse;
+import org.apache.polaris.core.admin.model.AuthenticationParameters;
+import org.apache.polaris.core.admin.model.AwsStorageConfigInfo;
+import org.apache.polaris.core.admin.model.BearerAuthenticationParameters;
+import org.apache.polaris.core.admin.model.CatalogProperties;
+import org.apache.polaris.core.admin.model.ConnectionConfigInfo;
+import org.apache.polaris.core.admin.model.CreateCatalogRequest;
+import org.apache.polaris.core.admin.model.ExternalCatalog;
+import org.apache.polaris.core.admin.model.IcebergRestConnectionConfigInfo;
+import org.apache.polaris.core.admin.model.StorageConfigInfo;
+import org.apache.polaris.service.TestServices;
+import org.assertj.core.api.Assertions;
+import org.assertj.core.util.Strings;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+import org.mockito.Mockito;
+
+public class IcebergCatalogAdapterTest {
+
+ private static final String FEDERATED_CATALOG_NAME =
"polaris-federated-catalog";
+
+ private TestServices testServices;
+ private IcebergCatalogAdapter catalogAdapter;
+
+ @BeforeEach
+ public void setUp() {
+ // Set up test services with catalog federation enabled
+ testServices =
+ TestServices.builder().config(Map.of("ENABLE_CATALOG_FEDERATION",
"true")).build();
+ catalogAdapter = Mockito.spy(testServices.catalogAdapter());
+
+ // Prepare storage and connection configs for a federated Iceberg REST
catalog
+ String storageLocation = "s3://my-bucket/path/to/data";
+ AwsStorageConfigInfo storageConfig =
+ AwsStorageConfigInfo.builder()
+ .setRoleArn("arn:aws:iam::012345678901:role/polaris-user-role")
+ .setExternalId("externalId")
+ .setUserArn("aws::a:user:arn")
+ .setStorageType(StorageConfigInfo.StorageTypeEnum.S3)
+ .setAllowedLocations(List.of(storageLocation,
"s3://externally-owned-bucket"))
+ .build();
+
+ AuthenticationParameters authParams =
+ BearerAuthenticationParameters.builder()
+
.setAuthenticationType(AuthenticationParameters.AuthenticationTypeEnum.BEARER)
+ .setBearerToken("xxx")
+ .build();
+
+ IcebergRestConnectionConfigInfo connectionConfig =
+ IcebergRestConnectionConfigInfo.builder()
+
.setConnectionType(ConnectionConfigInfo.ConnectionTypeEnum.ICEBERG_REST)
+ .setAuthenticationParameters(authParams)
+ .setUri("http://localhost:8080/api/v1/catalogs")
+ .setRemoteCatalogName("remote-catalog")
+ .build();
+
+ // Register the catalog in the test environment
+ testServices
+ .catalogsApi()
+ .createCatalog(
+ new CreateCatalogRequest(
+ ExternalCatalog.builder()
+ .setName(FEDERATED_CATALOG_NAME)
+ .setProperties(
+
CatalogProperties.builder().setDefaultBaseLocation(storageLocation).build())
+ .setConnectionConfigInfo(connectionConfig)
+ .setStorageConfigInfo(storageConfig)
+ .build()),
+ testServices.realmContext(),
+ testServices.securityContext());
+ }
+
+ @ParameterizedTest(name = "[{index}] initialPageToken={0}, pageSize={1}")
+ @MethodSource("paginationTestCases")
+ void testPaginationForNonIcebergCatalog(String initialPageToken, Integer
pageSize)
+ throws IOException {
+
+ try (InMemoryCatalog inMemoryCatalog = new InMemoryCatalog()) {
+ // Initialize and replace the default handler with one backed by
in-memory catalog
+ inMemoryCatalog.initialize("inMemory", Map.of());
+ mockCatalogAdapter(inMemoryCatalog);
+
+ // Set up 10 entities in the catalog: 10 namespaces, 10 tables, 10 views
+ int entityCount = 10;
+ for (int i = 0; i < entityCount; ++i) {
+ inMemoryCatalog.createNamespace(Namespace.of("ns" + i));
+ inMemoryCatalog.createTable(TableIdentifier.of("ns0", "table" + i),
new Schema());
+ inMemoryCatalog
+ .buildView(TableIdentifier.of("ns0", "view" + i))
+ .withSchema(new Schema())
+ .withDefaultNamespace(Namespace.of("ns0"))
+ .withQuery("a", "SELECT * FROM ns0.table" + i)
+ .create();
+ }
+
+ // Determine starting index for pagination based on the initial page
token
+ int pageStart =
+ Strings.isNullOrEmpty(initialPageToken) ? 0 :
Integer.parseInt(initialPageToken);
+ int remain = entityCount - pageStart;
+
+ // Initial tokens for pagination
+ String listNamespacePageToken = initialPageToken;
+ String listTablesPageToken = initialPageToken;
+ String listViewsPageToken = initialPageToken;
+
+ // Simulate page-by-page fetching until all entities are consumed
+ while (remain > 0) {
+ int expectedSize =
+ (pageSize != null && initialPageToken != null) ? Math.min(remain,
pageSize) : remain;
+
+ // Verify namespaces pagination
+ ListNamespacesResponse namespacesResponse =
+ (ListNamespacesResponse)
+ catalogAdapter
+ .listNamespaces(
+ FEDERATED_CATALOG_NAME,
+ listNamespacePageToken,
+ pageSize,
+ null,
+ testServices.realmContext(),
+ testServices.securityContext())
+ .getEntity();
+
Assertions.assertThat(namespacesResponse.namespaces()).hasSize(expectedSize);
+ listNamespacePageToken = namespacesResponse.nextPageToken();
+
+ // Verify tables pagination
+ ListTablesResponse tablesResponse =
+ (ListTablesResponse)
+ catalogAdapter
+ .listTables(
+ FEDERATED_CATALOG_NAME,
+ "ns0",
+ listTablesPageToken,
+ pageSize,
+ testServices.realmContext(),
+ testServices.securityContext())
+ .getEntity();
+
Assertions.assertThat(tablesResponse.identifiers()).hasSize(expectedSize);
+ listTablesPageToken = tablesResponse.nextPageToken();
+
+ // Verify views pagination
+ ListTablesResponse viewsResponse =
+ (ListTablesResponse)
+ catalogAdapter
+ .listViews(
+ FEDERATED_CATALOG_NAME,
+ "ns0",
+ listViewsPageToken,
+ pageSize,
+ testServices.realmContext(),
+ testServices.securityContext())
+ .getEntity();
+
Assertions.assertThat(viewsResponse.identifiers()).hasSize(expectedSize);
+ listViewsPageToken = viewsResponse.nextPageToken();
+
+ remain -= expectedSize;
+ }
+ }
+ }
+
+ private void mockCatalogAdapter(org.apache.iceberg.catalog.Catalog catalog) {
+ // Override handler creation to inject in-memory catalog and suppress
actual close()
+ Mockito.doAnswer(
+ invocation -> {
+ IcebergCatalogHandler realHandler =
+ (IcebergCatalogHandler) invocation.callRealMethod();
+ IcebergCatalogHandler wrappedHandler = Mockito.spy(realHandler);
+
+ // Override initializeCatalog to inject test catalog using
reflection
+ Mockito.doAnswer(
+ innerInvocation -> {
+ for (String fieldName :
+ List.of("baseCatalog", "namespaceCatalog",
"viewCatalog")) {
+ Field field =
IcebergCatalogHandler.class.getDeclaredField(fieldName);
+ field.setAccessible(true);
+ field.set(wrappedHandler, catalog);
+ }
+ return null;
+ })
+ .when(wrappedHandler)
+ .initializeCatalog();
+
+ // Prevent catalog from being closed during test lifecycle
+ Mockito.doNothing().when(wrappedHandler).close();
+
+ return wrappedHandler;
+ })
+ .when(catalogAdapter)
+ .newHandlerWrapper(Mockito.any(), Mockito.any());
+ }
+
+ private static Stream<Arguments> paginationTestCases() {
+ return Stream.of(
+ Arguments.of(null, null),
+ Arguments.of(null, 1),
+ Arguments.of(null, 3),
+ Arguments.of(null, 5),
+ Arguments.of(null, 10),
+ Arguments.of(null, 20),
+ Arguments.of("", null),
+ Arguments.of("", 1),
+ Arguments.of("", 3),
+ Arguments.of("", 5),
+ Arguments.of("", 10),
+ Arguments.of("", 20),
+ Arguments.of("5", null),
+ Arguments.of("5", 1),
+ Arguments.of("5", 3),
+ Arguments.of("5", 5),
+ Arguments.of("5", 10));
+ }
+}
diff --git a/service/common/src/testFixtures/java/org/apache/polaris/service/TestServices.java b/service/common/src/testFixtures/java/org/apache/polaris/service/TestServices.java
index d3622126f..d2e08b4c5 100644
--- a/service/common/src/testFixtures/java/org/apache/polaris/service/TestServices.java
+++ b/service/common/src/testFixtures/java/org/apache/polaris/service/TestServices.java
@@ -73,6 +73,7 @@ public record TestServices(
PolarisCatalogsApi catalogsApi,
IcebergRestCatalogApi restApi,
IcebergRestConfigurationApi restConfigurationApi,
+ IcebergCatalogAdapter catalogAdapter,
PolarisConfigurationStore configurationStore,
PolarisDiagnostics polarisDiagnostics,
RealmEntityManagerFactory entityManagerFactory,
@@ -197,7 +198,7 @@ public record TestServices(
CatalogHandlerUtils catalogHandlerUtils =
new CatalogHandlerUtils(callContext.getRealmContext(),
configurationStore);
- IcebergCatalogAdapter service =
+ IcebergCatalogAdapter catalogService =
new IcebergCatalogAdapter(
realmContext,
callContext,
@@ -210,8 +211,9 @@ public record TestServices(
reservedProperties,
catalogHandlerUtils);
- IcebergRestCatalogApi restApi = new IcebergRestCatalogApi(service);
- IcebergRestConfigurationApi restConfigurationApi = new
IcebergRestConfigurationApi(service);
+ IcebergRestCatalogApi restApi = new
IcebergRestCatalogApi(catalogService);
+ IcebergRestConfigurationApi restConfigurationApi =
+ new IcebergRestConfigurationApi(catalogService);
CreatePrincipalResult createdPrincipal =
metaStoreManager.createPrincipal(
@@ -262,6 +264,7 @@ public record TestServices(
catalogsApi,
restApi,
restConfigurationApi,
+ catalogService,
configurationStore,
polarisDiagnostics,
realmEntityManagerFactory,