This is an automated email from the ASF dual-hosted git repository.
emaynard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/polaris.git
The following commit(s) were added to refs/heads/main by this push:
new 44064cbf4 Interface changes for pagination (#1528)
44064cbf4 is described below
commit 44064cbf4e4ab7aa29eeafdfae8f41510add8784
Author: Eric Maynard <[email protected]>
AuthorDate: Fri May 9 10:39:02 2025 -0700
Interface changes for pagination (#1528)
* add missing apis
* more tests, fixes
* clean up drop
* autolint
* changes per review
* revert iceberg messages to comply with oss tests
* another revert
* more iceberg catalog changes
* autolint
* dependency issues
* more wiring
* continuing rebase
* remaining issues are related to task loading
* re-add tests
* debugging
* fix failing tests
* fix another test
* changes per review
* autolint
* some fixes
* stable
* updates for new persistence
* fix
* continuing work
* more reverts
* continue reverts
* more reverts
* yank tests
* autolint
* test reverts
* try to support limit without real page tokens
* autolint
* Stable
* change comment
* autolint
* remove catalog config for now
* changes per review
* more tweaks
* simplify types per review
* Stable, about to refactor more
* re-stable
* polish
* autolint
* more changes per review
* stable
---
.../PolarisEclipseLinkMetaStoreSessionImpl.java | 47 +++++----
.../impl/eclipselink/PolarisEclipseLinkStore.java | 7 +-
.../relational/jdbc/JdbcBasePersistenceImpl.java | 45 +++++----
.../polaris/core/config/FeatureConfiguration.java | 7 ++
.../AtomicOperationMetaStoreManager.java | 47 +++++----
.../polaris/core/persistence/BasePersistence.java | 20 ++--
.../core/persistence/PolarisMetaStoreManager.java | 9 +-
.../TransactionWorkspaceMetaStoreManager.java | 6 +-
.../persistence/dao/entity/EntitiesResult.java | 25 ++++-
.../persistence/dao/entity/ListEntitiesResult.java | 27 +++++-
.../core/persistence/pagination/DonePageToken.java | 40 ++++++++
.../core/persistence/pagination/HasPageSize.java | 27 ++++++
.../persistence/pagination/LimitPageToken.java | 52 ++++++++++
.../polaris/core/persistence/pagination/Page.java | 42 ++++++++
.../core/persistence/pagination/PageToken.java | 99 +++++++++++++++++++
.../pagination/ReadEverythingPageToken.java | 42 ++++++++
.../AbstractTransactionalPersistence.java | 26 +++--
.../TransactionalMetaStoreManagerImpl.java | 66 +++++++------
.../transactional/TransactionalPersistence.java | 18 ++--
.../TreeMapTransactionalPersistenceImpl.java | 54 ++++++-----
.../BasePolarisMetaStoreManagerTest.java | 21 ++--
.../persistence/PolarisTestMetaStoreManager.java | 26 +++--
.../quarkus/catalog/IcebergCatalogTest.java | 10 +-
.../quarkus/task/TableCleanupTaskHandlerTest.java | 13 +--
.../polaris/service/admin/PolarisAdminService.java | 13 ++-
.../catalog/generic/GenericTableCatalog.java | 4 +-
.../service/catalog/iceberg/IcebergCatalog.java | 106 ++++++++++++++++-----
.../catalog/iceberg/IcebergCatalogAdapter.java | 13 ++-
.../catalog/iceberg/IcebergCatalogHandler.java | 53 +++++++++++
.../service/catalog/policy/PolicyCatalog.java | 4 +-
.../service/catalog/io/FileIOFactoryTest.java | 3 +-
.../persistence/pagination/PageTokenTest.java | 50 ++++++++++
32 files changed, 823 insertions(+), 199 deletions(-)
diff --git
a/extension/persistence/eclipselink/src/main/java/org/apache/polaris/extension/persistence/impl/eclipselink/PolarisEclipseLinkMetaStoreSessionImpl.java
b/extension/persistence/eclipselink/src/main/java/org/apache/polaris/extension/persistence/impl/eclipselink/PolarisEclipseLinkMetaStoreSessionImpl.java
index e19b0ef72..b6bd23762 100644
---
a/extension/persistence/eclipselink/src/main/java/org/apache/polaris/extension/persistence/impl/eclipselink/PolarisEclipseLinkMetaStoreSessionImpl.java
+++
b/extension/persistence/eclipselink/src/main/java/org/apache/polaris/extension/persistence/impl/eclipselink/PolarisEclipseLinkMetaStoreSessionImpl.java
@@ -37,6 +37,7 @@ import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;
+import java.util.stream.Stream;
import org.apache.polaris.core.PolarisCallContext;
import org.apache.polaris.core.context.RealmContext;
import org.apache.polaris.core.entity.EntityNameLookupRecord;
@@ -52,6 +53,9 @@ import
org.apache.polaris.core.exceptions.AlreadyExistsException;
import org.apache.polaris.core.persistence.BaseMetaStoreManager;
import org.apache.polaris.core.persistence.PrincipalSecretsGenerator;
import org.apache.polaris.core.persistence.RetryOnConcurrencyException;
+import org.apache.polaris.core.persistence.pagination.HasPageSize;
+import org.apache.polaris.core.persistence.pagination.Page;
+import org.apache.polaris.core.persistence.pagination.PageToken;
import
org.apache.polaris.core.persistence.transactional.AbstractTransactionalPersistence;
import org.apache.polaris.core.policy.PolarisPolicyMappingRecord;
import org.apache.polaris.core.storage.PolarisStorageConfigurationInfo;
@@ -419,29 +423,30 @@ public class PolarisEclipseLinkMetaStoreSessionImpl
extends AbstractTransactiona
/** {@inheritDoc} */
@Override
- public @Nonnull List<EntityNameLookupRecord> listEntitiesInCurrentTxn(
+ public @Nonnull Page<EntityNameLookupRecord> listEntitiesInCurrentTxn(
@Nonnull PolarisCallContext callCtx,
long catalogId,
long parentId,
- @Nonnull PolarisEntityType entityType) {
+ @Nonnull PolarisEntityType entityType,
+ @Nonnull PageToken pageToken) {
return this.listEntitiesInCurrentTxn(
- callCtx, catalogId, parentId, entityType, Predicates.alwaysTrue());
+ callCtx, catalogId, parentId, entityType, Predicates.alwaysTrue(), pageToken);
}
@Override
- public @Nonnull List<EntityNameLookupRecord> listEntitiesInCurrentTxn(
+ public @Nonnull Page<EntityNameLookupRecord> listEntitiesInCurrentTxn(
@Nonnull PolarisCallContext callCtx,
long catalogId,
long parentId,
@Nonnull PolarisEntityType entityType,
- @Nonnull Predicate<PolarisBaseEntity> entityFilter) {
+ @Nonnull Predicate<PolarisBaseEntity> entityFilter,
+ @Nonnull PageToken pageToken) {
// full range scan under the parent for that type
return this.listEntitiesInCurrentTxn(
callCtx,
catalogId,
parentId,
entityType,
- Integer.MAX_VALUE,
entityFilter,
entity ->
new EntityNameLookupRecord(
@@ -450,27 +455,33 @@ public class PolarisEclipseLinkMetaStoreSessionImpl
extends AbstractTransactiona
entity.getParentId(),
entity.getName(),
entity.getTypeCode(),
- entity.getSubTypeCode()));
+ entity.getSubTypeCode()),
+ pageToken);
}
@Override
- public @Nonnull <T> List<T> listEntitiesInCurrentTxn(
+ public @Nonnull <T> Page<T> listEntitiesInCurrentTxn(
@Nonnull PolarisCallContext callCtx,
long catalogId,
long parentId,
@Nonnull PolarisEntityType entityType,
- int limit,
@Nonnull Predicate<PolarisBaseEntity> entityFilter,
- @Nonnull Function<PolarisBaseEntity, T> transformer) {
+ @Nonnull Function<PolarisBaseEntity, T> transformer,
+ @Nonnull PageToken pageToken) {
// full range scan under the parent for that type
- return this.store
- .lookupFullEntitiesActive(localSession.get(), catalogId, parentId, entityType)
- .stream()
- .map(ModelEntity::toEntity)
- .filter(entityFilter)
- .limit(limit)
- .map(transformer)
- .collect(Collectors.toList());
+ Stream<PolarisBaseEntity> data =
+ this.store
+ .lookupFullEntitiesActive(
+ localSession.get(), catalogId, parentId, entityType, pageToken)
+ .stream()
+ .map(ModelEntity::toEntity)
+ .filter(entityFilter);
+
+ if (pageToken instanceof HasPageSize hasPageSize) {
+ data = data.limit(hasPageSize.getPageSize());
+ }
+
+ return Page.fromItems(data.map(transformer).collect(Collectors.toList()));
}
/** {@inheritDoc} */
diff --git
a/extension/persistence/eclipselink/src/main/java/org/apache/polaris/extension/persistence/impl/eclipselink/PolarisEclipseLinkStore.java
b/extension/persistence/eclipselink/src/main/java/org/apache/polaris/extension/persistence/impl/eclipselink/PolarisEclipseLinkStore.java
index 0988dcb7f..4e992e07f 100644
---
a/extension/persistence/eclipselink/src/main/java/org/apache/polaris/extension/persistence/impl/eclipselink/PolarisEclipseLinkStore.java
+++
b/extension/persistence/eclipselink/src/main/java/org/apache/polaris/extension/persistence/impl/eclipselink/PolarisEclipseLinkStore.java
@@ -35,6 +35,7 @@ import org.apache.polaris.core.entity.PolarisEntityId;
import org.apache.polaris.core.entity.PolarisEntityType;
import org.apache.polaris.core.entity.PolarisGrantRecord;
import org.apache.polaris.core.entity.PolarisPrincipalSecrets;
+import org.apache.polaris.core.persistence.pagination.PageToken;
import org.apache.polaris.core.policy.PolarisPolicyMappingRecord;
import org.apache.polaris.jpa.models.ModelEntity;
import org.apache.polaris.jpa.models.ModelEntityActive;
@@ -282,7 +283,11 @@ public class PolarisEclipseLinkStore {
}
List<ModelEntity> lookupFullEntitiesActive(
- EntityManager session, long catalogId, long parentId, @Nonnull PolarisEntityType entityType) {
+ EntityManager session,
+ long catalogId,
+ long parentId,
+ @Nonnull PolarisEntityType entityType,
+ @Nonnull PageToken pageToken) {
diagnosticServices.check(session != null, "session_is_null");
checkInitialized();
diff --git
a/extension/persistence/relational-jdbc/src/main/java/org/apache/polaris/extension/persistence/relational/jdbc/JdbcBasePersistenceImpl.java
b/extension/persistence/relational-jdbc/src/main/java/org/apache/polaris/extension/persistence/relational/jdbc/JdbcBasePersistenceImpl.java
index 38448934f..cd6a0b6c0 100644
---
a/extension/persistence/relational-jdbc/src/main/java/org/apache/polaris/extension/persistence/relational/jdbc/JdbcBasePersistenceImpl.java
+++
b/extension/persistence/relational-jdbc/src/main/java/org/apache/polaris/extension/persistence/relational/jdbc/JdbcBasePersistenceImpl.java
@@ -49,6 +49,9 @@ import
org.apache.polaris.core.persistence.IntegrationPersistence;
import org.apache.polaris.core.persistence.PolicyMappingAlreadyExistsException;
import org.apache.polaris.core.persistence.PrincipalSecretsGenerator;
import org.apache.polaris.core.persistence.RetryOnConcurrencyException;
+import org.apache.polaris.core.persistence.pagination.HasPageSize;
+import org.apache.polaris.core.persistence.pagination.Page;
+import org.apache.polaris.core.persistence.pagination.PageToken;
import org.apache.polaris.core.policy.PolarisPolicyMappingRecord;
import org.apache.polaris.core.policy.PolicyType;
import org.apache.polaris.core.storage.PolarisStorageConfigurationInfo;
@@ -352,49 +355,51 @@ public class JdbcBasePersistenceImpl implements
BasePersistence, IntegrationPers
@Nonnull
@Override
- public List<EntityNameLookupRecord> listEntities(
+ public Page<EntityNameLookupRecord> listEntities(
@Nonnull PolarisCallContext callCtx,
long catalogId,
long parentId,
- @Nonnull PolarisEntityType entityType) {
+ @Nonnull PolarisEntityType entityType,
+ @Nonnull PageToken pageToken) {
return listEntities(
callCtx,
catalogId,
parentId,
entityType,
- Integer.MAX_VALUE,
entity -> true,
- EntityNameLookupRecord::new);
+ EntityNameLookupRecord::new,
+ pageToken);
}
@Nonnull
@Override
- public List<EntityNameLookupRecord> listEntities(
+ public Page<EntityNameLookupRecord> listEntities(
@Nonnull PolarisCallContext callCtx,
long catalogId,
long parentId,
@Nonnull PolarisEntityType entityType,
- @Nonnull Predicate<PolarisBaseEntity> entityFilter) {
+ @Nonnull Predicate<PolarisBaseEntity> entityFilter,
+ @Nonnull PageToken pageToken) {
return listEntities(
callCtx,
catalogId,
parentId,
entityType,
- Integer.MAX_VALUE,
entityFilter,
- EntityNameLookupRecord::new);
+ EntityNameLookupRecord::new,
+ pageToken);
}
@Nonnull
@Override
- public <T> List<T> listEntities(
+ public <T> Page<T> listEntities(
@Nonnull PolarisCallContext callCtx,
long catalogId,
long parentId,
PolarisEntityType entityType,
- int limit,
@Nonnull Predicate<PolarisBaseEntity> entityFilter,
- @Nonnull Function<PolarisBaseEntity, T> transformer) {
+ @Nonnull Function<PolarisBaseEntity, T> transformer,
+ @Nonnull PageToken pageToken) {
Map<String, Object> params =
Map.of(
"catalog_id",
@@ -415,15 +420,17 @@ public class JdbcBasePersistenceImpl implements
BasePersistence, IntegrationPers
query,
new ModelEntity(),
stream -> {
- stream
- .map(ModelEntity::toEntity)
- .filter(entityFilter)
- .limit(limit)
- .forEach(results::add);
+ var data = stream.map(ModelEntity::toEntity).filter(entityFilter);
+ if (pageToken instanceof HasPageSize hasPageSize) {
+ data = data.limit(hasPageSize.getPageSize());
+ }
+ data.forEach(results::add);
});
- return results == null
- ? Collections.emptyList()
- : results.stream().filter(entityFilter).map(transformer).collect(Collectors.toList());
+ List<T> resultsOrEmpty =
+ results == null
+ ? Collections.emptyList()
+ : results.stream().filter(entityFilter).map(transformer).collect(Collectors.toList());
+ return Page.fromItems(resultsOrEmpty);
} catch (SQLException e) {
throw new RuntimeException(
String.format("Failed to retrieve polaris entities due to %s",
e.getMessage()), e);
diff --git
a/polaris-core/src/main/java/org/apache/polaris/core/config/FeatureConfiguration.java
b/polaris-core/src/main/java/org/apache/polaris/core/config/FeatureConfiguration.java
index ee663dbb1..26437a8f6 100644
---
a/polaris-core/src/main/java/org/apache/polaris/core/config/FeatureConfiguration.java
+++
b/polaris-core/src/main/java/org/apache/polaris/core/config/FeatureConfiguration.java
@@ -202,6 +202,13 @@ public class FeatureConfiguration<T> extends
PolarisConfiguration<T> {
.defaultValue(2)
.buildFeatureConfiguration();
+ public static final PolarisConfiguration<Boolean> LIST_PAGINATION_ENABLED =
+ PolarisConfiguration.<Boolean>builder()
+ .key("LIST_PAGINATION_ENABLED")
+ .description("If set to true, pagination for APIs like listTables is enabled.")
+ .defaultValue(false)
+ .buildFeatureConfiguration();
+
public static final FeatureConfiguration<Boolean> ENABLE_GENERIC_TABLES =
PolarisConfiguration.<Boolean>builder()
.key("ENABLE_GENERIC_TABLES")
diff --git
a/polaris-core/src/main/java/org/apache/polaris/core/persistence/AtomicOperationMetaStoreManager.java
b/polaris-core/src/main/java/org/apache/polaris/core/persistence/AtomicOperationMetaStoreManager.java
index b2eb8fcfa..2a32fb6f9 100644
---
a/polaris-core/src/main/java/org/apache/polaris/core/persistence/AtomicOperationMetaStoreManager.java
+++
b/polaris-core/src/main/java/org/apache/polaris/core/persistence/AtomicOperationMetaStoreManager.java
@@ -62,6 +62,8 @@ import
org.apache.polaris.core.persistence.dao.entity.PrincipalSecretsResult;
import org.apache.polaris.core.persistence.dao.entity.PrivilegeResult;
import org.apache.polaris.core.persistence.dao.entity.ResolvedEntityResult;
import org.apache.polaris.core.persistence.dao.entity.ScopedCredentialsResult;
+import org.apache.polaris.core.persistence.pagination.Page;
+import org.apache.polaris.core.persistence.pagination.PageToken;
import org.apache.polaris.core.policy.PolarisPolicyMappingRecord;
import org.apache.polaris.core.policy.PolicyEntity;
import org.apache.polaris.core.policy.PolicyMappingUtil;
@@ -687,7 +689,8 @@ public class AtomicOperationMetaStoreManager extends
BaseMetaStoreManager {
@Nonnull PolarisCallContext callCtx,
@Nullable List<PolarisEntityCore> catalogPath,
@Nonnull PolarisEntityType entityType,
- @Nonnull PolarisEntitySubType entitySubType) {
+ @Nonnull PolarisEntitySubType entitySubType,
+ @Nonnull PageToken pageToken) {
// get meta store we should be using
BasePersistence ms = callCtx.getMetaStore();
@@ -699,15 +702,16 @@ public class AtomicOperationMetaStoreManager extends
BaseMetaStoreManager {
catalogPath == null || catalogPath.size() == 0
? 0l
: catalogPath.get(catalogPath.size() - 1).getId();
- List<EntityNameLookupRecord> toreturnList =
- ms.listEntities(callCtx, catalogId, parentId, entityType);
+ Page<EntityNameLookupRecord> resultPage =
+ ms.listEntities(callCtx, catalogId, parentId, entityType, pageToken);
// prune the returned list with only entities matching the entity subtype
if (entitySubType != PolarisEntitySubType.ANY_SUBTYPE) {
- toreturnList =
- toreturnList.stream()
- .filter(rec -> rec.getSubTypeCode() == entitySubType.getCode())
- .collect(Collectors.toList());
+ resultPage =
+ pageToken.buildNextPage(
+ resultPage.items.stream()
+ .filter(rec -> rec.getSubTypeCode() == entitySubType.getCode())
+ .collect(Collectors.toList()));
}
// TODO: Use post-validation to enforce consistent view against
catalogPath. In the
@@ -717,7 +721,7 @@ public class AtomicOperationMetaStoreManager extends
BaseMetaStoreManager {
// in-flight request (the cache-based resolution follows a different path
entirely).
// done
- return new ListEntitiesResult(toreturnList);
+ return ListEntitiesResult.fromPage(resultPage);
}
/** {@inheritDoc} */
@@ -1176,13 +1180,14 @@ public class AtomicOperationMetaStoreManager extends
BaseMetaStoreManager {
// get the list of catalog roles, at most 2
List<PolarisBaseEntity> catalogRoles =
ms.listEntities(
- callCtx,
- catalogId,
- catalogId,
- PolarisEntityType.CATALOG_ROLE,
- 2,
- entity -> true,
- Function.identity());
+ callCtx,
+ catalogId,
+ catalogId,
+ PolarisEntityType.CATALOG_ROLE,
+ entity -> true,
+ Function.identity(),
+ PageToken.fromLimit(2))
+ .items;
// if we have 2, we cannot drop the catalog. If only one left, better be
the admin role
if (catalogRoles.size() > 1) {
@@ -1488,17 +1493,16 @@ public class AtomicOperationMetaStoreManager extends
BaseMetaStoreManager {
@Override
public @Nonnull EntitiesResult loadTasks(
- @Nonnull PolarisCallContext callCtx, String executorId, int limit) {
+ @Nonnull PolarisCallContext callCtx, String executorId, PageToken pageToken) {
BasePersistence ms = callCtx.getMetaStore();
// find all available tasks
- List<PolarisBaseEntity> availableTasks =
+ Page<PolarisBaseEntity> availableTasks =
ms.listEntities(
callCtx,
PolarisEntityConstants.getRootEntityId(),
PolarisEntityConstants.getRootEntityId(),
PolarisEntityType.TASK,
- limit,
entity -> {
PolarisObjectMapperUtil.TaskExecutionState taskState =
PolarisObjectMapperUtil.parseTaskState(entity);
@@ -1513,11 +1517,12 @@ public class AtomicOperationMetaStoreManager extends
BaseMetaStoreManager {
|| taskState.executor == null
|| callCtx.getClock().millis() -
taskState.lastAttemptStartTime > taskAgeTimeout;
},
- Function.identity());
+ Function.identity(),
+ pageToken);
List<PolarisBaseEntity> loadedTasks = new ArrayList<>();
final AtomicInteger failedLeaseCount = new AtomicInteger(0);
- availableTasks.forEach(
+ availableTasks.items.forEach(
task -> {
PolarisBaseEntity updatedTask = new PolarisBaseEntity(task);
Map<String, String> properties =
@@ -1554,7 +1559,7 @@ public class AtomicOperationMetaStoreManager extends
BaseMetaStoreManager {
throw new RetryOnConcurrencyException(
"Failed to lease any of %s tasks due to concurrent leases",
failedLeaseCount.get());
}
- return new EntitiesResult(loadedTasks);
+ return EntitiesResult.fromPage(Page.fromItems(loadedTasks));
}
/** {@inheritDoc} */
diff --git
a/polaris-core/src/main/java/org/apache/polaris/core/persistence/BasePersistence.java
b/polaris-core/src/main/java/org/apache/polaris/core/persistence/BasePersistence.java
index 75b18fb45..9d6f6ba35 100644
---
a/polaris-core/src/main/java/org/apache/polaris/core/persistence/BasePersistence.java
+++
b/polaris-core/src/main/java/org/apache/polaris/core/persistence/BasePersistence.java
@@ -31,6 +31,8 @@ import org.apache.polaris.core.entity.PolarisEntityCore;
import org.apache.polaris.core.entity.PolarisEntityId;
import org.apache.polaris.core.entity.PolarisEntityType;
import org.apache.polaris.core.entity.PolarisGrantRecord;
+import org.apache.polaris.core.persistence.pagination.Page;
+import org.apache.polaris.core.persistence.pagination.PageToken;
import org.apache.polaris.core.policy.PolicyMappingPersistence;
/**
@@ -270,14 +272,16 @@ public interface BasePersistence extends
PolicyMappingPersistence {
* @param catalogId catalog id for that entity, NULL_ID if the entity is
top-level
* @param parentId id of the parent, can be the special 0 value representing
the root entity
* @param entityType type of entities to list
+ * @param pageToken the token to start listing after
* @return the list of entities for the specified list operation
*/
@Nonnull
- List<EntityNameLookupRecord> listEntities(
+ Page<EntityNameLookupRecord> listEntities(
@Nonnull PolarisCallContext callCtx,
long catalogId,
long parentId,
- @Nonnull PolarisEntityType entityType);
+ @Nonnull PolarisEntityType entityType,
+ @Nonnull PageToken pageToken);
/**
* List entities where some predicate returns true
@@ -288,15 +292,17 @@ public interface BasePersistence extends
PolicyMappingPersistence {
* @param entityType type of entities to list
* @param entityFilter the filter to be applied to each entity. Only
entities where the predicate
* returns true are returned in the list
+ * @param pageToken the token to start listing after
* @return the list of entities for which the predicate returns true
*/
@Nonnull
- List<EntityNameLookupRecord> listEntities(
+ Page<EntityNameLookupRecord> listEntities(
@Nonnull PolarisCallContext callCtx,
long catalogId,
long parentId,
@Nonnull PolarisEntityType entityType,
- @Nonnull Predicate<PolarisBaseEntity> entityFilter);
+ @Nonnull Predicate<PolarisBaseEntity> entityFilter,
+ @Nonnull PageToken pageToken);
/**
* List entities where some predicate returns true and transform the
entities with a function
@@ -313,14 +319,14 @@ public interface BasePersistence extends
PolicyMappingPersistence {
* @return the list of entities for which the predicate returns true
*/
@Nonnull
- <T> List<T> listEntities(
+ <T> Page<T> listEntities(
@Nonnull PolarisCallContext callCtx,
long catalogId,
long parentId,
@Nonnull PolarisEntityType entityType,
- int limit,
@Nonnull Predicate<PolarisBaseEntity> entityFilter,
- @Nonnull Function<PolarisBaseEntity, T> transformer);
+ @Nonnull Function<PolarisBaseEntity, T> transformer,
+ PageToken pageToken);
/**
* Lookup the current entityGrantRecordsVersion for the specified entity.
That version is changed
diff --git
a/polaris-core/src/main/java/org/apache/polaris/core/persistence/PolarisMetaStoreManager.java
b/polaris-core/src/main/java/org/apache/polaris/core/persistence/PolarisMetaStoreManager.java
index da2ab521e..2a20ad5c1 100644
---
a/polaris-core/src/main/java/org/apache/polaris/core/persistence/PolarisMetaStoreManager.java
+++
b/polaris-core/src/main/java/org/apache/polaris/core/persistence/PolarisMetaStoreManager.java
@@ -42,6 +42,7 @@ import
org.apache.polaris.core.persistence.dao.entity.EntityWithPath;
import org.apache.polaris.core.persistence.dao.entity.GenerateEntityIdResult;
import org.apache.polaris.core.persistence.dao.entity.ListEntitiesResult;
import org.apache.polaris.core.persistence.dao.entity.ResolvedEntityResult;
+import org.apache.polaris.core.persistence.pagination.PageToken;
import org.apache.polaris.core.policy.PolarisPolicyMappingManager;
import org.apache.polaris.core.storage.PolarisCredentialVendor;
@@ -120,7 +121,8 @@ public interface PolarisMetaStoreManager
@Nonnull PolarisCallContext callCtx,
@Nullable List<PolarisEntityCore> catalogPath,
@Nonnull PolarisEntityType entityType,
- @Nonnull PolarisEntitySubType entitySubType);
+ @Nonnull PolarisEntitySubType entitySubType,
+ @Nonnull PageToken pageToken);
/**
* Generate a new unique id that can be used by the Polaris client when it
needs to create a new
@@ -300,11 +302,12 @@ public interface PolarisMetaStoreManager
*
* @param callCtx call context
* @param executorId executor id
- * @param limit limit
+ * @param pageToken page token to start after
* @return list of tasks to be completed
*/
@Nonnull
- EntitiesResult loadTasks(@Nonnull PolarisCallContext callCtx, String executorId, int limit);
+ EntitiesResult loadTasks(
+ @Nonnull PolarisCallContext callCtx, String executorId, PageToken pageToken);
/**
* Load change tracking information for a set of entities in one single shot
and return for each
diff --git
a/polaris-core/src/main/java/org/apache/polaris/core/persistence/TransactionWorkspaceMetaStoreManager.java
b/polaris-core/src/main/java/org/apache/polaris/core/persistence/TransactionWorkspaceMetaStoreManager.java
index 9b5a6b6db..b7ba47e83 100644
---
a/polaris-core/src/main/java/org/apache/polaris/core/persistence/TransactionWorkspaceMetaStoreManager.java
+++
b/polaris-core/src/main/java/org/apache/polaris/core/persistence/TransactionWorkspaceMetaStoreManager.java
@@ -50,6 +50,7 @@ import
org.apache.polaris.core.persistence.dao.entity.PrincipalSecretsResult;
import org.apache.polaris.core.persistence.dao.entity.PrivilegeResult;
import org.apache.polaris.core.persistence.dao.entity.ResolvedEntityResult;
import org.apache.polaris.core.persistence.dao.entity.ScopedCredentialsResult;
+import org.apache.polaris.core.persistence.pagination.PageToken;
import org.apache.polaris.core.policy.PolicyEntity;
import org.apache.polaris.core.policy.PolicyType;
@@ -118,7 +119,8 @@ public class TransactionWorkspaceMetaStoreManager
implements PolarisMetaStoreMan
@Nonnull PolarisCallContext callCtx,
@Nullable List<PolarisEntityCore> catalogPath,
@Nonnull PolarisEntityType entityType,
- @Nonnull PolarisEntitySubType entitySubType) {
+ @Nonnull PolarisEntitySubType entitySubType,
+ @Nonnull PageToken pageToken) {
callCtx.getDiagServices().fail("illegal_method_in_transaction_workspace",
"listEntities");
return null;
}
@@ -320,7 +322,7 @@ public class TransactionWorkspaceMetaStoreManager
implements PolarisMetaStoreMan
@Override
public EntitiesResult loadTasks(
- @Nonnull PolarisCallContext callCtx, String executorId, int limit) {
+ @Nonnull PolarisCallContext callCtx, String executorId, PageToken pageToken) {
callCtx.getDiagServices().fail("illegal_method_in_transaction_workspace",
"loadTasks");
return null;
}
diff --git
a/polaris-core/src/main/java/org/apache/polaris/core/persistence/dao/entity/EntitiesResult.java
b/polaris-core/src/main/java/org/apache/polaris/core/persistence/dao/entity/EntitiesResult.java
index 70d9edcf5..e27b69680 100644
---
a/polaris-core/src/main/java/org/apache/polaris/core/persistence/dao/entity/EntitiesResult.java
+++
b/polaris-core/src/main/java/org/apache/polaris/core/persistence/dao/entity/EntitiesResult.java
@@ -23,13 +23,21 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.annotation.Nonnull;
import jakarta.annotation.Nullable;
import java.util.List;
+import java.util.Optional;
import org.apache.polaris.core.entity.PolarisBaseEntity;
+import org.apache.polaris.core.persistence.pagination.Page;
+import org.apache.polaris.core.persistence.pagination.PageToken;
/** a set of returned entities result */
public class EntitiesResult extends BaseResult {
// null if not success. Else the list of entities being returned
private final List<PolarisBaseEntity> entities;
+ private final Optional<PageToken> pageTokenOpt;
+
+ public static EntitiesResult fromPage(Page<PolarisBaseEntity> page) {
+ return new EntitiesResult(page.items, Optional.ofNullable(page.pageToken));
+ }
/**
* Constructor for an error
@@ -40,6 +48,11 @@ public class EntitiesResult extends BaseResult {
public EntitiesResult(@Nonnull ReturnStatus errorStatus, @Nullable String
extraInformation) {
super(errorStatus, extraInformation);
this.entities = null;
+ this.pageTokenOpt = Optional.empty();
+ }
+
+ public EntitiesResult(@Nonnull List<PolarisBaseEntity> entities) {
+ this(entities, Optional.empty());
}
/**
@@ -47,21 +60,29 @@ public class EntitiesResult extends BaseResult {
*
* @param entities list of entities being returned, implies success
*/
- public EntitiesResult(@Nonnull List<PolarisBaseEntity> entities) {
+ public EntitiesResult(
+ @Nonnull List<PolarisBaseEntity> entities, @Nonnull Optional<PageToken> pageTokenOpt) {
super(ReturnStatus.SUCCESS);
this.entities = entities;
+ this.pageTokenOpt = pageTokenOpt;
}
@JsonCreator
private EntitiesResult(
@JsonProperty("returnStatus") @Nonnull ReturnStatus returnStatus,
@JsonProperty("extraInformation") String extraInformation,
- @JsonProperty("entities") List<PolarisBaseEntity> entities) {
+ @JsonProperty("entities") List<PolarisBaseEntity> entities,
+ @JsonProperty("pageToken") Optional<PageToken> pageTokenOpt) {
super(returnStatus, extraInformation);
this.entities = entities;
+ this.pageTokenOpt = pageTokenOpt;
}
public List<PolarisBaseEntity> getEntities() {
return entities;
}
+
+ public Optional<PageToken> getPageToken() {
+ return pageTokenOpt;
+ }
}
diff --git
a/polaris-core/src/main/java/org/apache/polaris/core/persistence/dao/entity/ListEntitiesResult.java
b/polaris-core/src/main/java/org/apache/polaris/core/persistence/dao/entity/ListEntitiesResult.java
index bc51f4dab..10669e899 100644
---
a/polaris-core/src/main/java/org/apache/polaris/core/persistence/dao/entity/ListEntitiesResult.java
+++
b/polaris-core/src/main/java/org/apache/polaris/core/persistence/dao/entity/ListEntitiesResult.java
@@ -23,13 +23,22 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.annotation.Nonnull;
import jakarta.annotation.Nullable;
import java.util.List;
+import java.util.Optional;
import org.apache.polaris.core.entity.EntityNameLookupRecord;
+import org.apache.polaris.core.persistence.pagination.Page;
+import org.apache.polaris.core.persistence.pagination.PageToken;
/** the return the result for a list entities call */
public class ListEntitiesResult extends BaseResult {
// null if not success. Else the list of entities being returned
private final List<EntityNameLookupRecord> entities;
+ private final Optional<PageToken> pageTokenOpt;
+
+ /** Create a {@link ListEntitiesResult} from a {@link Page} */
+ public static ListEntitiesResult fromPage(Page<EntityNameLookupRecord> page) {
+ return new ListEntitiesResult(page.items, Optional.ofNullable(page.pageToken));
+ }
/**
* Constructor for an error
@@ -37,9 +46,13 @@ public class ListEntitiesResult extends BaseResult {
* @param errorCode error code, cannot be SUCCESS
* @param extraInformation extra information
*/
- public ListEntitiesResult(@Nonnull ReturnStatus errorCode, @Nullable String extraInformation) {
+ public ListEntitiesResult(
+ @Nonnull ReturnStatus errorCode,
+ @Nullable String extraInformation,
+ @Nonnull Optional<PageToken> pageTokenOpt) {
super(errorCode, extraInformation);
this.entities = null;
+ this.pageTokenOpt = pageTokenOpt;
}
/**
@@ -47,21 +60,29 @@ public class ListEntitiesResult extends BaseResult {
*
* @param entities list of entities being returned, implies success
*/
- public ListEntitiesResult(@Nonnull List<EntityNameLookupRecord> entities) {
+ public ListEntitiesResult(
+ @Nonnull List<EntityNameLookupRecord> entities, @Nonnull Optional<PageToken> pageTokenOpt) {
super(ReturnStatus.SUCCESS);
this.entities = entities;
+ this.pageTokenOpt = pageTokenOpt;
}
@JsonCreator
private ListEntitiesResult(
@JsonProperty("returnStatus") @Nonnull ReturnStatus returnStatus,
@JsonProperty("extraInformation") String extraInformation,
- @JsonProperty("entities") List<EntityNameLookupRecord> entities) {
+ @JsonProperty("entities") List<EntityNameLookupRecord> entities,
+ @JsonProperty("pageToken") Optional<PageToken> pageTokenOpt) {
super(returnStatus, extraInformation);
this.entities = entities;
+ this.pageTokenOpt = pageTokenOpt;
}
public List<EntityNameLookupRecord> getEntities() {
return entities;
}
+
+ public Optional<PageToken> getPageToken() {
+ return pageTokenOpt;
+ }
}
diff --git
a/polaris-core/src/main/java/org/apache/polaris/core/persistence/pagination/DonePageToken.java
b/polaris-core/src/main/java/org/apache/polaris/core/persistence/pagination/DonePageToken.java
new file mode 100644
index 000000000..d46ea7b02
--- /dev/null
+++
b/polaris-core/src/main/java/org/apache/polaris/core/persistence/pagination/DonePageToken.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.core.persistence.pagination;
+
+import java.util.List;
+
+/**
+ * Terminal {@link PageToken} that stands for "no page token".
+ *
+ * <p>{@link #toTokenString()} returns {@code null}, which the client will interpret as there
+ * being no more data available. A finished listing cannot be advanced, so {@link #updated(List)}
+ * always throws.
+ */
+public class DonePageToken extends PageToken {
+
+  /** Always {@code null}: there is no token to hand back once the listing is done. */
+  @Override
+  public String toTokenString() {
+    return null;
+  }
+
+  /**
+   * Not supported for this token type.
+   *
+   * @throws IllegalStateException always
+   */
+  @Override
+  protected PageToken updated(List<?> newData) {
+    throw new IllegalStateException("DonePageToken.updated is invalid");
+  }
+}
diff --git
a/polaris-core/src/main/java/org/apache/polaris/core/persistence/pagination/HasPageSize.java
b/polaris-core/src/main/java/org/apache/polaris/core/persistence/pagination/HasPageSize.java
new file mode 100644
index 000000000..c6b216fcd
--- /dev/null
+++
b/polaris-core/src/main/java/org/apache/polaris/core/persistence/pagination/HasPageSize.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.core.persistence.pagination;
+
+/**
+ * A light interface for {@link PageToken} implementations to express that they have a page size
+ * that should be respected.
+ */
+public interface HasPageSize {
+ /** Returns the page size carried by the implementing token. */
+ int getPageSize();
+}
diff --git
a/polaris-core/src/main/java/org/apache/polaris/core/persistence/pagination/LimitPageToken.java
b/polaris-core/src/main/java/org/apache/polaris/core/persistence/pagination/LimitPageToken.java
new file mode 100644
index 000000000..18586446c
--- /dev/null
+++
b/polaris-core/src/main/java/org/apache/polaris/core/persistence/pagination/LimitPageToken.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.core.persistence.pagination;
+
+import java.util.List;
+
+/**
+ * A {@link PageToken} implementation that has a page size, but no start offset. This can be used
+ * to represent a {@code limit}. When updated, it returns {@link DonePageToken}. As such it should
+ * never be user-facing and doesn't truly paginate.
+ */
+public class LimitPageToken extends PageToken implements HasPageSize {
+
+  /** Prefix identifying this token type in its serialized form. */
+  public static final String PREFIX = "limit";
+
+  private final int pageSize;
+
+  /**
+   * @param pageSize the limit this token carries; stored as-is
+   */
+  public LimitPageToken(int pageSize) {
+    this.pageSize = pageSize;
+  }
+
+  /** Returns the limit supplied at construction. */
+  @Override
+  public int getPageSize() {
+    return pageSize;
+  }
+
+  /**
+   * Serializes this token as {@code limit/<pageSize>}.
+   *
+   * <p>Plain concatenation is used rather than {@code String.format("%d", ...)}: the formatter
+   * localizes digits according to the JVM's default locale, which would make the serialized
+   * token, and the {@link PageToken#equals(Object)} / {@link PageToken#hashCode()} results
+   * derived from it, vary with the host locale.
+   */
+  @Override
+  public String toTokenString() {
+    return PREFIX + "/" + pageSize;
+  }
+
+  /** Returns a {@link DonePageToken} regardless of {@code newData}. */
+  @Override
+  protected PageToken updated(List<?> newData) {
+    return new DonePageToken();
+  }
+}
diff --git
a/polaris-core/src/main/java/org/apache/polaris/core/persistence/pagination/Page.java
b/polaris-core/src/main/java/org/apache/polaris/core/persistence/pagination/Page.java
new file mode 100644
index 000000000..18287f85c
--- /dev/null
+++
b/polaris-core/src/main/java/org/apache/polaris/core/persistence/pagination/Page.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.core.persistence.pagination;
+
+import java.util.List;
+
+/**
+ * A page of items plus their paging cursor. The {@link PageToken} here can be used to continue
+ * the listing operation that generated the {@code items}.
+ *
+ * <p>Both fields are {@code final}, but the supplied list is stored by reference rather than
+ * copied.
+ */
+public class Page<T> {
+ public final PageToken pageToken;
+ public final List<T> items;
+
+ /** Creates a page holding {@code items} together with the token for continuing the listing. */
+ public Page(PageToken pageToken, List<T> items) {
+ this.pageToken = pageToken;
+ this.items = items;
+ }
+
+ /**
+ * Used to wrap a {@code List<T>} of items into a {@code Page<T>} when there are no more pages.
+ * The resulting page carries a {@link DonePageToken}.
+ */
+ public static <T> Page<T> fromItems(List<T> items) {
+ return new Page<>(new DonePageToken(), items);
+ }
+}
diff --git
a/polaris-core/src/main/java/org/apache/polaris/core/persistence/pagination/PageToken.java
b/polaris-core/src/main/java/org/apache/polaris/core/persistence/pagination/PageToken.java
new file mode 100644
index 000000000..2e335ccd4
--- /dev/null
+++
b/polaris-core/src/main/java/org/apache/polaris/core/persistence/pagination/PageToken.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.core.persistence.pagination;
+
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Represents a page token that can be used by operations like {@code listTables}. Clients that
+ * specify a {@code pageSize} (or a {@code pageToken}) may receive a {@code next-page-token} in
+ * the response, the content of which is a serialized PageToken.
+ *
+ * <p>By providing that in the next query's {@code pageToken}, the client can resume listing where
+ * they left off. If the client provides a {@code pageToken} or {@code pageSize} but {@code
+ * next-page-token} is null in the response, that means there is no more data to read.
+ */
+public abstract class PageToken {
+
+  /** Build a new PageToken that reads everything */
+  public static PageToken readEverything() {
+    return build(null, null);
+  }
+
+  /** Build a new PageToken from an input String, without a specified page size */
+  public static PageToken fromString(String token) {
+    return build(token, null);
+  }
+
+  /** Build a new PageToken from a limit */
+  public static PageToken fromLimit(Integer pageSize) {
+    return build(null, pageSize);
+  }
+
+  /**
+   * Build a {@link PageToken} from the input string and page size.
+   *
+   * @param token serialized token, or {@code null}/empty when no token was supplied
+   * @param pageSize requested page size, or {@code null} when none was supplied
+   * @return a {@link LimitPageToken} when a page size is given without a token, otherwise a
+   *     {@link ReadEverythingPageToken}
+   * @throws IllegalArgumentException if {@code token} is non-empty; parsing serialized tokens is
+   *     not implemented yet
+   */
+  public static PageToken build(String token, Integer pageSize) {
+    if (token != null && !token.isEmpty()) {
+      // TODO implement, split out by the token's prefix
+      throw new IllegalArgumentException("Unrecognized page token: " + token);
+    }
+    return pageSize != null ? new LimitPageToken(pageSize) : new ReadEverythingPageToken();
+  }
+
+  /** Serialize a {@link PageToken} into a string */
+  public abstract String toTokenString();
+
+  /**
+   * Builds a new page token to reflect new data that's been read. If the amount of data read is
+   * less than the pageSize, this will return a {@link DonePageToken}
+   */
+  protected abstract PageToken updated(List<?> newData);
+
+  /**
+   * Builds a {@code Page<T>} from a {@code List<T>}. The {@link PageToken} attached to the new
+   * {@link Page} is the same as the result of calling {@link #updated(List)} on this {@link
+   * PageToken}.
+   */
+  public final <T> Page<T> buildNextPage(List<T> data) {
+    return new Page<>(updated(data), data);
+  }
+
+  /** Two tokens are equal when their serialized forms are equal, whatever their classes. */
+  @Override
+  public final boolean equals(Object o) {
+    return o instanceof PageToken
+        && Objects.equals(this.toTokenString(), ((PageToken) o).toTokenString());
+  }
+
+  /** Hash of the serialized form, or 0 when it is {@code null}; serializes only once. */
+  @Override
+  public final int hashCode() {
+    return Objects.hashCode(toTokenString());
+  }
+}
diff --git
a/polaris-core/src/main/java/org/apache/polaris/core/persistence/pagination/ReadEverythingPageToken.java
b/polaris-core/src/main/java/org/apache/polaris/core/persistence/pagination/ReadEverythingPageToken.java
new file mode 100644
index 000000000..c8476c351
--- /dev/null
+++
b/polaris-core/src/main/java/org/apache/polaris/core/persistence/pagination/ReadEverythingPageToken.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.core.persistence.pagination;
+
+import java.util.List;
+
+/**
+ * A {@link PageToken} implementation for readers who want to read everything. The behavior when
+ * using this token should be the same as when reading without a token.
+ */
+public class ReadEverythingPageToken extends PageToken {
+
+  /**
+   * Serialized form of this token. Declared {@code final} so that it cannot be reassigned at
+   * runtime, which would silently change {@link #toTokenString()} and the equality derived from
+   * it.
+   */
+  public static final String PREFIX = "read-everything";
+
+  public ReadEverythingPageToken() {}
+
+  /** Returns {@link #PREFIX}; this token carries no further state. */
+  @Override
+  public String toTokenString() {
+    return PREFIX;
+  }
+
+  /** Returns a {@link DonePageToken} regardless of {@code newData}. */
+  @Override
+  protected PageToken updated(List<?> newData) {
+    return new DonePageToken();
+  }
+}
diff --git
a/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/AbstractTransactionalPersistence.java
b/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/AbstractTransactionalPersistence.java
index e949b33fe..e63ea6fed 100644
---
a/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/AbstractTransactionalPersistence.java
+++
b/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/AbstractTransactionalPersistence.java
@@ -37,6 +37,8 @@ import org.apache.polaris.core.entity.PolarisPrincipalSecrets;
import org.apache.polaris.core.persistence.EntityAlreadyExistsException;
import org.apache.polaris.core.persistence.PolicyMappingAlreadyExistsException;
import org.apache.polaris.core.persistence.RetryOnConcurrencyException;
+import org.apache.polaris.core.persistence.pagination.Page;
+import org.apache.polaris.core.persistence.pagination.PageToken;
import org.apache.polaris.core.policy.PolarisPolicyMappingRecord;
import org.apache.polaris.core.policy.PolicyType;
import org.apache.polaris.core.storage.PolarisStorageConfigurationInfo;
@@ -352,46 +354,50 @@ public abstract class AbstractTransactionalPersistence
implements TransactionalP
/** {@inheritDoc} */
@Override
@Nonnull
- public List<EntityNameLookupRecord> listEntities(
+ public Page<EntityNameLookupRecord> listEntities(
@Nonnull PolarisCallContext callCtx,
long catalogId,
long parentId,
- @Nonnull PolarisEntityType entityType) {
+ @Nonnull PolarisEntityType entityType,
+ @Nonnull PageToken pageToken) {
return runInReadTransaction(
- callCtx, () -> this.listEntitiesInCurrentTxn(callCtx, catalogId,
parentId, entityType));
+ callCtx,
+ () -> this.listEntitiesInCurrentTxn(callCtx, catalogId, parentId,
entityType, pageToken));
}
/** {@inheritDoc} */
@Override
@Nonnull
- public List<EntityNameLookupRecord> listEntities(
+ public Page<EntityNameLookupRecord> listEntities(
@Nonnull PolarisCallContext callCtx,
long catalogId,
long parentId,
@Nonnull PolarisEntityType entityType,
- @Nonnull Predicate<PolarisBaseEntity> entityFilter) {
+ @Nonnull Predicate<PolarisBaseEntity> entityFilter,
+ @Nonnull PageToken pageToken) {
return runInReadTransaction(
callCtx,
() ->
- this.listEntitiesInCurrentTxn(callCtx, catalogId, parentId,
entityType, entityFilter));
+ this.listEntitiesInCurrentTxn(
+ callCtx, catalogId, parentId, entityType, entityFilter,
pageToken));
}
/** {@inheritDoc} */
@Override
@Nonnull
- public <T> List<T> listEntities(
+ public <T> Page<T> listEntities(
@Nonnull PolarisCallContext callCtx,
long catalogId,
long parentId,
@Nonnull PolarisEntityType entityType,
- int limit,
@Nonnull Predicate<PolarisBaseEntity> entityFilter,
- @Nonnull Function<PolarisBaseEntity, T> transformer) {
+ @Nonnull Function<PolarisBaseEntity, T> transformer,
+ @Nonnull PageToken pageToken) {
return runInReadTransaction(
callCtx,
() ->
this.listEntitiesInCurrentTxn(
- callCtx, catalogId, parentId, entityType, limit, entityFilter,
transformer));
+ callCtx, catalogId, parentId, entityType, entityFilter,
transformer, pageToken));
}
/** {@inheritDoc} */
diff --git
a/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/TransactionalMetaStoreManagerImpl.java
b/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/TransactionalMetaStoreManagerImpl.java
index 6eb48c12e..62f526a6d 100644
---
a/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/TransactionalMetaStoreManagerImpl.java
+++
b/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/TransactionalMetaStoreManagerImpl.java
@@ -28,6 +28,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -63,6 +64,8 @@ import
org.apache.polaris.core.persistence.dao.entity.PrincipalSecretsResult;
import org.apache.polaris.core.persistence.dao.entity.PrivilegeResult;
import org.apache.polaris.core.persistence.dao.entity.ResolvedEntityResult;
import org.apache.polaris.core.persistence.dao.entity.ScopedCredentialsResult;
+import org.apache.polaris.core.persistence.pagination.Page;
+import org.apache.polaris.core.persistence.pagination.PageToken;
import org.apache.polaris.core.policy.PolarisPolicyMappingRecord;
import org.apache.polaris.core.policy.PolicyEntity;
import org.apache.polaris.core.policy.PolicyMappingUtil;
@@ -677,37 +680,41 @@ public class TransactionalMetaStoreManagerImpl extends
BaseMetaStoreManager {
}
/**
- * See {@link #listEntities(PolarisCallContext, List, PolarisEntityType,
PolarisEntitySubType)}
+ * See {@link #listEntities(PolarisCallContext, List, PolarisEntityType,
PolarisEntitySubType,
+ * PageToken)}
*/
private @Nonnull ListEntitiesResult listEntities(
@Nonnull PolarisCallContext callCtx,
@Nonnull TransactionalPersistence ms,
@Nullable List<PolarisEntityCore> catalogPath,
@Nonnull PolarisEntityType entityType,
- @Nonnull PolarisEntitySubType entitySubType) {
+ @Nonnull PolarisEntitySubType entitySubType,
+ @Nonnull PageToken pageToken) {
// first resolve again the catalogPath to that entity
PolarisEntityResolver resolver = new PolarisEntityResolver(callCtx, ms,
catalogPath);
// return if we failed to resolve
if (resolver.isFailure()) {
- return new
ListEntitiesResult(BaseResult.ReturnStatus.CATALOG_PATH_CANNOT_BE_RESOLVED,
null);
+ return new ListEntitiesResult(
+ BaseResult.ReturnStatus.CATALOG_PATH_CANNOT_BE_RESOLVED, null,
Optional.empty());
}
// return list of active entities
- List<EntityNameLookupRecord> toreturnList =
+ Page<EntityNameLookupRecord> resultPage =
ms.listEntitiesInCurrentTxn(
- callCtx, resolver.getCatalogIdOrNull(), resolver.getParentId(),
entityType);
+ callCtx, resolver.getCatalogIdOrNull(), resolver.getParentId(),
entityType, pageToken);
// prune the returned list with only entities matching the entity subtype
if (entitySubType != PolarisEntitySubType.ANY_SUBTYPE) {
- toreturnList =
- toreturnList.stream()
- .filter(rec -> rec.getSubTypeCode() == entitySubType.getCode())
- .collect(Collectors.toList());
+ resultPage =
+ pageToken.buildNextPage(
+ resultPage.items.stream()
+ .filter(rec -> rec.getSubTypeCode() ==
entitySubType.getCode())
+ .collect(Collectors.toList()));
}
// done
- return new ListEntitiesResult(toreturnList);
+ return ListEntitiesResult.fromPage(resultPage);
}
/** {@inheritDoc} */
@@ -716,13 +723,15 @@ public class TransactionalMetaStoreManagerImpl extends
BaseMetaStoreManager {
@Nonnull PolarisCallContext callCtx,
@Nullable List<PolarisEntityCore> catalogPath,
@Nonnull PolarisEntityType entityType,
- @Nonnull PolarisEntitySubType entitySubType) {
+ @Nonnull PolarisEntitySubType entitySubType,
+ @Nonnull PageToken pageToken) {
// get meta store we should be using
TransactionalPersistence ms = ((TransactionalPersistence)
callCtx.getMetaStore());
// run operation in a read transaction
return ms.runInReadTransaction(
- callCtx, () -> listEntities(callCtx, ms, catalogPath, entityType,
entitySubType));
+ callCtx,
+ () -> listEntities(callCtx, ms, catalogPath, entityType,
entitySubType, pageToken));
}
/** {@link #createPrincipal(PolarisCallContext, PolarisBaseEntity)} */
@@ -1359,13 +1368,14 @@ public class TransactionalMetaStoreManagerImpl extends
BaseMetaStoreManager {
// get the list of catalog roles, at most 2
List<PolarisBaseEntity> catalogRoles =
ms.listEntitiesInCurrentTxn(
- callCtx,
- catalogId,
- catalogId,
- PolarisEntityType.CATALOG_ROLE,
- 2,
- entity -> true,
- Function.identity());
+ callCtx,
+ catalogId,
+ catalogId,
+ PolarisEntityType.CATALOG_ROLE,
+ entity -> true,
+ Function.identity(),
+ PageToken.fromLimit(2))
+ .items;
// if we have 2, we cannot drop the catalog. If only one left, better be
the admin role
if (catalogRoles.size() > 1) {
@@ -1919,21 +1929,20 @@ public class TransactionalMetaStoreManagerImpl extends
BaseMetaStoreManager {
() -> this.loadEntity(callCtx, ms, entityCatalogId, entityId,
entityType.getCode()));
}
- /** Refer to {@link #loadTasks(PolarisCallContext, String, int)} */
+ /** Refer to {@link #loadTasks(PolarisCallContext, String, PageToken)} */
private @Nonnull EntitiesResult loadTasks(
@Nonnull PolarisCallContext callCtx,
@Nonnull TransactionalPersistence ms,
String executorId,
- int limit) {
+ PageToken pageToken) {
// find all available tasks
- List<PolarisBaseEntity> availableTasks =
+ Page<PolarisBaseEntity> availableTasks =
ms.listEntitiesInCurrentTxn(
callCtx,
PolarisEntityConstants.getRootEntityId(),
PolarisEntityConstants.getRootEntityId(),
PolarisEntityType.TASK,
- limit,
entity -> {
PolarisObjectMapperUtil.TaskExecutionState taskState =
PolarisObjectMapperUtil.parseTaskState(entity);
@@ -1948,10 +1957,11 @@ public class TransactionalMetaStoreManagerImpl extends
BaseMetaStoreManager {
|| taskState.executor == null
|| callCtx.getClock().millis() -
taskState.lastAttemptStartTime > taskAgeTimeout;
},
- Function.identity());
+ Function.identity(),
+ pageToken);
List<PolarisBaseEntity> loadedTasks = new ArrayList<>();
- availableTasks.forEach(
+ availableTasks.items.forEach(
task -> {
// Make a copy to avoid mutating someone else's reference.
// TODO: Refactor into immutable/Builder pattern.
@@ -1982,14 +1992,14 @@ public class TransactionalMetaStoreManagerImpl extends
BaseMetaStoreManager {
result.getReturnStatus(), result.getExtraInformation());
}
});
- return new EntitiesResult(loadedTasks);
+ return EntitiesResult.fromPage(Page.fromItems(loadedTasks));
}
@Override
public @Nonnull EntitiesResult loadTasks(
- @Nonnull PolarisCallContext callCtx, String executorId, int limit) {
+ @Nonnull PolarisCallContext callCtx, String executorId, PageToken
pageToken) {
TransactionalPersistence ms = ((TransactionalPersistence)
callCtx.getMetaStore());
- return ms.runInTransaction(callCtx, () -> this.loadTasks(callCtx, ms,
executorId, limit));
+ return ms.runInTransaction(callCtx, () -> this.loadTasks(callCtx, ms,
executorId, pageToken));
}
/** {@inheritDoc} */
diff --git
a/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/TransactionalPersistence.java
b/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/TransactionalPersistence.java
index 2057991db..1c58334d5 100644
---
a/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/TransactionalPersistence.java
+++
b/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/TransactionalPersistence.java
@@ -36,6 +36,8 @@ import org.apache.polaris.core.entity.PolarisGrantRecord;
import org.apache.polaris.core.entity.PolarisPrincipalSecrets;
import org.apache.polaris.core.persistence.BasePersistence;
import org.apache.polaris.core.persistence.IntegrationPersistence;
+import org.apache.polaris.core.persistence.pagination.Page;
+import org.apache.polaris.core.persistence.pagination.PageToken;
import org.apache.polaris.core.policy.TransactionalPolicyMappingPersistence;
import org.apache.polaris.core.storage.PolarisStorageConfigurationInfo;
import org.apache.polaris.core.storage.PolarisStorageIntegration;
@@ -201,31 +203,33 @@ public interface TransactionalPersistence
/** See {@link
org.apache.polaris.core.persistence.BasePersistence#listEntities} */
@Nonnull
- List<EntityNameLookupRecord> listEntitiesInCurrentTxn(
+ Page<EntityNameLookupRecord> listEntitiesInCurrentTxn(
@Nonnull PolarisCallContext callCtx,
long catalogId,
long parentId,
- @Nonnull PolarisEntityType entityType);
+ @Nonnull PolarisEntityType entityType,
+ @Nonnull PageToken pageToken);
/** See {@link
org.apache.polaris.core.persistence.BasePersistence#listEntities} */
@Nonnull
- List<EntityNameLookupRecord> listEntitiesInCurrentTxn(
+ Page<EntityNameLookupRecord> listEntitiesInCurrentTxn(
@Nonnull PolarisCallContext callCtx,
long catalogId,
long parentId,
@Nonnull PolarisEntityType entityType,
- @Nonnull Predicate<PolarisBaseEntity> entityFilter);
+ @Nonnull Predicate<PolarisBaseEntity> entityFilter,
+ @Nonnull PageToken pageToken);
/** See {@link
org.apache.polaris.core.persistence.BasePersistence#listEntities} */
@Nonnull
- <T> List<T> listEntitiesInCurrentTxn(
+ <T> Page<T> listEntitiesInCurrentTxn(
@Nonnull PolarisCallContext callCtx,
long catalogId,
long parentId,
@Nonnull PolarisEntityType entityType,
- int limit,
@Nonnull Predicate<PolarisBaseEntity> entityFilter,
- @Nonnull Function<PolarisBaseEntity, T> transformer);
+ @Nonnull Function<PolarisBaseEntity, T> transformer,
+ @Nonnull PageToken pageToken);
/**
* See {@link
org.apache.polaris.core.persistence.BasePersistence#lookupEntityGrantRecordsVersion}
diff --git
a/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/TreeMapTransactionalPersistenceImpl.java
b/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/TreeMapTransactionalPersistenceImpl.java
index 39ce364d3..304ac0ce9 100644
---
a/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/TreeMapTransactionalPersistenceImpl.java
+++
b/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/TreeMapTransactionalPersistenceImpl.java
@@ -26,6 +26,7 @@ import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;
+import java.util.stream.Stream;
import org.apache.polaris.core.PolarisCallContext;
import org.apache.polaris.core.entity.EntityNameLookupRecord;
import org.apache.polaris.core.entity.PolarisBaseEntity;
@@ -38,6 +39,9 @@ import org.apache.polaris.core.entity.PolarisGrantRecord;
import org.apache.polaris.core.entity.PolarisPrincipalSecrets;
import org.apache.polaris.core.persistence.BaseMetaStoreManager;
import org.apache.polaris.core.persistence.PrincipalSecretsGenerator;
+import org.apache.polaris.core.persistence.pagination.HasPageSize;
+import org.apache.polaris.core.persistence.pagination.Page;
+import org.apache.polaris.core.persistence.pagination.PageToken;
import org.apache.polaris.core.policy.PolarisPolicyMappingRecord;
import org.apache.polaris.core.storage.PolarisStorageConfigurationInfo;
import org.apache.polaris.core.storage.PolarisStorageIntegration;
@@ -301,29 +305,30 @@ public class TreeMapTransactionalPersistenceImpl extends
AbstractTransactionalPe
/** {@inheritDoc} */
@Override
- public @Nonnull List<EntityNameLookupRecord> listEntitiesInCurrentTxn(
+ public @Nonnull Page<EntityNameLookupRecord> listEntitiesInCurrentTxn(
@Nonnull PolarisCallContext callCtx,
long catalogId,
long parentId,
- @Nonnull PolarisEntityType entityType) {
+ @Nonnull PolarisEntityType entityType,
+ @Nonnull PageToken pageToken) {
return this.listEntitiesInCurrentTxn(
- callCtx, catalogId, parentId, entityType, Predicates.alwaysTrue());
+ callCtx, catalogId, parentId, entityType, Predicates.alwaysTrue(),
pageToken);
}
@Override
- public @Nonnull List<EntityNameLookupRecord> listEntitiesInCurrentTxn(
+ public @Nonnull Page<EntityNameLookupRecord> listEntitiesInCurrentTxn(
@Nonnull PolarisCallContext callCtx,
long catalogId,
long parentId,
@Nonnull PolarisEntityType entityType,
- @Nonnull Predicate<PolarisBaseEntity> entityFilter) {
+ @Nonnull Predicate<PolarisBaseEntity> entityFilter,
+ @Nonnull PageToken pageToken) {
// full range scan under the parent for that type
return this.listEntitiesInCurrentTxn(
callCtx,
catalogId,
parentId,
entityType,
- Integer.MAX_VALUE,
entityFilter,
entity ->
new EntityNameLookupRecord(
@@ -332,31 +337,36 @@ public class TreeMapTransactionalPersistenceImpl extends
AbstractTransactionalPe
entity.getParentId(),
entity.getName(),
entity.getTypeCode(),
- entity.getSubTypeCode()));
+ entity.getSubTypeCode()),
+ pageToken);
}
@Override
- public @Nonnull <T> List<T> listEntitiesInCurrentTxn(
+ public @Nonnull <T> Page<T> listEntitiesInCurrentTxn(
@Nonnull PolarisCallContext callCtx,
long catalogId,
long parentId,
@Nonnull PolarisEntityType entityType,
- int limit,
@Nonnull Predicate<PolarisBaseEntity> entityFilter,
- @Nonnull Function<PolarisBaseEntity, T> transformer) {
+ @Nonnull Function<PolarisBaseEntity, T> transformer,
+ @Nonnull PageToken pageToken) {
// full range scan under the parent for that type
- return this.store
- .getSliceEntitiesActive()
- .readRange(this.store.buildPrefixKeyComposite(catalogId, parentId,
entityType.getCode()))
- .stream()
- .map(
- nameRecord ->
- this.lookupEntityInCurrentTxn(
- callCtx, catalogId, nameRecord.getId(),
entityType.getCode()))
- .filter(entityFilter)
- .limit(limit)
- .map(transformer)
- .collect(Collectors.toList());
+ Stream<PolarisBaseEntity> data =
+ this.store
+ .getSliceEntitiesActive()
+ .readRange(
+ this.store.buildPrefixKeyComposite(catalogId, parentId,
entityType.getCode()))
+ .stream()
+ .map(
+ nameRecord ->
+ this.lookupEntityInCurrentTxn(
+ callCtx, catalogId, nameRecord.getId(),
entityType.getCode()))
+ .filter(entityFilter);
+ if (pageToken instanceof HasPageSize) {
+ data = data.limit(((HasPageSize) pageToken).getPageSize());
+ }
+
+ return Page.fromItems(data.map(transformer).collect(Collectors.toList()));
}
/** {@inheritDoc} */
diff --git
a/polaris-core/src/testFixtures/java/org/apache/polaris/core/persistence/BasePolarisMetaStoreManagerTest.java
b/polaris-core/src/testFixtures/java/org/apache/polaris/core/persistence/BasePolarisMetaStoreManagerTest.java
index f58313712..0f834bc76 100644
---
a/polaris-core/src/testFixtures/java/org/apache/polaris/core/persistence/BasePolarisMetaStoreManagerTest.java
+++
b/polaris-core/src/testFixtures/java/org/apache/polaris/core/persistence/BasePolarisMetaStoreManagerTest.java
@@ -43,6 +43,7 @@ import org.apache.polaris.core.entity.PolarisEntity;
import org.apache.polaris.core.entity.PolarisEntitySubType;
import org.apache.polaris.core.entity.PolarisEntityType;
import org.apache.polaris.core.entity.TaskEntity;
+import org.apache.polaris.core.persistence.pagination.PageToken;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.InstanceOfAssertFactories;
import org.junit.jupiter.api.BeforeEach;
@@ -128,7 +129,8 @@ public abstract class BasePolarisMetaStoreManagerTest {
polarisTestMetaStoreManager.polarisCallContext,
null,
PolarisEntityType.TASK,
- PolarisEntitySubType.NULL_SUBTYPE)
+ PolarisEntitySubType.NULL_SUBTYPE,
+ PageToken.readEverything())
.getEntities();
Assertions.assertThat(listedEntities)
.isNotNull()
@@ -307,7 +309,7 @@ public abstract class BasePolarisMetaStoreManagerTest {
PolarisMetaStoreManager metaStoreManager =
polarisTestMetaStoreManager.polarisMetaStoreManager;
PolarisCallContext callCtx =
polarisTestMetaStoreManager.polarisCallContext;
List<PolarisBaseEntity> taskList =
- metaStoreManager.loadTasks(callCtx, executorId, 5).getEntities();
+ metaStoreManager.loadTasks(callCtx, executorId,
PageToken.fromLimit(5)).getEntities();
Assertions.assertThat(taskList)
.isNotNull()
.isNotEmpty()
@@ -327,7 +329,7 @@ public abstract class BasePolarisMetaStoreManagerTest {
// grab a second round of tasks. Assert that none of the original 5 are in
the list
List<PolarisBaseEntity> newTaskList =
- metaStoreManager.loadTasks(callCtx, executorId, 5).getEntities();
+ metaStoreManager.loadTasks(callCtx, executorId,
PageToken.fromLimit(5)).getEntities();
Assertions.assertThat(newTaskList)
.isNotNull()
.isNotEmpty()
@@ -341,7 +343,7 @@ public abstract class BasePolarisMetaStoreManagerTest {
// only 10 tasks are unassigned. Requesting 20, we should only receive
those 10
List<PolarisBaseEntity> lastTen =
- metaStoreManager.loadTasks(callCtx, executorId, 20).getEntities();
+ metaStoreManager.loadTasks(callCtx, executorId,
PageToken.fromLimit(20)).getEntities();
Assertions.assertThat(lastTen)
.isNotNull()
@@ -355,7 +357,7 @@ public abstract class BasePolarisMetaStoreManagerTest {
.collect(Collectors.toSet());
List<PolarisBaseEntity> emtpyList =
- metaStoreManager.loadTasks(callCtx, executorId, 20).getEntities();
+ metaStoreManager.loadTasks(callCtx, executorId, PageToken.fromLimit(20)).getEntities();
Assertions.assertThat(emtpyList).isNotNull().isEmpty();
@@ -363,7 +365,7 @@ public abstract class BasePolarisMetaStoreManagerTest {
// all the tasks are unassigned. Fetch them all
List<PolarisBaseEntity> allTasks =
- metaStoreManager.loadTasks(callCtx, executorId, 20).getEntities();
+ metaStoreManager.loadTasks(callCtx, executorId, PageToken.fromLimit(20)).getEntities();
Assertions.assertThat(allTasks)
.isNotNull()
@@ -378,7 +380,7 @@ public abstract class BasePolarisMetaStoreManagerTest {
timeSource.add(Duration.ofMinutes(10));
List<PolarisBaseEntity> finalList =
- metaStoreManager.loadTasks(callCtx, executorId, 20).getEntities();
+ metaStoreManager.loadTasks(callCtx, executorId, PageToken.fromLimit(20)).getEntities();
Assertions.assertThat(finalList).isNotNull().isEmpty();
}
@@ -406,7 +408,10 @@ public abstract class BasePolarisMetaStoreManagerTest {
do {
retry = false;
try {
- taskList = metaStoreManager.loadTasks(callCtx, executorId, 5).getEntities();
+ taskList =
+ metaStoreManager
+ .loadTasks(callCtx, executorId, PageToken.fromLimit(5))
+ .getEntities();
taskList.stream().map(PolarisBaseEntity::getName).forEach(taskNames::add);
} catch (RetryOnConcurrencyException e) {
retry = true;
diff --git a/polaris-core/src/testFixtures/java/org/apache/polaris/core/persistence/PolarisTestMetaStoreManager.java b/polaris-core/src/testFixtures/java/org/apache/polaris/core/persistence/PolarisTestMetaStoreManager.java
index da72308a0..8cddecae3 100644
--- a/polaris-core/src/testFixtures/java/org/apache/polaris/core/persistence/PolarisTestMetaStoreManager.java
+++ b/polaris-core/src/testFixtures/java/org/apache/polaris/core/persistence/PolarisTestMetaStoreManager.java
@@ -51,6 +51,7 @@ import org.apache.polaris.core.persistence.dao.entity.LoadGrantsResult;
import org.apache.polaris.core.persistence.dao.entity.LoadPolicyMappingsResult;
import org.apache.polaris.core.persistence.dao.entity.PolicyAttachmentResult;
import org.apache.polaris.core.persistence.dao.entity.ResolvedEntityResult;
+import org.apache.polaris.core.persistence.pagination.PageToken;
import org.apache.polaris.core.policy.PolarisPolicyMappingRecord;
import org.apache.polaris.core.policy.PolicyEntity;
import org.apache.polaris.core.policy.PolicyType;
@@ -766,7 +767,8 @@ public class PolarisTestMetaStoreManager {
this.polarisCallContext,
path,
PolarisEntityType.NAMESPACE,
- PolarisEntitySubType.NULL_SUBTYPE)
+ PolarisEntitySubType.NULL_SUBTYPE,
+ PageToken.readEverything())
.getEntities();
Assertions.assertThat(children).isNotNull();
if (children.isEmpty() && entity.getType() ==
PolarisEntityType.NAMESPACE) {
@@ -776,7 +778,8 @@ public class PolarisTestMetaStoreManager {
this.polarisCallContext,
path,
PolarisEntityType.TABLE_LIKE,
- PolarisEntitySubType.ANY_SUBTYPE)
+ PolarisEntitySubType.ANY_SUBTYPE,
+ PageToken.readEverything())
.getEntities();
Assertions.assertThat(children).isNotNull();
} else if (children.isEmpty()) {
@@ -786,7 +789,8 @@ public class PolarisTestMetaStoreManager {
this.polarisCallContext,
path,
PolarisEntityType.CATALOG_ROLE,
- PolarisEntitySubType.ANY_SUBTYPE)
+ PolarisEntitySubType.ANY_SUBTYPE,
+ PageToken.readEverything())
.getEntities();
Assertions.assertThat(children).isNotNull();
// if only one left, it can be dropped.
@@ -1555,7 +1559,12 @@ public class PolarisTestMetaStoreManager {
// list the entities under the specified path
List<EntityNameLookupRecord> result =
polarisMetaStoreManager
- .listEntities(this.polarisCallContext, path, entityType,
entitySubType)
+ .listEntities(
+ this.polarisCallContext,
+ path,
+ entityType,
+ entitySubType,
+ PageToken.readEverything())
.getEntities();
Assertions.assertThat(result).isNotNull();
@@ -1872,7 +1881,8 @@ public class PolarisTestMetaStoreManager {
this.polarisCallContext,
null,
PolarisEntityType.PRINCIPAL,
- PolarisEntitySubType.NULL_SUBTYPE)
+ PolarisEntitySubType.NULL_SUBTYPE,
+ PageToken.readEverything())
.getEntities();
// ensure not null, one element only
@@ -1898,7 +1908,8 @@ public class PolarisTestMetaStoreManager {
this.polarisCallContext,
null,
PolarisEntityType.PRINCIPAL_ROLE,
- PolarisEntitySubType.NULL_SUBTYPE)
+ PolarisEntitySubType.NULL_SUBTYPE,
+ PageToken.readEverything())
.getEntities();
// ensure not null, one element only
@@ -2636,7 +2647,8 @@ public class PolarisTestMetaStoreManager {
this.polarisCallContext,
null,
PolarisEntityType.PRINCIPAL,
- PolarisEntitySubType.NULL_SUBTYPE)
+ PolarisEntitySubType.NULL_SUBTYPE,
+ PageToken.readEverything())
.getEntities();
// ensure not null, one element only
diff --git a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/IcebergCatalogTest.java b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/IcebergCatalogTest.java
index d81edd1d4..5837f8822 100644
--- a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/IcebergCatalogTest.java
+++ b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/IcebergCatalogTest.java
@@ -97,6 +97,7 @@ import org.apache.polaris.core.persistence.cache.InMemoryEntityCache;
import org.apache.polaris.core.persistence.dao.entity.BaseResult;
import org.apache.polaris.core.persistence.dao.entity.EntityResult;
import org.apache.polaris.core.persistence.dao.entity.PrincipalSecretsResult;
+import org.apache.polaris.core.persistence.pagination.PageToken;
import
org.apache.polaris.core.persistence.transactional.TransactionalPersistence;
import org.apache.polaris.core.secrets.UserSecretsManager;
import org.apache.polaris.core.secrets.UserSecretsManagerFactory;
@@ -171,6 +172,8 @@ public abstract class IcebergCatalogTest extends
CatalogTests<IcebergCatalog> {
"true",
"polaris.features.defaults.\"SUPPORTED_CATALOG_STORAGE_TYPES\"",
"[\"FILE\"]",
+ "polaris.features.defaults.\"LIST_PAGINATION_ENABLED\"",
+ "true",
"polaris.event-listener.type",
"test");
}
@@ -1536,7 +1539,9 @@ public abstract class IcebergCatalogTest extends
CatalogTests<IcebergCatalog> {
.as("Table should not exist after drop")
.rejects(TABLE);
List<PolarisBaseEntity> tasks =
- metaStoreManager.loadTasks(polarisContext, "testExecutor",
1).getEntities();
+ metaStoreManager
+ .loadTasks(polarisContext, "testExecutor", PageToken.fromLimit(1))
+ .getEntities();
Assertions.assertThat(tasks).hasSize(1);
TaskEntity taskEntity = TaskEntity.of(tasks.get(0));
EnumMap<StorageAccessProperty, String> credentials =
@@ -1745,7 +1750,8 @@ public abstract class IcebergCatalogTest extends
CatalogTests<IcebergCatalog> {
TaskEntity taskEntity =
TaskEntity.of(
metaStoreManager
- .loadTasks(callContext.getPolarisCallContext(),
"testExecutor", 1)
+ .loadTasks(
+ callContext.getPolarisCallContext(), "testExecutor",
PageToken.fromLimit(1))
.getEntities()
.getFirst());
Map<String, String> properties = taskEntity.getInternalPropertiesAsMap();
diff --git a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/task/TableCleanupTaskHandlerTest.java b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/task/TableCleanupTaskHandlerTest.java
index 1c1f1441e..2bb53b40c 100644
--- a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/task/TableCleanupTaskHandlerTest.java
+++ b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/task/TableCleanupTaskHandlerTest.java
@@ -54,6 +54,7 @@ import org.apache.polaris.core.entity.table.IcebergTableLikeEntity;
import org.apache.polaris.core.persistence.BasePersistence;
import org.apache.polaris.core.persistence.MetaStoreManagerFactory;
import org.apache.polaris.core.persistence.PolarisResolvedPathWrapper;
+import org.apache.polaris.core.persistence.pagination.PageToken;
import org.apache.polaris.core.storage.PolarisStorageActions;
import org.apache.polaris.service.catalog.io.FileIOFactory;
import org.apache.polaris.service.task.BatchFileCleanupTaskHandler;
@@ -153,7 +154,7 @@ class TableCleanupTaskHandlerTest {
assertThat(
metaStoreManagerFactory
.getOrCreateMetaStoreManager(realmContext)
- .loadTasks(callContext.getPolarisCallContext(), "test", 2)
+ .loadTasks(callContext.getPolarisCallContext(), "test",
PageToken.fromLimit(2))
.getEntities())
.hasSize(2)
.satisfiesExactlyInAnyOrder(
@@ -233,7 +234,7 @@ class TableCleanupTaskHandlerTest {
assertThat(
metaStoreManagerFactory
.getOrCreateMetaStoreManager(realmContext)
- .loadTasks(callContext.getPolarisCallContext(), "test", 5)
+ .loadTasks(callContext.getPolarisCallContext(), "test",
PageToken.fromLimit(5))
.getEntities())
.hasSize(2);
}
@@ -294,10 +295,10 @@ class TableCleanupTaskHandlerTest {
assertThat(
metaStoreManagerFactory
.getOrCreateMetaStoreManager(realmContext)
- .loadTasks(callContext.getPolarisCallContext(), "test", 5)
+ .loadTasks(callContext.getPolarisCallContext(), "test",
PageToken.fromLimit(5))
.getEntities())
.hasSize(4)
- .satisfiesExactly(
+ .satisfiesExactlyInAnyOrder(
taskEntity ->
assertThat(taskEntity)
.returns(PolarisEntityType.TASK.getCode(),
PolarisBaseEntity::getTypeCode)
@@ -414,7 +415,7 @@ class TableCleanupTaskHandlerTest {
List<PolarisBaseEntity> entities =
metaStoreManagerFactory
.getOrCreateMetaStoreManager(realmContext)
- .loadTasks(callContext.getPolarisCallContext(), "test", 5)
+ .loadTasks(callContext.getPolarisCallContext(), "test",
PageToken.fromLimit(5))
.getEntities();
List<PolarisBaseEntity> manifestCleanupTasks =
@@ -573,7 +574,7 @@ class TableCleanupTaskHandlerTest {
List<PolarisBaseEntity> entities =
metaStoreManagerFactory
.getOrCreateMetaStoreManager(callContext.getRealmContext())
- .loadTasks(callContext.getPolarisCallContext(), "test", 6)
+ .loadTasks(callContext.getPolarisCallContext(), "test",
PageToken.fromLimit(6))
.getEntities();
List<PolarisBaseEntity> manifestCleanupTasks =
diff --git a/service/common/src/main/java/org/apache/polaris/service/admin/PolarisAdminService.java b/service/common/src/main/java/org/apache/polaris/service/admin/PolarisAdminService.java
index b814d902f..ef6a754e9 100644
--- a/service/common/src/main/java/org/apache/polaris/service/admin/PolarisAdminService.java
+++ b/service/common/src/main/java/org/apache/polaris/service/admin/PolarisAdminService.java
@@ -95,6 +95,7 @@ import org.apache.polaris.core.persistence.dao.entity.CreatePrincipalResult;
import org.apache.polaris.core.persistence.dao.entity.DropEntityResult;
import org.apache.polaris.core.persistence.dao.entity.EntityResult;
import org.apache.polaris.core.persistence.dao.entity.LoadGrantsResult;
+import org.apache.polaris.core.persistence.pagination.PageToken;
import org.apache.polaris.core.persistence.resolver.PolarisResolutionManifest;
import org.apache.polaris.core.persistence.resolver.ResolverPath;
import org.apache.polaris.core.persistence.resolver.ResolverStatus;
@@ -885,7 +886,8 @@ public class PolarisAdminService {
getCurrentPolarisContext(),
null,
PolarisEntityType.CATALOG,
- PolarisEntitySubType.ANY_SUBTYPE)
+ PolarisEntitySubType.ANY_SUBTYPE,
+ PageToken.readEverything())
.getEntities()
.stream()
.map(
@@ -1051,7 +1053,8 @@ public class PolarisAdminService {
getCurrentPolarisContext(),
null,
PolarisEntityType.PRINCIPAL,
- PolarisEntitySubType.NULL_SUBTYPE)
+ PolarisEntitySubType.NULL_SUBTYPE,
+ PageToken.readEverything())
.getEntities()
.stream()
.map(
@@ -1160,7 +1163,8 @@ public class PolarisAdminService {
getCurrentPolarisContext(),
null,
PolarisEntityType.PRINCIPAL_ROLE,
- PolarisEntitySubType.NULL_SUBTYPE)
+ PolarisEntitySubType.NULL_SUBTYPE,
+ PageToken.readEverything())
.getEntities()
.stream()
.map(
@@ -1288,7 +1292,8 @@ public class PolarisAdminService {
getCurrentPolarisContext(),
PolarisEntity.toCoreList(List.of(catalogEntity)),
PolarisEntityType.CATALOG_ROLE,
- PolarisEntitySubType.NULL_SUBTYPE)
+ PolarisEntitySubType.NULL_SUBTYPE,
+ PageToken.readEverything())
.getEntities()
.stream()
.map(
diff --git a/service/common/src/main/java/org/apache/polaris/service/catalog/generic/GenericTableCatalog.java b/service/common/src/main/java/org/apache/polaris/service/catalog/generic/GenericTableCatalog.java
index b2fb31f67..79adeaee8 100644
--- a/service/common/src/main/java/org/apache/polaris/service/catalog/generic/GenericTableCatalog.java
+++ b/service/common/src/main/java/org/apache/polaris/service/catalog/generic/GenericTableCatalog.java
@@ -37,6 +37,7 @@ import org.apache.polaris.core.persistence.PolarisResolvedPathWrapper;
import org.apache.polaris.core.persistence.dao.entity.BaseResult;
import org.apache.polaris.core.persistence.dao.entity.DropEntityResult;
import org.apache.polaris.core.persistence.dao.entity.EntityResult;
+import org.apache.polaris.core.persistence.pagination.PageToken;
import
org.apache.polaris.core.persistence.resolver.PolarisResolutionManifestCatalogView;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -175,7 +176,8 @@ public class GenericTableCatalog {
this.callContext.getPolarisCallContext(),
PolarisEntity.toCoreList(catalogPath),
PolarisEntityType.TABLE_LIKE,
- PolarisEntitySubType.GENERIC_TABLE)
+ PolarisEntitySubType.GENERIC_TABLE,
+ PageToken.readEverything())
.getEntities());
return PolarisCatalogHelpers.nameAndIdToTableIdentifiers(catalogPath,
entities);
}
diff --git a/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java b/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java
index c66def885..7c02a6154 100644
--- a/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java
+++ b/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java
@@ -109,6 +109,8 @@ import org.apache.polaris.core.persistence.dao.entity.BaseResult;
import org.apache.polaris.core.persistence.dao.entity.DropEntityResult;
import org.apache.polaris.core.persistence.dao.entity.EntityResult;
import org.apache.polaris.core.persistence.dao.entity.ListEntitiesResult;
+import org.apache.polaris.core.persistence.pagination.Page;
+import org.apache.polaris.core.persistence.pagination.PageToken;
import org.apache.polaris.core.persistence.resolver.PolarisResolutionManifest;
import
org.apache.polaris.core.persistence.resolver.PolarisResolutionManifestCatalogView;
import org.apache.polaris.core.persistence.resolver.ResolverPath;
@@ -500,12 +502,20 @@ public class IcebergCatalog extends
BaseMetastoreViewCatalog
@Override
public List<TableIdentifier> listTables(Namespace namespace) {
+ return listTables(namespace, PageToken.readEverything()).items;
+ }
+
+ public Page<TableIdentifier> listTables(Namespace namespace, String
pageToken, Integer pageSize) {
+ return listTables(namespace, buildPageToken(pageToken, pageSize));
+ }
+
+ private Page<TableIdentifier> listTables(Namespace namespace, PageToken
pageToken) {
if (!namespaceExists(namespace)) {
throw new NoSuchNamespaceException(
"Cannot list tables for namespace. Namespace does not exist: '%s'",
namespace);
}
- return listTableLike(PolarisEntitySubType.ICEBERG_TABLE, namespace);
+ return listTableLike(PolarisEntitySubType.ICEBERG_TABLE, namespace,
pageToken);
}
@Override
@@ -815,22 +825,36 @@ public class IcebergCatalog extends
BaseMetastoreViewCatalog
@Override
public List<Namespace> listNamespaces(Namespace namespace) throws
NoSuchNamespaceException {
+ return listNamespaces(namespace, PageToken.readEverything()).items;
+ }
+
+ public Page<Namespace> listNamespaces(Namespace namespace, String pageToken,
Integer pageSize) {
+ return listNamespaces(namespace, buildPageToken(pageToken, pageSize));
+ }
+
+ private Page<Namespace> listNamespaces(Namespace namespace, PageToken
pageToken)
+ throws NoSuchNamespaceException {
PolarisResolvedPathWrapper resolvedEntities =
resolvedEntityView.getResolvedPath(namespace);
if (resolvedEntities == null) {
throw new NoSuchNamespaceException("Namespace does not exist: %s",
namespace);
}
List<PolarisEntity> catalogPath = resolvedEntities.getRawFullPath();
+ ListEntitiesResult listResult =
+ getMetaStoreManager()
+ .listEntities(
+ getCurrentPolarisContext(),
+ PolarisEntity.toCoreList(catalogPath),
+ PolarisEntityType.NAMESPACE,
+ PolarisEntitySubType.NULL_SUBTYPE,
+ pageToken);
List<PolarisEntity.NameAndId> entities =
- PolarisEntity.toNameAndIdList(
- getMetaStoreManager()
- .listEntities(
- getCurrentPolarisContext(),
- PolarisEntity.toCoreList(catalogPath),
- PolarisEntityType.NAMESPACE,
- PolarisEntitySubType.NULL_SUBTYPE)
- .getEntities());
- return PolarisCatalogHelpers.nameAndIdToNamespaces(catalogPath, entities);
+ PolarisEntity.toNameAndIdList(listResult.getEntities());
+ List<Namespace> namespaces =
PolarisCatalogHelpers.nameAndIdToNamespaces(catalogPath, entities);
+ return listResult
+ .getPageToken()
+ .map(token -> new Page<>(token, namespaces))
+ .orElseGet(() -> Page.fromItems(namespaces));
}
@Override
@@ -842,12 +866,20 @@ public class IcebergCatalog extends
BaseMetastoreViewCatalog
@Override
public List<TableIdentifier> listViews(Namespace namespace) {
+ return listViews(namespace, PageToken.readEverything()).items;
+ }
+
+ public Page<TableIdentifier> listViews(Namespace namespace, String
pageToken, Integer pageSize) {
+ return listViews(namespace, buildPageToken(pageToken, pageSize));
+ }
+
+ private Page<TableIdentifier> listViews(Namespace namespace, PageToken
pageToken) {
if (!namespaceExists(namespace)) {
throw new NoSuchNamespaceException(
"Cannot list views for namespace. Namespace does not exist: '%s'",
namespace);
}
- return listTableLike(PolarisEntitySubType.ICEBERG_VIEW, namespace);
+ return listTableLike(PolarisEntitySubType.ICEBERG_VIEW, namespace,
pageToken);
}
@Override
@@ -1074,7 +1106,8 @@ public class IcebergCatalog extends
BaseMetastoreViewCatalog
callContext.getPolarisCallContext(),
parentPath.stream().map(PolarisEntity::toCore).collect(Collectors.toList()),
PolarisEntityType.NAMESPACE,
- PolarisEntitySubType.ANY_SUBTYPE);
+ PolarisEntitySubType.ANY_SUBTYPE,
+ PageToken.readEverything());
if (!siblingNamespacesResult.isSuccess()) {
throw new IllegalStateException(
"Unable to resolve siblings entities to validate location - could
not list namespaces");
@@ -1099,7 +1132,8 @@ public class IcebergCatalog extends
BaseMetastoreViewCatalog
.map(PolarisEntity::toCore)
.collect(Collectors.toList()),
PolarisEntityType.TABLE_LIKE,
- PolarisEntitySubType.ANY_SUBTYPE);
+ PolarisEntitySubType.ANY_SUBTYPE,
+ PageToken.readEverything());
if (!siblingTablesResult.isSuccess()) {
throw new IllegalStateException(
"Unable to resolve siblings entities to validate
location - could not list tables");
@@ -2458,7 +2492,8 @@ public class IcebergCatalog extends
BaseMetastoreViewCatalog
}
}
- private List<TableIdentifier> listTableLike(PolarisEntitySubType subType,
Namespace namespace) {
+ private Page<TableIdentifier> listTableLike(
+ PolarisEntitySubType subType, Namespace namespace, PageToken pageToken) {
PolarisResolvedPathWrapper resolvedEntities =
resolvedEntityView.getResolvedPath(namespace);
if (resolvedEntities == null) {
// Illegal state because the namespace should've already been in the
static resolution set.
@@ -2467,16 +2502,23 @@ public class IcebergCatalog extends
BaseMetastoreViewCatalog
}
List<PolarisEntity> catalogPath = resolvedEntities.getRawFullPath();
+ ListEntitiesResult listResult =
+ getMetaStoreManager()
+ .listEntities(
+ getCurrentPolarisContext(),
+ PolarisEntity.toCoreList(catalogPath),
+ PolarisEntityType.TABLE_LIKE,
+ subType,
+ pageToken);
List<PolarisEntity.NameAndId> entities =
- PolarisEntity.toNameAndIdList(
- getMetaStoreManager()
- .listEntities(
- getCurrentPolarisContext(),
- PolarisEntity.toCoreList(catalogPath),
- PolarisEntityType.TABLE_LIKE,
- subType)
- .getEntities());
- return PolarisCatalogHelpers.nameAndIdToTableIdentifiers(catalogPath,
entities);
+ PolarisEntity.toNameAndIdList(listResult.getEntities());
+ List<TableIdentifier> identifiers =
+ PolarisCatalogHelpers.nameAndIdToTableIdentifiers(catalogPath,
entities);
+
+ return listResult
+ .getPageToken()
+ .map(token -> new Page<>(token, identifiers))
+ .orElseGet(() -> Page.fromItems(identifiers));
}
/**
@@ -2524,4 +2566,22 @@ public class IcebergCatalog extends
BaseMetastoreViewCatalog
.getConfiguration(
callContext.getPolarisCallContext(),
FeatureConfiguration.MAX_METADATA_REFRESH_RETRIES);
}
+
+ /** Build a {@link PageToken} from a string and page size. */
+ private PageToken buildPageToken(@Nullable String tokenString, @Nullable Integer pageSize) {
+
+ boolean paginationEnabled =
+ callContext
+ .getPolarisCallContext()
+ .getConfigurationStore()
+ .getConfiguration(
+ callContext.getPolarisCallContext(),
+ catalogEntity,
+ FeatureConfiguration.LIST_PAGINATION_ENABLED);
+ if (!paginationEnabled) {
+ return PageToken.readEverything();
+ } else {
+ return PageToken.build(tokenString, pageSize);
+ }
+ }
}
diff --git a/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogAdapter.java b/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogAdapter.java
index 5baffa395..38026c0fa 100644
--- a/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogAdapter.java
+++ b/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogAdapter.java
@@ -220,7 +220,10 @@ public class IcebergCatalogAdapter
securityContext,
prefix,
catalog ->
-
Response.ok(catalog.listNamespaces(namespaceOptional.orElse(Namespace.of()))).build());
+ Response.ok(
+ catalog.listNamespaces(
+ namespaceOptional.orElse(Namespace.of()), pageToken,
pageSize))
+ .build());
}
@Override
@@ -356,7 +359,9 @@ public class IcebergCatalogAdapter
SecurityContext securityContext) {
Namespace ns = decodeNamespace(namespace);
return withCatalog(
- securityContext, prefix, catalog ->
Response.ok(catalog.listTables(ns)).build());
+ securityContext,
+ prefix,
+ catalog -> Response.ok(catalog.listTables(ns, pageToken,
pageSize)).build());
}
@Override
@@ -525,7 +530,9 @@ public class IcebergCatalogAdapter
SecurityContext securityContext) {
Namespace ns = decodeNamespace(namespace);
return withCatalog(
- securityContext, prefix, catalog ->
Response.ok(catalog.listViews(ns)).build());
+ securityContext,
+ prefix,
+ catalog -> Response.ok(catalog.listViews(ns, pageToken,
pageSize)).build());
}
@Override
diff --git a/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java b/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java
index dbea9f4d7..158732c32 100644
--- a/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java
+++ b/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java
@@ -90,6 +90,7 @@ import org.apache.polaris.core.persistence.PolarisResolvedPathWrapper;
import
org.apache.polaris.core.persistence.TransactionWorkspaceMetaStoreManager;
import org.apache.polaris.core.persistence.dao.entity.EntitiesResult;
import org.apache.polaris.core.persistence.dao.entity.EntityWithPath;
+import org.apache.polaris.core.persistence.pagination.Page;
import org.apache.polaris.core.secrets.UserSecretsManager;
import org.apache.polaris.core.storage.AccessConfig;
import org.apache.polaris.core.storage.PolarisStorageActions;
@@ -169,6 +170,23 @@ public class IcebergCatalogHandler extends CatalogHandler
implements AutoCloseab
return isCreate;
}
+ public ListNamespacesResponse listNamespaces(
+ Namespace parent, String pageToken, Integer pageSize) {
+ PolarisAuthorizableOperation op =
PolarisAuthorizableOperation.LIST_NAMESPACES;
+ authorizeBasicNamespaceOperationOrThrow(op, parent);
+
+ if (baseCatalog instanceof IcebergCatalog polarisCatalog) {
+ Page<Namespace> results = polarisCatalog.listNamespaces(parent,
pageToken, pageSize);
+ return ListNamespacesResponse.builder()
+ .addAll(results.items)
+ .nextPageToken(results.pageToken.toTokenString())
+ .build();
+ } else {
+ return CatalogHandlers.listNamespaces(
+ namespaceCatalog, parent, pageToken, String.valueOf(pageSize));
+ }
+ }
+
private UserSecretsManager getUserSecretsManager() {
return userSecretsManager;
}
@@ -304,6 +322,22 @@ public class IcebergCatalogHandler extends CatalogHandler
implements AutoCloseab
return CatalogHandlers.updateNamespaceProperties(namespaceCatalog,
namespace, request);
}
+ public ListTablesResponse listTables(Namespace namespace, String pageToken,
Integer pageSize) {
+ PolarisAuthorizableOperation op = PolarisAuthorizableOperation.LIST_TABLES;
+ authorizeBasicNamespaceOperationOrThrow(op, namespace);
+
+ if (baseCatalog instanceof IcebergCatalog polarisCatalog) {
+ Page<TableIdentifier> results = polarisCatalog.listTables(namespace,
pageToken, pageSize);
+ return ListTablesResponse.builder()
+ .addAll(results.items)
+ .nextPageToken(results.pageToken.toTokenString())
+ .build();
+ } else {
+ return CatalogHandlers.listTables(
+ baseCatalog, namespace, pageToken, String.valueOf(pageSize));
+ }
+ }
+
public ListTablesResponse listTables(Namespace namespace) {
PolarisAuthorizableOperation op = PolarisAuthorizableOperation.LIST_TABLES;
authorizeBasicNamespaceOperationOrThrow(op, namespace);
@@ -942,6 +976,25 @@ public class IcebergCatalogHandler extends CatalogHandler
implements AutoCloseab
}
}
+ public ListTablesResponse listViews(Namespace namespace, String pageToken,
Integer pageSize) {
+ PolarisAuthorizableOperation op = PolarisAuthorizableOperation.LIST_VIEWS;
+ authorizeBasicNamespaceOperationOrThrow(op, namespace);
+
+ if (baseCatalog instanceof IcebergCatalog polarisCatalog) {
+ Page<TableIdentifier> results = polarisCatalog.listViews(namespace,
pageToken, pageSize);
+ return ListTablesResponse.builder()
+ .addAll(results.items)
+ .nextPageToken(results.pageToken.toTokenString())
+ .build();
+ } else if (baseCatalog instanceof ViewCatalog viewCatalog) {
+ return CatalogHandlers.listViews(viewCatalog, namespace, pageToken,
String.valueOf(pageSize));
+ } else {
+ throw new BadRequestException(
+ "Unsupported operation: listViews with baseCatalog type: %s",
+ baseCatalog.getClass().getName());
+ }
+ }
+
public ListTablesResponse listViews(Namespace namespace) {
PolarisAuthorizableOperation op = PolarisAuthorizableOperation.LIST_VIEWS;
authorizeBasicNamespaceOperationOrThrow(op, namespace);
diff --git a/service/common/src/main/java/org/apache/polaris/service/catalog/policy/PolicyCatalog.java b/service/common/src/main/java/org/apache/polaris/service/catalog/policy/PolicyCatalog.java
index 066d513a3..e0edebfc6 100644
--- a/service/common/src/main/java/org/apache/polaris/service/catalog/policy/PolicyCatalog.java
+++ b/service/common/src/main/java/org/apache/polaris/service/catalog/policy/PolicyCatalog.java
@@ -50,6 +50,7 @@ import org.apache.polaris.core.persistence.PolarisResolvedPathWrapper;
import org.apache.polaris.core.persistence.PolicyMappingAlreadyExistsException;
import org.apache.polaris.core.persistence.dao.entity.EntityResult;
import org.apache.polaris.core.persistence.dao.entity.LoadPolicyMappingsResult;
+import org.apache.polaris.core.persistence.pagination.PageToken;
import
org.apache.polaris.core.persistence.resolver.PolarisResolutionManifestCatalogView;
import org.apache.polaris.core.policy.PolicyEntity;
import org.apache.polaris.core.policy.PolicyType;
@@ -167,7 +168,8 @@ public class PolicyCatalog {
callContext.getPolarisCallContext(),
PolarisEntity.toCoreList(catalogPath),
PolarisEntityType.POLICY,
- PolarisEntitySubType.NULL_SUBTYPE)
+ PolarisEntitySubType.NULL_SUBTYPE,
+ PageToken.readEverything())
.getEntities()
.stream()
.map(
diff --git a/service/common/src/test/java/org/apache/polaris/service/catalog/io/FileIOFactoryTest.java b/service/common/src/test/java/org/apache/polaris/service/catalog/io/FileIOFactoryTest.java
index a6854703d..30afc5c09 100644
--- a/service/common/src/test/java/org/apache/polaris/service/catalog/io/FileIOFactoryTest.java
+++ b/service/common/src/test/java/org/apache/polaris/service/catalog/io/FileIOFactoryTest.java
@@ -46,6 +46,7 @@ import org.apache.polaris.core.admin.model.StorageConfigInfo;
import org.apache.polaris.core.context.CallContext;
import org.apache.polaris.core.context.RealmContext;
import org.apache.polaris.core.entity.*;
+import org.apache.polaris.core.persistence.pagination.PageToken;
import org.apache.polaris.service.TestServices;
import org.apache.polaris.service.catalog.PolarisPassthroughResolutionView;
import org.apache.polaris.service.catalog.iceberg.IcebergCatalog;
@@ -184,7 +185,7 @@ public class FileIOFactoryTest {
testServices
.metaStoreManagerFactory()
.getOrCreateMetaStoreManager(realmContext)
- .loadTasks(callContext.getPolarisCallContext(), "testExecutor", 1)
+ .loadTasks(callContext.getPolarisCallContext(), "testExecutor",
PageToken.fromLimit(1))
.getEntities();
Assertions.assertThat(tasks).hasSize(1);
TaskEntity taskEntity = TaskEntity.of(tasks.get(0));
diff --git a/service/common/src/test/java/org/apache/polaris/service/persistence/pagination/PageTokenTest.java b/service/common/src/test/java/org/apache/polaris/service/persistence/pagination/PageTokenTest.java
new file mode 100644
index 000000000..97e52fb84
--- /dev/null
+++ b/service/common/src/test/java/org/apache/polaris/service/persistence/pagination/PageTokenTest.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.service.persistence.pagination;
+
+import org.apache.polaris.core.persistence.pagination.DonePageToken;
+import org.apache.polaris.core.persistence.pagination.HasPageSize;
+import org.apache.polaris.core.persistence.pagination.PageToken;
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class PageTokenTest {
+ private static final Logger LOGGER = LoggerFactory.getLogger(PageTokenTest.class);
+
+ @Test
+ void testDoneToken() {
+ Assertions.assertThat(new DonePageToken()).doesNotReturn(null, PageToken::toString);
+ Assertions.assertThat(new DonePageToken()).returns(null, PageToken::toTokenString);
+ Assertions.assertThat(new DonePageToken()).isEqualTo(new DonePageToken());
+ Assertions.assertThat(new DonePageToken().hashCode()).isEqualTo(new DonePageToken().hashCode());
+ }
+
+ @Test
+ void testReadEverythingPageToken() {
+ PageToken token = PageToken.readEverything();
+
+ Assertions.assertThat(token.toString()).isNotNull();
+ Assertions.assertThat(token.toTokenString()).isNotNull();
+ Assertions.assertThat(token).isNotInstanceOf(HasPageSize.class);
+
+ Assertions.assertThat(PageToken.readEverything()).isEqualTo(PageToken.readEverything());
+ }
+}