This is an automated email from the ASF dual-hosted git repository.
snazy pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/polaris.git
The following commit(s) were added to refs/heads/main by this push:
new fb418a261 Extensible pagination token implementation (#1938)
fb418a261 is described below
commit fb418a2613715c219620f69fa4e9b7a1827898be
Author: Robert Stupp <[email protected]>
AuthorDate: Wed Jul 16 07:35:32 2025 +0200
Extensible pagination token implementation (#1938)
Based on #1838, following up on #1555
* Allows multiple implementations of `Token` referencing the "next page",
encapsulated in `PageToken`. No changes to `polaris-core` needed to add custom
`Token` implementations.
* Extensible to (later) support (cryptographic) signatures to prevent
tampered page-token
* Refactor pagination code to delineate API-level page tokens and internal
"pointers to data"
* Requests deal with the "previous" token, user-provided page size
(optional) and the previous request's page size.
* Concentrate the logic of combining page size requests and previous tokens
in `PageTokenUtil`
* `PageToken` subclasses are no longer necessary.
* Serialzation of `PageToken` uses Jackson serialization (smile format)
Since no (metastore level) implementation handling pagination existed
before, no backwards compatibility is needed.
Co-authored-by: Dmitri Bourlatchkov <[email protected]>
Co-authored-by: Eric Maynard <[email protected]>
---
.../apache/polaris/service/it/env/CatalogApi.java | 32 +++
.../it/test/PolarisRestCatalogIntegrationBase.java | 73 +++++-
.../PolarisEclipseLinkMetaStoreSessionImpl.java | 8 +-
.../impl/eclipselink/PolarisEclipseLinkStore.java | 18 +-
.../relational/jdbc/JdbcBasePersistenceImpl.java | 39 ++-
.../relational/jdbc/QueryGenerator.java | 54 ++++-
.../relational/jdbc/models/ModelEntity.java | 2 +
.../relational/jdbc/QueryGeneratorTest.java | 19 +-
polaris-core/build.gradle.kts | 8 +
.../core/catalog/PolarisCatalogHelpers.java | 27 ++-
.../polaris/core/config/FeatureConfiguration.java | 1 +
.../AtomicOperationMetaStoreManager.java | 86 +++----
.../persistence/dao/entity/EntitiesResult.java | 37 +--
.../persistence/dao/entity/ListEntitiesResult.java | 38 +--
.../core/persistence/pagination/DonePageToken.java | 40 ---
.../core/persistence/pagination/EntityIdToken.java | 66 +++++
.../core/persistence/pagination/HasPageSize.java | 27 ---
.../polaris/core/persistence/pagination/Page.java | 90 ++++++-
.../core/persistence/pagination/PageToken.java | 119 +++++----
.../core/persistence/pagination/PageTokenUtil.java | 267 +++++++++++++++++++++
.../pagination/ReadEverythingPageToken.java | 42 ----
.../polaris/core/persistence/pagination/Token.java | 101 ++++++++
.../TransactionalMetaStoreManagerImpl.java | 102 ++++----
.../TreeMapTransactionalPersistenceImpl.java | 25 +-
...ris.core.persistence.pagination.Token$TokenType | 20 ++
.../persistence/pagination/DummyTestToken.java} | 45 ++--
.../core/persistence/pagination/PageTokenTest.java | 168 +++++++++++++
...ris.core.persistence.pagination.Token$TokenType | 20 ++
.../quarkus/catalog/IcebergCatalogTest.java | 130 ++++++++++
.../service/catalog/iceberg/IcebergCatalog.java | 59 ++---
.../catalog/iceberg/IcebergCatalogHandler.java | 22 +-
.../persistence/pagination/PageTokenTest.java | 50 ----
32 files changed, 1329 insertions(+), 506 deletions(-)
diff --git
a/integration-tests/src/main/java/org/apache/polaris/service/it/env/CatalogApi.java
b/integration-tests/src/main/java/org/apache/polaris/service/it/env/CatalogApi.java
index 0274d0ea8..c235d61c1 100644
---
a/integration-tests/src/main/java/org/apache/polaris/service/it/env/CatalogApi.java
+++
b/integration-tests/src/main/java/org/apache/polaris/service/it/env/CatalogApi.java
@@ -101,6 +101,24 @@ public class CatalogApi extends RestApi {
}
}
+ public ListNamespacesResponse listNamespaces(
+ String catalog, Namespace parent, String pageToken, String pageSize) {
+ Map<String, String> queryParams = new HashMap<>();
+ if (!parent.isEmpty()) {
+ // TODO change this for Iceberg 1.7.2:
+ // queryParams.put("parent", RESTUtil.encodeNamespace(parent));
+ queryParams.put("parent", Joiner.on('\u001f').join(parent.levels()));
+ }
+ queryParams.put("pageToken", pageToken);
+ queryParams.put("pageSize", pageSize);
+ try (Response response =
+ request("v1/{cat}/namespaces", Map.of("cat", catalog),
queryParams).get()) {
+ assertThat(response.getStatus()).isEqualTo(OK.getStatusCode());
+ ListNamespacesResponse res =
response.readEntity(ListNamespacesResponse.class);
+ return res;
+ }
+ }
+
public List<Namespace> listAllNamespacesChildFirst(String catalog) {
List<Namespace> result = new ArrayList<>();
for (int idx = -1; idx < result.size(); idx++) {
@@ -142,6 +160,20 @@ public class CatalogApi extends RestApi {
}
}
+ public ListTablesResponse listTables(
+ String catalog, Namespace namespace, String pageToken, String pageSize) {
+ String ns = RESTUtil.encodeNamespace(namespace);
+ Map<String, String> queryParams = new HashMap<>();
+ queryParams.put("pageToken", pageToken);
+ queryParams.put("pageSize", pageSize);
+ try (Response res =
+ request("v1/{cat}/namespaces/" + ns + "/tables", Map.of("cat",
catalog), queryParams)
+ .get()) {
+
assertThat(res.getStatus()).isEqualTo(Response.Status.OK.getStatusCode());
+ return res.readEntity(ListTablesResponse.class);
+ }
+ }
+
public void dropTable(String catalog, TableIdentifier id) {
String ns = RESTUtil.encodeNamespace(id.namespace());
try (Response res =
diff --git
a/integration-tests/src/main/java/org/apache/polaris/service/it/test/PolarisRestCatalogIntegrationBase.java
b/integration-tests/src/main/java/org/apache/polaris/service/it/test/PolarisRestCatalogIntegrationBase.java
index 8ebee36f0..2bf55cd7e 100644
---
a/integration-tests/src/main/java/org/apache/polaris/service/it/test/PolarisRestCatalogIntegrationBase.java
+++
b/integration-tests/src/main/java/org/apache/polaris/service/it/test/PolarisRestCatalogIntegrationBase.java
@@ -69,6 +69,8 @@ import org.apache.iceberg.rest.RESTCatalog;
import org.apache.iceberg.rest.RESTUtil;
import org.apache.iceberg.rest.requests.CreateTableRequest;
import org.apache.iceberg.rest.responses.ErrorResponse;
+import org.apache.iceberg.rest.responses.ListNamespacesResponse;
+import org.apache.iceberg.rest.responses.ListTablesResponse;
import org.apache.iceberg.rest.responses.LoadTableResponse;
import org.apache.iceberg.types.Types;
import org.apache.polaris.core.admin.model.Catalog;
@@ -161,7 +163,8 @@ public abstract class PolarisRestCatalogIntegrationBase
extends CatalogTests<RES
private static final String[] DEFAULT_CATALOG_PROPERTIES = {
"polaris.config.allow.unstructured.table.location", "true",
- "polaris.config.allow.external.table.location", "true"
+ "polaris.config.allow.external.table.location", "true",
+ "polaris.config.list-pagination-enabled", "true"
};
@Retention(RetentionPolicy.RUNTIME)
@@ -2023,4 +2026,72 @@ public abstract class PolarisRestCatalogIntegrationBase
extends CatalogTests<RES
assertThat(currentETag).isEqualTo(afterDMLETag); // Should match
post-DML ETag
}
}
+
+ @Test
+ public void testPaginatedListNamespaces() {
+ String prefix = "testPaginatedListNamespaces";
+ for (int i = 0; i < 20; i++) {
+ Namespace namespace = Namespace.of(prefix + i);
+ restCatalog.createNamespace(namespace);
+ }
+
+ try {
+ Assertions.assertThat(catalogApi.listNamespaces(currentCatalogName,
Namespace.empty()))
+ .hasSize(20);
+ for (var pageSize : List.of(1, 2, 3, 9, 10, 11, 19, 20, 21, 2000)) {
+ int total = 0;
+ String pageToken = null;
+ do {
+ ListNamespacesResponse response =
+ catalogApi.listNamespaces(
+ currentCatalogName, Namespace.empty(), pageToken,
String.valueOf(pageSize));
+
Assertions.assertThat(response.namespaces().size()).isLessThanOrEqualTo(pageSize);
+ total += response.namespaces().size();
+ pageToken = response.nextPageToken();
+ } while (pageToken != null);
+ Assertions.assertThat(total)
+ .as("Total paginated results for pageSize = " + pageSize)
+ .isEqualTo(20);
+ }
+ } finally {
+ for (int i = 0; i < 20; i++) {
+ Namespace namespace = Namespace.of(prefix + i);
+ restCatalog.dropNamespace(namespace);
+ }
+ }
+ }
+
+ @Test
+ public void testPaginatedListTables() {
+ String prefix = "testPaginatedListTables";
+ Namespace namespace = Namespace.of(prefix);
+ restCatalog.createNamespace(namespace);
+ for (int i = 0; i < 20; i++) {
+ restCatalog.createTable(TableIdentifier.of(namespace, prefix + i),
SCHEMA);
+ }
+
+ try {
+ Assertions.assertThat(catalogApi.listTables(currentCatalogName,
namespace)).hasSize(20);
+ for (var pageSize : List.of(1, 2, 3, 9, 10, 11, 19, 20, 21, 2000)) {
+ int total = 0;
+ String pageToken = null;
+ do {
+ ListTablesResponse response =
+ catalogApi.listTables(
+ currentCatalogName, namespace, pageToken,
String.valueOf(pageSize));
+
Assertions.assertThat(response.identifiers().size()).isLessThanOrEqualTo(pageSize);
+ total += response.identifiers().size();
+ pageToken = response.nextPageToken();
+ } while (pageToken != null);
+ Assertions.assertThat(total)
+ .as("Total paginated results for pageSize = " + pageSize)
+ .isEqualTo(20);
+ }
+ } finally {
+ for (int i = 0; i < 20; i++) {
+ restCatalog.dropTable(TableIdentifier.of(namespace, prefix + i));
+ }
+ restCatalog.dropNamespace(namespace);
+ }
+ }
}
diff --git
a/persistence/eclipselink/src/main/java/org/apache/polaris/extension/persistence/impl/eclipselink/PolarisEclipseLinkMetaStoreSessionImpl.java
b/persistence/eclipselink/src/main/java/org/apache/polaris/extension/persistence/impl/eclipselink/PolarisEclipseLinkMetaStoreSessionImpl.java
index b7b0a951e..0981275fd 100644
---
a/persistence/eclipselink/src/main/java/org/apache/polaris/extension/persistence/impl/eclipselink/PolarisEclipseLinkMetaStoreSessionImpl.java
+++
b/persistence/eclipselink/src/main/java/org/apache/polaris/extension/persistence/impl/eclipselink/PolarisEclipseLinkMetaStoreSessionImpl.java
@@ -56,7 +56,7 @@ import
org.apache.polaris.core.exceptions.AlreadyExistsException;
import org.apache.polaris.core.persistence.BaseMetaStoreManager;
import org.apache.polaris.core.persistence.PrincipalSecretsGenerator;
import org.apache.polaris.core.persistence.RetryOnConcurrencyException;
-import org.apache.polaris.core.persistence.pagination.HasPageSize;
+import org.apache.polaris.core.persistence.pagination.EntityIdToken;
import org.apache.polaris.core.persistence.pagination.Page;
import org.apache.polaris.core.persistence.pagination.PageToken;
import
org.apache.polaris.core.persistence.transactional.AbstractTransactionalPersistence;
@@ -480,11 +480,7 @@ public class PolarisEclipseLinkMetaStoreSessionImpl
extends AbstractTransactiona
.map(ModelEntity::toEntity)
.filter(entityFilter);
- if (pageToken instanceof HasPageSize hasPageSize) {
- data = data.limit(hasPageSize.getPageSize());
- }
-
- return Page.fromItems(data.map(transformer).collect(Collectors.toList()));
+ return Page.mapped(pageToken, data, transformer,
EntityIdToken::fromEntity);
}
/** {@inheritDoc} */
diff --git
a/persistence/eclipselink/src/main/java/org/apache/polaris/extension/persistence/impl/eclipselink/PolarisEclipseLinkStore.java
b/persistence/eclipselink/src/main/java/org/apache/polaris/extension/persistence/impl/eclipselink/PolarisEclipseLinkStore.java
index 4a889d3c0..60251a7b6 100644
---
a/persistence/eclipselink/src/main/java/org/apache/polaris/extension/persistence/impl/eclipselink/PolarisEclipseLinkStore.java
+++
b/persistence/eclipselink/src/main/java/org/apache/polaris/extension/persistence/impl/eclipselink/PolarisEclipseLinkStore.java
@@ -35,6 +35,7 @@ import org.apache.polaris.core.entity.PolarisEntityId;
import org.apache.polaris.core.entity.PolarisEntityType;
import org.apache.polaris.core.entity.PolarisGrantRecord;
import org.apache.polaris.core.entity.PolarisPrincipalSecrets;
+import org.apache.polaris.core.persistence.pagination.EntityIdToken;
import org.apache.polaris.core.persistence.pagination.PageToken;
import org.apache.polaris.core.policy.PolarisPolicyMappingRecord;
import org.apache.polaris.core.policy.PolicyEntity;
@@ -294,7 +295,17 @@ public class PolarisEclipseLinkStore {
// Currently check against ENTITIES not joining with ENTITIES_ACTIVE
String hql =
- "SELECT m from ModelEntity m where m.catalogId=:catalogId and
m.parentId=:parentId and m.typeCode=:typeCode";
+ "SELECT m from ModelEntity m where"
+ + " m.catalogId=:catalogId and m.parentId=:parentId and
m.typeCode=:typeCode";
+
+ var entityIdToken = pageToken.valueAs(EntityIdToken.class);
+ if (entityIdToken.isPresent()) {
+ hql += " and m.id > :tokenId";
+ }
+
+ if (pageToken.paginationRequested()) {
+ hql += " order by m.id asc";
+ }
TypedQuery<ModelEntity> query =
session
@@ -303,6 +314,11 @@ public class PolarisEclipseLinkStore {
.setParameter("parentId", parentId)
.setParameter("typeCode", entityType.getCode());
+ if (entityIdToken.isPresent()) {
+ long tokenId = entityIdToken.get().entityId();
+ query = query.setParameter("tokenId", tokenId);
+ }
+
return query.getResultList();
}
diff --git
a/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/JdbcBasePersistenceImpl.java
b/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/JdbcBasePersistenceImpl.java
index 5c3dd1dba..c93765c84 100644
---
a/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/JdbcBasePersistenceImpl.java
+++
b/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/JdbcBasePersistenceImpl.java
@@ -32,6 +32,7 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
@@ -53,7 +54,7 @@ import
org.apache.polaris.core.persistence.IntegrationPersistence;
import org.apache.polaris.core.persistence.PolicyMappingAlreadyExistsException;
import org.apache.polaris.core.persistence.PrincipalSecretsGenerator;
import org.apache.polaris.core.persistence.RetryOnConcurrencyException;
-import org.apache.polaris.core.persistence.pagination.HasPageSize;
+import org.apache.polaris.core.persistence.pagination.EntityIdToken;
import org.apache.polaris.core.persistence.pagination.Page;
import org.apache.polaris.core.persistence.pagination.PageToken;
import org.apache.polaris.core.policy.PolarisPolicyMappingRecord;
@@ -459,7 +460,7 @@ public class JdbcBasePersistenceImpl implements
BasePersistence, IntegrationPers
@Nonnull Predicate<PolarisBaseEntity> entityFilter,
@Nonnull Function<PolarisBaseEntity, T> transformer,
@Nonnull PageToken pageToken) {
- Map<String, Object> params =
+ Map<String, Object> whereEquals =
Map.of(
"catalog_id",
catalogId,
@@ -469,29 +470,41 @@ public class JdbcBasePersistenceImpl implements
BasePersistence, IntegrationPers
entityType.getCode(),
"realm_id",
realmId);
+ Map<String, Object> whereGreater;
// Limit can't be pushed down, due to client side filtering
// absence of transaction.
+ String orderByColumnName = null;
+ if (pageToken.paginationRequested()) {
+ orderByColumnName = ModelEntity.ID_COLUMN;
+ whereGreater =
+ pageToken
+ .valueAs(EntityIdToken.class)
+ .map(
+ entityIdToken ->
+ Map.<String, Object>of(ModelEntity.ID_COLUMN,
entityIdToken.entityId()))
+ .orElse(Map.of());
+ } else {
+ whereGreater = Map.of();
+ }
+
try {
PreparedQuery query =
QueryGenerator.generateSelectQuery(
- ModelEntity.ALL_COLUMNS, ModelEntity.TABLE_NAME, params);
- List<PolarisBaseEntity> results = new ArrayList<>();
+ ModelEntity.ALL_COLUMNS,
+ ModelEntity.TABLE_NAME,
+ whereEquals,
+ whereGreater,
+ orderByColumnName);
+ AtomicReference<Page<T>> results = new AtomicReference<>();
datasourceOperations.executeSelectOverStream(
query,
new ModelEntity(),
stream -> {
var data = stream.filter(entityFilter);
- if (pageToken instanceof HasPageSize hasPageSize) {
- data = data.limit(hasPageSize.getPageSize());
- }
- data.forEach(results::add);
+ results.set(Page.mapped(pageToken, data, transformer,
EntityIdToken::fromEntity));
});
- List<T> resultsOrEmpty =
- results == null
- ? Collections.emptyList()
- :
results.stream().filter(entityFilter).map(transformer).collect(Collectors.toList());
- return Page.fromItems(resultsOrEmpty);
+ return results.get();
} catch (SQLException e) {
throw new RuntimeException(
String.format("Failed to retrieve polaris entities due to %s",
e.getMessage()), e);
diff --git
a/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/QueryGenerator.java
b/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/QueryGenerator.java
index c6bad0a1c..a06bf283a 100644
---
a/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/QueryGenerator.java
+++
b/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/QueryGenerator.java
@@ -20,6 +20,7 @@ package org.apache.polaris.persistence.relational.jdbc;
import com.google.common.annotations.VisibleForTesting;
import jakarta.annotation.Nonnull;
+import jakarta.annotation.Nullable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -59,8 +60,27 @@ public class QueryGenerator {
@Nonnull List<String> projections,
@Nonnull String tableName,
@Nonnull Map<String, Object> whereClause) {
- QueryFragment where = generateWhereClause(new HashSet<>(projections),
whereClause);
- PreparedQuery query = generateSelectQuery(projections, tableName,
where.sql());
+ return generateSelectQuery(projections, tableName, whereClause, Map.of(),
null);
+ }
+
+ /**
+ * Generates a SELECT query with projection and filtering.
+ *
+ * @param projections List of columns to retrieve.
+ * @param tableName Target table name.
+ * @param whereEquals Column-value pairs used in WHERE filtering.
+ * @return A parameterized SELECT query.
+ * @throws IllegalArgumentException if any whereClause column isn't in
projections.
+ */
+ public static PreparedQuery generateSelectQuery(
+ @Nonnull List<String> projections,
+ @Nonnull String tableName,
+ @Nonnull Map<String, Object> whereEquals,
+ @Nonnull Map<String, Object> whereGreater,
+ @Nullable String orderByColumn) {
+ QueryFragment where =
+ generateWhereClause(new HashSet<>(projections), whereEquals,
whereGreater);
+ PreparedQuery query = generateSelectQuery(projections, tableName,
where.sql(), orderByColumn);
return new PreparedQuery(query.sql(), where.parameters());
}
@@ -108,7 +128,8 @@ public class QueryGenerator {
params.add(realmId);
String where = " WHERE (catalog_id, id) IN (" + placeholders + ") AND
realm_id = ?";
return new PreparedQuery(
- generateSelectQuery(ModelEntity.ALL_COLUMNS, ModelEntity.TABLE_NAME,
where).sql(), params);
+ generateSelectQuery(ModelEntity.ALL_COLUMNS, ModelEntity.TABLE_NAME,
where, null).sql(),
+ params);
}
/**
@@ -157,7 +178,7 @@ public class QueryGenerator {
@Nonnull List<Object> values,
@Nonnull Map<String, Object> whereClause) {
List<Object> bindingParams = new ArrayList<>(values);
- QueryFragment where = generateWhereClause(new HashSet<>(allColumns),
whereClause);
+ QueryFragment where = generateWhereClause(new HashSet<>(allColumns),
whereClause, Map.of());
String setClause = allColumns.stream().map(c -> c + " =
?").collect(Collectors.joining(", "));
String sql =
"UPDATE " + getFullyQualifiedTableName(tableName) + " SET " +
setClause + where.sql();
@@ -177,34 +198,49 @@ public class QueryGenerator {
@Nonnull List<String> tableColumns,
@Nonnull String tableName,
@Nonnull Map<String, Object> whereClause) {
- QueryFragment where = generateWhereClause(new HashSet<>(tableColumns),
whereClause);
+ QueryFragment where = generateWhereClause(new HashSet<>(tableColumns),
whereClause, Map.of());
return new PreparedQuery(
"DELETE FROM " + getFullyQualifiedTableName(tableName) + where.sql(),
where.parameters());
}
private static PreparedQuery generateSelectQuery(
- @Nonnull List<String> columnNames, @Nonnull String tableName, @Nonnull
String filter) {
+ @Nonnull List<String> columnNames,
+ @Nonnull String tableName,
+ @Nonnull String filter,
+ @Nullable String orderByColumn) {
String sql =
"SELECT "
+ String.join(", ", columnNames)
+ " FROM "
+ getFullyQualifiedTableName(tableName)
+ filter;
+ if (orderByColumn != null) {
+ sql += " ORDER BY " + orderByColumn + " ASC";
+ }
return new PreparedQuery(sql, Collections.emptyList());
}
@VisibleForTesting
static QueryFragment generateWhereClause(
- @Nonnull Set<String> tableColumns, @Nonnull Map<String, Object>
whereClause) {
+ @Nonnull Set<String> tableColumns,
+ @Nonnull Map<String, Object> whereEquals,
+ @Nonnull Map<String, Object> whereGreater) {
List<String> conditions = new ArrayList<>();
List<Object> parameters = new ArrayList<>();
- for (Map.Entry<String, Object> entry : whereClause.entrySet()) {
+ for (Map.Entry<String, Object> entry : whereEquals.entrySet()) {
if (!tableColumns.contains(entry.getKey()) &&
!entry.getKey().equals("realm_id")) {
throw new IllegalArgumentException("Invalid query column: " +
entry.getKey());
}
conditions.add(entry.getKey() + " = ?");
parameters.add(entry.getValue());
}
+ for (Map.Entry<String, Object> entry : whereGreater.entrySet()) {
+ if (!tableColumns.contains(entry.getKey()) &&
!entry.getKey().equals("realm_id")) {
+ throw new IllegalArgumentException("Invalid query column: " +
entry.getKey());
+ }
+ conditions.add(entry.getKey() + " > ?");
+ parameters.add(entry.getValue());
+ }
String clause = conditions.isEmpty() ? "" : " WHERE " + String.join(" AND
", conditions);
return new QueryFragment(clause, parameters);
}
@@ -258,7 +294,7 @@ public class QueryGenerator {
QueryFragment where = new QueryFragment(clause, finalParams);
PreparedQuery query =
- generateSelectQuery(ModelEntity.ALL_COLUMNS, ModelEntity.TABLE_NAME,
where.sql());
+ generateSelectQuery(ModelEntity.ALL_COLUMNS, ModelEntity.TABLE_NAME,
where.sql(), null);
return new PreparedQuery(query.sql(), where.parameters());
}
diff --git
a/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/models/ModelEntity.java
b/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/models/ModelEntity.java
index e9a2bdb55..6eaec072d 100644
---
a/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/models/ModelEntity.java
+++
b/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/models/ModelEntity.java
@@ -33,6 +33,8 @@ import
org.apache.polaris.persistence.relational.jdbc.DatabaseType;
public class ModelEntity implements Converter<PolarisBaseEntity> {
public static final String TABLE_NAME = "ENTITIES";
+ public static final String ID_COLUMN = "id";
+
public static final List<String> ALL_COLUMNS =
List.of(
"id",
diff --git
a/persistence/relational-jdbc/src/test/java/org/apache/polaris/persistence/relational/jdbc/QueryGeneratorTest.java
b/persistence/relational-jdbc/src/test/java/org/apache/polaris/persistence/relational/jdbc/QueryGeneratorTest.java
index 798dd92e7..6df78eff5 100644
---
a/persistence/relational-jdbc/src/test/java/org/apache/polaris/persistence/relational/jdbc/QueryGeneratorTest.java
+++
b/persistence/relational-jdbc/src/test/java/org/apache/polaris/persistence/relational/jdbc/QueryGeneratorTest.java
@@ -184,7 +184,8 @@ public class QueryGeneratorTest {
Map<String, Object> whereClause = new HashMap<>();
whereClause.put("name", "test");
assertEquals(
- " WHERE name = ?", QueryGenerator.generateWhereClause(Set.of("name"),
whereClause).sql());
+ " WHERE name = ?",
+ QueryGenerator.generateWhereClause(Set.of("name"), whereClause,
Map.of()).sql());
}
@Test
@@ -194,13 +195,25 @@ public class QueryGeneratorTest {
whereClause.put("version", 1);
assertEquals(
" WHERE name = ? AND version = ?",
- QueryGenerator.generateWhereClause(Set.of("name", "version"),
whereClause).sql());
+ QueryGenerator.generateWhereClause(Set.of("name", "version"),
whereClause, Map.of()).sql());
+ }
+
+ @Test
+ void testGenerateWhereClause_multipleConditions_AndInequality() {
+ Map<String, Object> whereClause = new HashMap<>();
+ whereClause.put("name", "test");
+ whereClause.put("version", 1);
+ assertEquals(
+ " WHERE name = ? AND version = ? AND id > ?",
+ QueryGenerator.generateWhereClause(
+ Set.of("name", "version", "id"), whereClause, Map.of("id",
123))
+ .sql());
}
@Test
void testGenerateWhereClause_emptyMap() {
Map<String, Object> whereClause = Collections.emptyMap();
- assertEquals("", QueryGenerator.generateWhereClause(Set.of(),
whereClause).sql());
+ assertEquals("", QueryGenerator.generateWhereClause(Set.of(), whereClause,
Map.of()).sql());
}
@Test
diff --git a/polaris-core/build.gradle.kts b/polaris-core/build.gradle.kts
index ca24aeecd..5f8b1ce3e 100644
--- a/polaris-core/build.gradle.kts
+++ b/polaris-core/build.gradle.kts
@@ -36,6 +36,11 @@ dependencies {
implementation("com.fasterxml.jackson.core:jackson-annotations")
implementation("com.fasterxml.jackson.core:jackson-core")
implementation("com.fasterxml.jackson.core:jackson-databind")
+ implementation("com.fasterxml.jackson.dataformat:jackson-dataformat-smile")
+ runtimeOnly("com.fasterxml.jackson.datatype:jackson-datatype-guava")
+ runtimeOnly("com.fasterxml.jackson.datatype:jackson-datatype-jdk8")
+ runtimeOnly("com.fasterxml.jackson.datatype:jackson-datatype-jsr310")
+
implementation(libs.caffeine)
implementation(libs.commons.lang3)
implementation(libs.commons.codec1)
@@ -96,6 +101,9 @@ dependencies {
implementation(platform(libs.google.cloud.storage.bom))
implementation("com.google.cloud:google-cloud-storage")
+ testCompileOnly(project(":polaris-immutables"))
+ testAnnotationProcessor(project(":polaris-immutables", configuration =
"processor"))
+
testFixturesApi("com.fasterxml.jackson.core:jackson-core")
testFixturesApi("com.fasterxml.jackson.core:jackson-databind")
testFixturesApi(libs.commons.lang3)
diff --git
a/polaris-core/src/main/java/org/apache/polaris/core/catalog/PolarisCatalogHelpers.java
b/polaris-core/src/main/java/org/apache/polaris/core/catalog/PolarisCatalogHelpers.java
index e10c24f2f..b37efaae3 100644
---
a/polaris-core/src/main/java/org/apache/polaris/core/catalog/PolarisCatalogHelpers.java
+++
b/polaris-core/src/main/java/org/apache/polaris/core/catalog/PolarisCatalogHelpers.java
@@ -19,7 +19,6 @@
package org.apache.polaris.core.catalog;
import com.google.common.collect.ImmutableList;
-import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
@@ -62,20 +61,28 @@ public class PolarisCatalogHelpers {
return Namespace.of(parentLevels);
}
- public static List<Namespace> nameAndIdToNamespaces(
- List<PolarisEntity> catalogPath, List<PolarisEntity.NameAndId> entities)
{
+ public static Namespace nameAndIdToNamespace(
+ List<PolarisEntity> catalogPath, PolarisEntity.NameAndId entity) {
+ // Skip element 0 which is the catalog entity
+ String[] fullName = new String[catalogPath.size()];
+ for (int i = 0; i < fullName.length - 1; ++i) {
+ fullName[i] = catalogPath.get(i + 1).getName();
+ }
+ fullName[fullName.length - 1] = entity.getName();
+ return Namespace.of(fullName);
+ }
+
+ /**
+ * Given the shortnames/ids of entities that all live under the given
catalogPath, reconstructs
+ * TableIdentifier objects for each that all hold the catalogPath excluding
the catalog entity.
+ */
+ public static Namespace parentNamespace(List<PolarisEntity> catalogPath) {
// Skip element 0 which is the catalog entity
String[] parentNamespaces = new String[catalogPath.size() - 1];
for (int i = 0; i < parentNamespaces.length; ++i) {
parentNamespaces[i] = catalogPath.get(i + 1).getName();
}
- List<Namespace> namespaces = new ArrayList<>();
- for (PolarisEntity.NameAndId entity : entities) {
- String[] fullName = Arrays.copyOf(parentNamespaces,
parentNamespaces.length + 1);
- fullName[fullName.length - 1] = entity.getName();
- namespaces.add(Namespace.of(fullName));
- }
- return namespaces;
+ return Namespace.of(parentNamespaces);
}
/**
diff --git
a/polaris-core/src/main/java/org/apache/polaris/core/config/FeatureConfiguration.java
b/polaris-core/src/main/java/org/apache/polaris/core/config/FeatureConfiguration.java
index 2f69e898e..05b7e0dec 100644
---
a/polaris-core/src/main/java/org/apache/polaris/core/config/FeatureConfiguration.java
+++
b/polaris-core/src/main/java/org/apache/polaris/core/config/FeatureConfiguration.java
@@ -222,6 +222,7 @@ public class FeatureConfiguration<T> extends
PolarisConfiguration<T> {
public static final PolarisConfiguration<Boolean> LIST_PAGINATION_ENABLED =
PolarisConfiguration.<Boolean>builder()
.key("LIST_PAGINATION_ENABLED")
+ .catalogConfig("polaris.config.list-pagination-enabled")
.description("If set to true, pagination for APIs like listTables is
enabled.")
.defaultValue(false)
.buildFeatureConfiguration();
diff --git
a/polaris-core/src/main/java/org/apache/polaris/core/persistence/AtomicOperationMetaStoreManager.java
b/polaris-core/src/main/java/org/apache/polaris/core/persistence/AtomicOperationMetaStoreManager.java
index 08bda144c..e4e77c155 100644
---
a/polaris-core/src/main/java/org/apache/polaris/core/persistence/AtomicOperationMetaStoreManager.java
+++
b/polaris-core/src/main/java/org/apache/polaris/core/persistence/AtomicOperationMetaStoreManager.java
@@ -31,6 +31,7 @@ import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
+import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.apache.polaris.core.PolarisCallContext;
import org.apache.polaris.core.entity.AsyncTaskType;
@@ -698,23 +699,20 @@ public class AtomicOperationMetaStoreManager extends
BaseMetaStoreManager {
// return list of active entities
// TODO: Clean up shared logic for catalogId/parentId
- long catalogId =
- catalogPath == null || catalogPath.size() == 0 ? 0l :
catalogPath.get(0).getId();
+ long catalogId = catalogPath == null || catalogPath.isEmpty() ? 0L :
catalogPath.get(0).getId();
long parentId =
- catalogPath == null || catalogPath.size() == 0
- ? 0l
+ catalogPath == null || catalogPath.isEmpty()
+ ? 0L
: catalogPath.get(catalogPath.size() - 1).getId();
- Page<EntityNameLookupRecord> resultPage =
- ms.listEntities(callCtx, catalogId, parentId, entityType, pageToken);
// prune the returned list with only entities matching the entity subtype
- if (entitySubType != PolarisEntitySubType.ANY_SUBTYPE) {
- resultPage =
- pageToken.buildNextPage(
- resultPage.items.stream()
- .filter(rec -> rec.getSubTypeCode() ==
entitySubType.getCode())
- .collect(Collectors.toList()));
- }
+ Predicate<PolarisBaseEntity> filter =
+ entitySubType != PolarisEntitySubType.ANY_SUBTYPE
+ ? e -> e.getSubTypeCode() == entitySubType.getCode()
+ : entity -> true;
+
+ Page<EntityNameLookupRecord> resultPage =
+ ms.listEntities(callCtx, catalogId, parentId, entityType, filter,
pageToken);
// TODO: Use post-validation to enforce consistent view against
catalogPath. In the
// meantime, happens-before ordering semantics aren't guaranteed during
high-concurrency
@@ -957,7 +955,7 @@ public class AtomicOperationMetaStoreManager extends
BaseMetaStoreManager {
e.getExistingEntity().getSubTypeCode()));
}
- return new EntitiesResult(createdEntities);
+ return new EntitiesResult(Page.fromItems(createdEntities));
}
/** {@inheritDoc} */
@@ -1020,7 +1018,7 @@ public class AtomicOperationMetaStoreManager extends
BaseMetaStoreManager {
}
// good, all success
- return new EntitiesResult(updatedEntities);
+ return new EntitiesResult(Page.fromItems(updatedEntities));
}
/** {@inheritDoc} */
@@ -1185,7 +1183,7 @@ public class AtomicOperationMetaStoreManager extends
BaseMetaStoreManager {
entity -> true,
Function.identity(),
PageToken.fromLimit(2))
- .items;
+ .items();
// if we have 2, we cannot drop the catalog. If only one left, better be
the admin role
if (catalogRoles.size() > 1) {
@@ -1519,32 +1517,38 @@ public class AtomicOperationMetaStoreManager extends
BaseMetaStoreManager {
Function.identity(),
pageToken);
- List<PolarisBaseEntity> loadedTasks = new ArrayList<>();
final AtomicInteger failedLeaseCount = new AtomicInteger(0);
- availableTasks.items.forEach(
- task -> {
- PolarisBaseEntity.Builder updatedTaskBuilder = new
PolarisBaseEntity.Builder(task);
- Map<String, String> properties =
- PolarisObjectMapperUtil.deserializeProperties(callCtx,
task.getProperties());
- properties.put(PolarisTaskConstants.LAST_ATTEMPT_EXECUTOR_ID,
executorId);
- properties.put(
- PolarisTaskConstants.LAST_ATTEMPT_START_TIME,
- String.valueOf(callCtx.getClock().millis()));
- properties.put(
- PolarisTaskConstants.ATTEMPT_COUNT,
- String.valueOf(
-
Integer.parseInt(properties.getOrDefault(PolarisTaskConstants.ATTEMPT_COUNT,
"0"))
- + 1));
- updatedTaskBuilder.properties(
- PolarisObjectMapperUtil.serializeProperties(callCtx,
properties));
- EntityResult result =
- updateEntityPropertiesIfNotChanged(callCtx, null,
updatedTaskBuilder.build());
- if (result.getReturnStatus() == BaseResult.ReturnStatus.SUCCESS) {
- loadedTasks.add(result.getEntity());
- } else {
- failedLeaseCount.getAndIncrement();
- }
- });
+ List<PolarisBaseEntity> loadedTasks =
+ availableTasks.items().stream()
+ .map(
+ task -> {
+ PolarisBaseEntity.Builder updatedTaskBuilder =
+ new PolarisBaseEntity.Builder(task);
+ Map<String, String> properties =
+ PolarisObjectMapperUtil.deserializeProperties(callCtx,
task.getProperties());
+
properties.put(PolarisTaskConstants.LAST_ATTEMPT_EXECUTOR_ID, executorId);
+ properties.put(
+ PolarisTaskConstants.LAST_ATTEMPT_START_TIME,
+ String.valueOf(callCtx.getClock().millis()));
+ properties.put(
+ PolarisTaskConstants.ATTEMPT_COUNT,
+ String.valueOf(
+ Integer.parseInt(
+
properties.getOrDefault(PolarisTaskConstants.ATTEMPT_COUNT, "0"))
+ + 1));
+ updatedTaskBuilder.properties(
+ PolarisObjectMapperUtil.serializeProperties(callCtx,
properties));
+ EntityResult result =
+ updateEntityPropertiesIfNotChanged(callCtx, null,
updatedTaskBuilder.build());
+ if (result.getReturnStatus() ==
BaseResult.ReturnStatus.SUCCESS) {
+ return result.getEntity();
+ } else {
+ failedLeaseCount.getAndIncrement();
+ return null;
+ }
+ })
+ .filter(Objects::nonNull)
+ .collect(Collectors.toList());
// Since the contract of this method is to only return an empty list once
no available tasks
// are found anymore, if we happen to fail to lease any tasks at all due
to all of them
diff --git
a/polaris-core/src/main/java/org/apache/polaris/core/persistence/dao/entity/EntitiesResult.java
b/polaris-core/src/main/java/org/apache/polaris/core/persistence/dao/entity/EntitiesResult.java
index e27b69680..13c7422f0 100644
---
a/polaris-core/src/main/java/org/apache/polaris/core/persistence/dao/entity/EntitiesResult.java
+++
b/polaris-core/src/main/java/org/apache/polaris/core/persistence/dao/entity/EntitiesResult.java
@@ -18,25 +18,20 @@
*/
package org.apache.polaris.core.persistence.dao.entity;
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.annotation.Nonnull;
import jakarta.annotation.Nullable;
import java.util.List;
-import java.util.Optional;
import org.apache.polaris.core.entity.PolarisBaseEntity;
import org.apache.polaris.core.persistence.pagination.Page;
-import org.apache.polaris.core.persistence.pagination.PageToken;
/** a set of returned entities result */
public class EntitiesResult extends BaseResult {
// null if not success. Else the list of entities being returned
- private final List<PolarisBaseEntity> entities;
- private final Optional<PageToken> pageTokenOpt;
+ private final Page<PolarisBaseEntity> entities;
public static EntitiesResult fromPage(Page<PolarisBaseEntity> page) {
- return new EntitiesResult(page.items, Optional.ofNullable(page.pageToken));
+ return new EntitiesResult(page);
}
/**
@@ -48,11 +43,6 @@ public class EntitiesResult extends BaseResult {
public EntitiesResult(@Nonnull ReturnStatus errorStatus, @Nullable String
extraInformation) {
super(errorStatus, extraInformation);
this.entities = null;
- this.pageTokenOpt = Optional.empty();
- }
-
- public EntitiesResult(@Nonnull List<PolarisBaseEntity> entities) {
- this(entities, Optional.empty());
}
/**
@@ -60,29 +50,12 @@ public class EntitiesResult extends BaseResult {
*
* @param entities list of entities being returned, implies success
*/
- public EntitiesResult(
- @Nonnull List<PolarisBaseEntity> entities, @Nonnull Optional<PageToken>
pageTokenOpt) {
+ public EntitiesResult(@Nonnull Page<PolarisBaseEntity> entities) {
super(ReturnStatus.SUCCESS);
this.entities = entities;
- this.pageTokenOpt = pageTokenOpt;
- }
-
- @JsonCreator
- private EntitiesResult(
- @JsonProperty("returnStatus") @Nonnull ReturnStatus returnStatus,
- @JsonProperty("extraInformation") String extraInformation,
- @JsonProperty("entities") List<PolarisBaseEntity> entities,
- @JsonProperty("pageToken") Optional<PageToken> pageTokenOpt) {
- super(returnStatus, extraInformation);
- this.entities = entities;
- this.pageTokenOpt = pageTokenOpt;
- }
-
- public List<PolarisBaseEntity> getEntities() {
- return entities;
}
- public Optional<PageToken> getPageToken() {
- return pageTokenOpt;
+ public @Nullable List<PolarisBaseEntity> getEntities() {
+ return entities == null ? null : entities.items();
}
}
diff --git
a/polaris-core/src/main/java/org/apache/polaris/core/persistence/dao/entity/ListEntitiesResult.java
b/polaris-core/src/main/java/org/apache/polaris/core/persistence/dao/entity/ListEntitiesResult.java
index 10669e899..a7a51d229 100644
---
a/polaris-core/src/main/java/org/apache/polaris/core/persistence/dao/entity/ListEntitiesResult.java
+++
b/polaris-core/src/main/java/org/apache/polaris/core/persistence/dao/entity/ListEntitiesResult.java
@@ -18,26 +18,21 @@
*/
package org.apache.polaris.core.persistence.dao.entity;
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.annotation.Nonnull;
import jakarta.annotation.Nullable;
import java.util.List;
-import java.util.Optional;
import org.apache.polaris.core.entity.EntityNameLookupRecord;
import org.apache.polaris.core.persistence.pagination.Page;
-import org.apache.polaris.core.persistence.pagination.PageToken;
/** the return the result for a list entities call */
public class ListEntitiesResult extends BaseResult {
// null if not success. Else the list of entities being returned
- private final List<EntityNameLookupRecord> entities;
- private final Optional<PageToken> pageTokenOpt;
+ private final Page<EntityNameLookupRecord> entities;
/** Create a {@link ListEntitiesResult} from a {@link Page} */
public static ListEntitiesResult fromPage(Page<EntityNameLookupRecord> page)
{
- return new ListEntitiesResult(page.items,
Optional.ofNullable(page.pageToken));
+ return new ListEntitiesResult(page);
}
/**
@@ -46,13 +41,9 @@ public class ListEntitiesResult extends BaseResult {
* @param errorCode error code, cannot be SUCCESS
* @param extraInformation extra information
*/
- public ListEntitiesResult(
- @Nonnull ReturnStatus errorCode,
- @Nullable String extraInformation,
- @Nonnull Optional<PageToken> pageTokenOpt) {
+ public ListEntitiesResult(@Nonnull ReturnStatus errorCode, @Nullable String
extraInformation) {
super(errorCode, extraInformation);
this.entities = null;
- this.pageTokenOpt = pageTokenOpt;
}
/**
@@ -60,29 +51,16 @@ public class ListEntitiesResult extends BaseResult {
*
* @param entities list of entities being returned, implies success
*/
- public ListEntitiesResult(
- @Nonnull List<EntityNameLookupRecord> entities, @Nonnull
Optional<PageToken> pageTokenOpt) {
+ public ListEntitiesResult(Page<EntityNameLookupRecord> entities) {
super(ReturnStatus.SUCCESS);
this.entities = entities;
- this.pageTokenOpt = pageTokenOpt;
}
- @JsonCreator
- private ListEntitiesResult(
- @JsonProperty("returnStatus") @Nonnull ReturnStatus returnStatus,
- @JsonProperty("extraInformation") String extraInformation,
- @JsonProperty("entities") List<EntityNameLookupRecord> entities,
- @JsonProperty("pageToken") Optional<PageToken> pageTokenOpt) {
- super(returnStatus, extraInformation);
- this.entities = entities;
- this.pageTokenOpt = pageTokenOpt;
- }
-
- public List<EntityNameLookupRecord> getEntities() {
- return entities;
+ public @Nullable List<EntityNameLookupRecord> getEntities() {
+ return entities == null ? null : entities.items();
}
- public Optional<PageToken> getPageToken() {
- return pageTokenOpt;
+ public Page<EntityNameLookupRecord> getPage() {
+ return entities == null ? Page.fromItems(List.of()) : entities;
}
}
diff --git
a/polaris-core/src/main/java/org/apache/polaris/core/persistence/pagination/DonePageToken.java
b/polaris-core/src/main/java/org/apache/polaris/core/persistence/pagination/DonePageToken.java
deleted file mode 100644
index d46ea7b02..000000000
---
a/polaris-core/src/main/java/org/apache/polaris/core/persistence/pagination/DonePageToken.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.polaris.core.persistence.pagination;
-
-import java.util.List;
-
-/**
- * A {@link PageToken} string that represents the lack of a page token.
Returns `null` in
- * `toTokenString`, which the client will interpret as there being no more
data available.
- */
-public class DonePageToken extends PageToken {
-
- public DonePageToken() {}
-
- @Override
- public String toTokenString() {
- return null;
- }
-
- @Override
- protected PageToken updated(List<?> newData) {
- throw new IllegalStateException("DonePageToken.updated is invalid");
- }
-}
diff --git
a/polaris-core/src/main/java/org/apache/polaris/core/persistence/pagination/EntityIdToken.java
b/polaris-core/src/main/java/org/apache/polaris/core/persistence/pagination/EntityIdToken.java
new file mode 100644
index 000000000..8a9a03b1b
--- /dev/null
+++
b/polaris-core/src/main/java/org/apache/polaris/core/persistence/pagination/EntityIdToken.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.polaris.core.persistence.pagination;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import jakarta.annotation.Nullable;
+import org.apache.polaris.core.entity.PolarisBaseEntity;
+import org.apache.polaris.immutables.PolarisImmutable;
+
+/** Pagination {@linkplain Token token} backed by {@link
PolarisBaseEntity#getId() entity ID}. */
+@PolarisImmutable
+@JsonSerialize(as = ImmutableEntityIdToken.class)
+@JsonDeserialize(as = ImmutableEntityIdToken.class)
+public interface EntityIdToken extends Token {
+ String ID = "e";
+
+ @JsonProperty("i")
+ long entityId();
+
+ @Override
+ default String getT() {
+ return ID;
+ }
+
+ static @Nullable EntityIdToken fromEntity(PolarisBaseEntity entity) {
+ if (entity == null) {
+ return null;
+ }
+ return fromEntityId(entity.getId());
+ }
+
+ static EntityIdToken fromEntityId(long entityId) {
+ return ImmutableEntityIdToken.builder().entityId(entityId).build();
+ }
+
+ final class EntityIdTokenType implements TokenType {
+ @Override
+ public String id() {
+ return ID;
+ }
+
+ @Override
+ public Class<? extends Token> javaType() {
+ return EntityIdToken.class;
+ }
+ }
+}
diff --git
a/polaris-core/src/main/java/org/apache/polaris/core/persistence/pagination/HasPageSize.java
b/polaris-core/src/main/java/org/apache/polaris/core/persistence/pagination/HasPageSize.java
deleted file mode 100644
index c6b216fcd..000000000
---
a/polaris-core/src/main/java/org/apache/polaris/core/persistence/pagination/HasPageSize.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.polaris.core.persistence.pagination;
-
-/**
- * A light interface for {@link PageToken} implementations to express that
they have a page size
- * that should be respected
- */
-public interface HasPageSize {
- int getPageSize();
-}
diff --git
a/polaris-core/src/main/java/org/apache/polaris/core/persistence/pagination/Page.java
b/polaris-core/src/main/java/org/apache/polaris/core/persistence/pagination/Page.java
index 18287f85c..4a3de4d12 100644
---
a/polaris-core/src/main/java/org/apache/polaris/core/persistence/pagination/Page.java
+++
b/polaris-core/src/main/java/org/apache/polaris/core/persistence/pagination/Page.java
@@ -18,25 +18,99 @@
*/
package org.apache.polaris.core.persistence.pagination;
+import static java.util.Spliterators.iterator;
+
+import jakarta.annotation.Nullable;
+import java.util.ArrayList;
+import java.util.Iterator;
import java.util.List;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
/**
- * An immutable page of items plus their paging cursor. The {@link PageToken}
here can be used to
- * continue the listing operation that generated the `items`.
+ * An immutable page of items plus the next-page token value, if there are
more items. The {@link
+ * #encodedResponseToken()} here can be used to continue the listing operation
that generated the
+ * `items`.
*/
public class Page<T> {
- public final PageToken pageToken;
- public final List<T> items;
+ private final PageToken request;
+ private final List<T> items;
+ @Nullable private final Token nextToken;
- public Page(PageToken pageToken, List<T> items) {
- this.pageToken = pageToken;
+ private Page(PageToken request, @Nullable Token nextToken, List<T> items) {
+ this.request = request;
+ this.nextToken = nextToken;
this.items = items;
}
/**
- * Used to wrap a {@link List<T>} of items into a {@link Page <T>} when
there are no more pages
+ * Builds a complete response page for the full list of relevant items. No
subsequence pages of
+ * related data exist.
*/
public static <T> Page<T> fromItems(List<T> items) {
- return new Page<>(new DonePageToken(), items);
+ return new Page<>(PageToken.readEverything(), null, items);
+ }
+
+ /**
+ * Produces a response page by consuming the number of items from the
provided stream according to
+ * the {@code request} parameter. Source items can be converted to a
different type by providing a
+ * {@code mapper} function. The page token for the response will be produced
from the request data
+ * combined with the pointer to the next page of data provided by the {@code
dataPointer}
+ * function.
+ *
+ * @param request defines pagination parameters that were uses to produce
this page of data.
+ * @param items stream of source data
+ * @param mapper converter from source data types to response data types.
+ * @param tokenBuilder determines the {@link Token} used to start the next
page of data given the
+ * last item from the previous page. The output of this function will be
available from {@link
+ * PageToken#value()} associated with the request for the next page.
+ */
+ public static <R, T> Page<R> mapped(
+ PageToken request, Stream<T> items, Function<T, R> mapper, Function<T,
Token> tokenBuilder) {
+ List<R> data;
+ T last = null;
+ if (!request.paginationRequested()) {
+ // short-cut for "no pagination"
+ data = items.map(mapper).collect(Collectors.toList());
+ } else {
+ data = new ArrayList<>(request.pageSize().orElse(10));
+
+ Iterator<T> it = iterator(items.spliterator());
+ int limit = request.pageSize().orElse(Integer.MAX_VALUE);
+ while (it.hasNext() && data.size() < limit) {
+ last = it.next();
+ data.add(mapper.apply(last));
+ }
+
+ // Signal "no more data" if the number of items is less than the
requested page size or if
+ // there is no more data.
+ if (data.size() < limit || !it.hasNext()) {
+ last = null;
+ }
+ }
+
+ return new Page<>(request, tokenBuilder.apply(last), data);
+ }
+
+ public List<T> items() {
+ return items;
+ }
+
+ /**
+ * Returns a page token in encoded form suitable for returning to API
clients. The string returned
+ * from this method is expected to be parsed by {@link
PageToken#build(String, Integer)} when
+ * servicing the request for the next page of related data.
+ */
+ public @Nullable String encodedResponseToken() {
+ return PageTokenUtil.encodePageToken(request, nextToken);
+ }
+
+ /**
+ * Converts this page of data to objects of a different type, while
maintaining the underlying
+ * pointer to the next page of source data.
+ */
+ public <R> Page<R> map(Function<T, R> mapper) {
+ return new Page<>(request, nextToken,
items.stream().map(mapper).collect(Collectors.toList()));
}
}
diff --git
a/polaris-core/src/main/java/org/apache/polaris/core/persistence/pagination/PageToken.java
b/polaris-core/src/main/java/org/apache/polaris/core/persistence/pagination/PageToken.java
index 2e335ccd4..a1dceffdd 100644
---
a/polaris-core/src/main/java/org/apache/polaris/core/persistence/pagination/PageToken.java
+++
b/polaris-core/src/main/java/org/apache/polaris/core/persistence/pagination/PageToken.java
@@ -18,82 +18,75 @@
*/
package org.apache.polaris.core.persistence.pagination;
-import java.util.List;
-import java.util.Objects;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import jakarta.annotation.Nullable;
+import java.util.Optional;
+import java.util.OptionalInt;
+import org.apache.polaris.immutables.PolarisImmutable;
-/**
- * Represents a page token that can be used by operations like `listTables`.
Clients that specify a
- * `pageSize` (or a `pageToken`) may receive a `next-page-token` in the
response, the content of
- * which is a serialized PageToken.
- *
- * <p>By providing that in the next query's `pageToken`, the client can resume
listing where they
- * left off. If the client provides a `pageToken` or `pageSize` but
`next-page-token` is null in the
- * response, that means there is no more data to read.
- */
-public abstract class PageToken {
-
- /** Build a new PageToken that reads everything */
- public static PageToken readEverything() {
- return build(null, null);
- }
+/** A wrapper for pagination information passed in as part of a request. */
+@PolarisImmutable
+@JsonSerialize(as = ImmutablePageToken.class)
+@JsonDeserialize(as = ImmutablePageToken.class)
+public interface PageToken {
+ // Serialization property names are intentionally short to reduce the size
of the serialized
+ // paging token.
- /** Build a new PageToken from an input String, without a specified page
size */
- public static PageToken fromString(String token) {
- return build(token, null);
- }
-
- /** Build a new PageToken from a limit */
- public static PageToken fromLimit(Integer pageSize) {
- return build(null, pageSize);
- }
+ /** The requested page size (optional). */
+ @JsonProperty("p")
+ OptionalInt pageSize();
- /** Build a {@link PageToken} from the input string and page size */
- public static PageToken build(String token, Integer pageSize) {
- if (token == null || token.isEmpty()) {
- if (pageSize != null) {
- return new LimitPageToken(pageSize);
- } else {
- return new ReadEverythingPageToken();
- }
- } else {
- // TODO implement, split out by the token's prefix
- throw new IllegalArgumentException("Unrecognized page token: " + token);
- }
+ /** Convenience for {@code pageSize().isPresent()}. */
+ default boolean paginationRequested() {
+ return pageSize().isPresent();
}
- /** Serialize a {@link PageToken} into a string */
- public abstract String toTokenString();
-
/**
- * Builds a new page token to reflect new data that's been read. If the
amount of data read is
- * less than the pageSize, this will return a {@link DonePageToken}
+ * Paging token value, if present. Serialized paging tokens always have a
value, but "synthetic"
+ * paging tokens like {@link #readEverything()} or {@link #fromLimit(int)}
do not have a token
+ * value.
*/
- protected abstract PageToken updated(List<?> newData);
+ @JsonProperty("v")
+ Optional<Token> value();
+
+ // Note: another property can be added to contain a (cryptographic)
signature, if we want to
+ // ensure that a paging-token hasn't been tampered.
/**
- * Builds a {@link Page <T>} from a {@link List<T>}. The {@link PageToken}
attached to the new
- * {@link Page <T>} is the same as the result of calling {@link
#updated(List)} on this {@link
- * PageToken}.
+ * Paging token value, if it is present and an instance of the given {@code
type}. This is a
+ * convenience to prevent duplication of type casts.
*/
- public final <T> Page<T> buildNextPage(List<T> data) {
- return new Page<T>(updated(data), data);
+ default <T extends Token> Optional<T> valueAs(Class<T> type) {
+ return value()
+ .flatMap(
+ t ->
+ type.isAssignableFrom(t.getClass()) ?
Optional.of(type.cast(t)) : Optional.empty());
+ }
+
+ /** Represents a non-paginated request. */
+ static PageToken readEverything() {
+ return PageTokenUtil.READ_EVERYTHING;
}
- @Override
- public final boolean equals(Object o) {
- if (o instanceof PageToken) {
- return Objects.equals(this.toTokenString(), ((PageToken)
o).toTokenString());
- } else {
- return false;
- }
+ /** Represents a request to start paginating with a particular page size. */
+ static PageToken fromLimit(int limit) {
+ return PageTokenUtil.fromLimit(limit);
}
- @Override
- public final int hashCode() {
- if (toTokenString() == null) {
- return 0;
- } else {
- return toTokenString().hashCode();
- }
+ /**
+ * Reconstructs a page token from the API-level page token string (returned
to the client in the
+ * response to a previous request for similar data) and an API-level new
requested page size.
+ *
+ * @param serializedPageToken page token from the {@link
Page#encodedResponseToken() previous
+ * page}
+ * @param requestedPageSize optional page size for the next page. If not
set, the page size of the
+ * previous page (encoded in the page token string) will be reused.
+ * @see Page#encodedResponseToken()
+ */
+ static PageToken build(
+ @Nullable String serializedPageToken, @Nullable Integer
requestedPageSize) {
+ return PageTokenUtil.decodePageRequest(serializedPageToken,
requestedPageSize);
}
}
diff --git
a/polaris-core/src/main/java/org/apache/polaris/core/persistence/pagination/PageTokenUtil.java
b/polaris-core/src/main/java/org/apache/polaris/core/persistence/pagination/PageTokenUtil.java
new file mode 100644
index 000000000..8a811f41c
--- /dev/null
+++
b/polaris-core/src/main/java/org/apache/polaris/core/persistence/pagination/PageTokenUtil.java
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.polaris.core.persistence.pagination;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static java.lang.String.format;
+import static java.util.Collections.unmodifiableMap;
+
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.DatabindContext;
+import com.fasterxml.jackson.databind.JavaType;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.jsontype.impl.TypeIdResolverBase;
+import com.fasterxml.jackson.dataformat.smile.databind.SmileMapper;
+import com.google.common.annotations.VisibleForTesting;
+import jakarta.annotation.Nullable;
+import java.io.IOException;
+import java.util.Base64;
+import java.util.HashMap;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.ServiceLoader;
+
+final class PageTokenUtil {
+
+ private static final ObjectMapper SMILE_MAPPER = new
SmileMapper().findAndRegisterModules();
+
+ /** Constant for {@link PageToken#readEverything()}. */
+ static final PageToken READ_EVERYTHING =
+ new PageToken() {
+ @Override
+ public OptionalInt pageSize() {
+ return OptionalInt.empty();
+ }
+
+ @Override
+ public Optional<Token> value() {
+ return Optional.empty();
+ }
+
+ @Override
+ public int hashCode() {
+ return 1;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (!(obj instanceof PageToken)) {
+ return false;
+ }
+ PageToken other = (PageToken) obj;
+ return other.pageSize().isEmpty() && other.value().isEmpty();
+ }
+
+ @Override
+ public String toString() {
+ return "PageToken(everything)";
+ }
+ };
+
+ static PageToken fromLimit(int limit) {
+ return new PageToken() {
+ @Override
+ public OptionalInt pageSize() {
+ return OptionalInt.of(limit);
+ }
+
+ @Override
+ public Optional<Token> value() {
+ return Optional.empty();
+ }
+
+ @Override
+ public int hashCode() {
+ return 2;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (!(obj instanceof PageToken)) {
+ return false;
+ }
+ PageToken other = (PageToken) obj;
+ return other.pageSize().equals(pageSize()) && other.value().isEmpty();
+ }
+
+ @Override
+ public String toString() {
+ return "PageToken(limit = " + limit + ")";
+ }
+ };
+ }
+
+ private PageTokenUtil() {}
+
+ /**
+ * Decodes a {@link PageToken} from API request parameters for the page-size
and a serialized page
+ * token.
+ */
+ static PageToken decodePageRequest(
+ @Nullable String requestedPageToken, @Nullable Integer
requestedPageSize) {
+ if (requestedPageToken != null) {
+ var bytes = Base64.getUrlDecoder().decode(requestedPageToken);
+ try {
+ var pageToken = SMILE_MAPPER.readValue(bytes, PageToken.class);
+ if (requestedPageSize != null) {
+ int pageSizeInt = requestedPageSize;
+ checkArgument(pageSizeInt >= 0, "Invalid page size");
+ if (pageToken.pageSize().orElse(-1) != pageSizeInt) {
+ pageToken =
ImmutablePageToken.builder().from(pageToken).pageSize(pageSizeInt).build();
+ }
+ }
+ return pageToken;
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ } else if (requestedPageSize != null) {
+ int pageSizeInt = requestedPageSize;
+ checkArgument(pageSizeInt >= 0, "Invalid page size");
+ return fromLimit(pageSizeInt);
+ } else {
+ return READ_EVERYTHING;
+ }
+ }
+
+ /**
+ * Returns the encoded ({@link String} serialized) {@link PageToken} built
from the given {@link
+ * PageToken currentPageToken}, the page token of the current request, and
{@link Token
+ * nextToken}, the token for the next page.
+ *
+ * @param currentPageToken page token of the currently handled API request,
must not be {@code
+ * null}
+ * @param nextToken token for the next page, can be {@code null}, in which
case the result will be
+ * {@code null}
+ * @return base-64/url-encoded serialized {@link PageToken} for the next
page.
+ */
+ static @Nullable String encodePageToken(PageToken currentPageToken,
@Nullable Token nextToken) {
+ if (nextToken == null) {
+ return null;
+ }
+
+ return serializePageToken(
+ ImmutablePageToken.builder()
+ .pageSize(currentPageToken.pageSize())
+ .value(nextToken)
+ .build());
+ }
+
+ /**
+ * Serializes the given {@link PageToken pageToken}
+ *
+ * @return base-64/url-encoded serialized {@link PageToken} for the next
page.
+ */
+ @VisibleForTesting
+ static @Nullable String serializePageToken(PageToken pageToken) {
+ if (pageToken == null) {
+ return null;
+ }
+
+ try {
+ var serialized = SMILE_MAPPER.writeValueAsBytes(pageToken);
+ return Base64.getUrlEncoder().encodeToString(serialized);
+ } catch (JsonProcessingException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ /** Lazily initialized registry of all token-types. */
+ private static final class Registry {
+ private static final Map<String, Token.TokenType> BY_ID;
+
+ static {
+ var byId = new HashMap<String, Token.TokenType>();
+ var loader = ServiceLoader.load(Token.TokenType.class);
+ loader.stream()
+ .map(ServiceLoader.Provider::get)
+ .forEach(
+ tokenType -> {
+ var ex = byId.put(tokenType.id(), tokenType);
+ if (ex != null) {
+ throw new IllegalStateException(
+ format("Duplicate token type ID: from %s and %s",
tokenType, ex));
+ }
+ });
+ BY_ID = unmodifiableMap(byId);
+ }
+ }
+
+ /**
+ * Jackson type-id resolver, resolves a {@link Token#getT() token type
value} to a concrete Java
+ * type, consulting the {@link Registry}.
+ */
+ static final class TokenTypeIdResolver extends TypeIdResolverBase {
+ private JavaType baseType;
+
+ public TokenTypeIdResolver() {}
+
+ @Override
+ public void init(JavaType bt) {
+ baseType = bt;
+ }
+
+ @Override
+ public String idFromValue(Object value) {
+ return getId(value);
+ }
+
+ @Override
+ public String idFromValueAndType(Object value, Class<?> suggestedType) {
+ return getId(value);
+ }
+
+ @Override
+ public JsonTypeInfo.Id getMechanism() {
+ return JsonTypeInfo.Id.CUSTOM;
+ }
+
+ private String getId(Object value) {
+ if (value instanceof Token) {
+ return ((Token) value).getT();
+ }
+
+ return null;
+ }
+
+ @Override
+ public JavaType typeFromId(DatabindContext context, String id) {
+ var idLower = id.toLowerCase(Locale.ROOT);
+ var asType = Registry.BY_ID.get(idLower);
+ if (asType == null) {
+ throw new IllegalStateException("Cannot deserialize paging token value
of type " + idLower);
+ }
+ if (baseType.getRawClass().isAssignableFrom(asType.javaType())) {
+ return context.constructSpecializedType(baseType, asType.javaType());
+ }
+
+ // This is rather a "test-only" code path, but it might happen in real
life as well, when
+ // calling the ObjectMapper with a "too specific" type and not just
Change.class.
+ // So we can get here for example, if the baseType (induced by the type
passed to
+ // ObjectMapper), is GenericChange.class, but the type is a "well known"
type like
+ // ChangeRename.class.
+ @SuppressWarnings("unchecked")
+ var concrete = (Class<? extends Token>) baseType.getRawClass();
+ return context.constructSpecializedType(baseType, concrete);
+ }
+ }
+}
diff --git
a/polaris-core/src/main/java/org/apache/polaris/core/persistence/pagination/ReadEverythingPageToken.java
b/polaris-core/src/main/java/org/apache/polaris/core/persistence/pagination/ReadEverythingPageToken.java
deleted file mode 100644
index c8476c351..000000000
---
a/polaris-core/src/main/java/org/apache/polaris/core/persistence/pagination/ReadEverythingPageToken.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.polaris.core.persistence.pagination;
-
-import java.util.List;
-
-/**
- * A {@link PageToken} implementation for readers who want to read everything.
The behavior when
- * using this token should be the same as when reading without a token.
- */
-public class ReadEverythingPageToken extends PageToken {
-
- public static String PREFIX = "read-everything";
-
- public ReadEverythingPageToken() {}
-
- @Override
- public String toTokenString() {
- return PREFIX;
- }
-
- @Override
- protected PageToken updated(List<?> newData) {
- return new DonePageToken();
- }
-}
diff --git
a/polaris-core/src/main/java/org/apache/polaris/core/persistence/pagination/Token.java
b/polaris-core/src/main/java/org/apache/polaris/core/persistence/pagination/Token.java
new file mode 100644
index 000000000..34adb9560
--- /dev/null
+++
b/polaris-core/src/main/java/org/apache/polaris/core/persistence/pagination/Token.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.polaris.core.persistence.pagination;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import com.fasterxml.jackson.databind.annotation.JsonTypeIdResolver;
+import org.immutables.value.Value;
+
+/**
+ * Token base interface.
+ *
+ * <p>Concrete token implementations extend this {@link Token} interface and
provide a Java services
+ * registered class that implements {@link Token.TokenType}.
+ *
+ * <p>Serialization property names should be intentionally short to reduce the
size of the
+ * serialized paging token.
+ *
+ * <p>Example:
+ *
+ * {@snippet :
+ * @PolarisImmutable
+ * @JsonSerialize(as = ImmutableExampleToken.class)
+ * @JsonDeserialize(as = ImmutableExampleToken.class)
+ * public interface ExampleToken extends Token {
+ * String ID = "example";
+ *
+ * @Override
+ * default String getT() {
+ * return ID;
+ * }
+ *
+ * @JsonProperty("a")
+ * long a();
+ *
+ * @JsonProperty("b")
+ * String b();
+ *
+ * static ExampleToken newExampleToken(long a, String b) {
+ * return ImmutableExampleToken.builder().a(a).b(b).build();
+ * }
+ *
+ * final class ExampleTokenType implements TokenType {
+ * @Override
+ * public String id() {
+ * return ID;
+ * }
+ *
+ * @Override
+ * public Class<? extends Token> javaType() {
+ * return ExampleToken.class;
+ * }
+ * }
+ * }
+ * }
+ *
+ * plus a resource file {@code
+ *
META-INF/services/org.apache.polaris.core.persistence.pagination.Token$TokenType}
containing
+ * {@code org.apache.polaris.examples.pagetoken.ExampleToken$ExampleTokenType}.
+ */
+@JsonTypeIdResolver(PageTokenUtil.TokenTypeIdResolver.class)
+@JsonTypeInfo(use = JsonTypeInfo.Id.CUSTOM, property = "t", visible = true)
+public interface Token {
+
+ @Value.Redacted
+ @JsonIgnore
+ // must use 'getT' here, otherwise the property won't be properly "wired" to
be the type info and
+ // Jackson (deserialization) fails with
+ // 'com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException:
Unrecognized field "t"', if
+ // this property is just named 'String t()'
+ String getT();
+
+ /** Token type specification, referenced via Java's service loader
mechanism. */
+ interface TokenType {
+ /**
+ * ID of the token type, must be equal to the result of {@link
Token#getT()} of the concrete
+ * {@link #javaType() token type}.
+ */
+ String id();
+
+ /** Concrete token type. */
+ Class<? extends Token> javaType();
+ }
+}
diff --git
a/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/TransactionalMetaStoreManagerImpl.java
b/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/TransactionalMetaStoreManagerImpl.java
index e79dafcf5..8c8e26eb8 100644
---
a/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/TransactionalMetaStoreManagerImpl.java
+++
b/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/TransactionalMetaStoreManagerImpl.java
@@ -31,6 +31,7 @@ import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
+import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.apache.polaris.core.PolarisCallContext;
import org.apache.polaris.core.entity.AsyncTaskType;
@@ -686,8 +687,8 @@ public class TransactionalMetaStoreManagerImpl extends
BaseMetaStoreManager {
}
/**
- * See {@link #listEntities(PolarisCallContext, List, PolarisEntityType,
PolarisEntitySubType,
- * PageToken)}
+ * See {@link PolarisMetaStoreManager#listEntities(PolarisCallContext, List,
PolarisEntityType,
+ * PolarisEntitySubType, PageToken)}
*/
private @Nonnull ListEntitiesResult listEntities(
@Nonnull PolarisCallContext callCtx,
@@ -701,24 +702,25 @@ public class TransactionalMetaStoreManagerImpl extends
BaseMetaStoreManager {
// return if we failed to resolve
if (resolver.isFailure()) {
- return new ListEntitiesResult(
- BaseResult.ReturnStatus.CATALOG_PATH_CANNOT_BE_RESOLVED, null,
Optional.empty());
+ return new
ListEntitiesResult(BaseResult.ReturnStatus.CATALOG_PATH_CANNOT_BE_RESOLVED,
null);
}
- // return list of active entities
- Page<EntityNameLookupRecord> resultPage =
- ms.listEntitiesInCurrentTxn(
- callCtx, resolver.getCatalogIdOrNull(), resolver.getParentId(),
entityType, pageToken);
-
+ Predicate<PolarisBaseEntity> filter = entity -> true;
// prune the returned list with only entities matching the entity subtype
if (entitySubType != PolarisEntitySubType.ANY_SUBTYPE) {
- resultPage =
- pageToken.buildNextPage(
- resultPage.items.stream()
- .filter(rec -> rec.getSubTypeCode() ==
entitySubType.getCode())
- .collect(Collectors.toList()));
+ filter = e -> e.getSubTypeCode() == entitySubType.getCode();
}
+ // return list of active entities
+ Page<EntityNameLookupRecord> resultPage =
+ ms.listEntitiesInCurrentTxn(
+ callCtx,
+ resolver.getCatalogIdOrNull(),
+ resolver.getParentId(),
+ entityType,
+ filter,
+ pageToken);
+
// done
return ListEntitiesResult.fromPage(resultPage);
}
@@ -1076,7 +1078,7 @@ public class TransactionalMetaStoreManagerImpl extends
BaseMetaStoreManager {
}
createdEntities.add(entityCreateResult.getEntity());
}
- return new EntitiesResult(createdEntities);
+ return new EntitiesResult(Page.fromItems(createdEntities));
});
}
@@ -1180,7 +1182,7 @@ public class TransactionalMetaStoreManagerImpl extends
BaseMetaStoreManager {
}
// good, all success
- return new EntitiesResult(updatedEntities);
+ return new EntitiesResult(Page.fromItems(updatedEntities));
}
/** {@inheritDoc} */
@@ -1385,7 +1387,7 @@ public class TransactionalMetaStoreManagerImpl extends
BaseMetaStoreManager {
entity -> true,
Function.identity(),
PageToken.fromLimit(2))
- .items;
+ .items();
// if we have 2, we cannot drop the catalog. If only one left, better be
the admin role
if (catalogRoles.size() > 1) {
@@ -1971,36 +1973,42 @@ public class TransactionalMetaStoreManagerImpl extends
BaseMetaStoreManager {
Function.identity(),
pageToken);
- List<PolarisBaseEntity> loadedTasks = new ArrayList<>();
- availableTasks.items.forEach(
- task -> {
- PolarisBaseEntity.Builder updatedTask = new
PolarisBaseEntity.Builder(task);
- Map<String, String> properties =
- PolarisObjectMapperUtil.deserializeProperties(callCtx,
task.getProperties());
- properties.put(PolarisTaskConstants.LAST_ATTEMPT_EXECUTOR_ID,
executorId);
- properties.put(
- PolarisTaskConstants.LAST_ATTEMPT_START_TIME,
- String.valueOf(callCtx.getClock().millis()));
- properties.put(
- PolarisTaskConstants.ATTEMPT_COUNT,
- String.valueOf(
-
Integer.parseInt(properties.getOrDefault(PolarisTaskConstants.ATTEMPT_COUNT,
"0"))
- + 1));
-
updatedTask.properties(PolarisObjectMapperUtil.serializeProperties(callCtx,
properties));
- EntityResult result =
- updateEntityPropertiesIfNotChanged(callCtx, ms, null,
updatedTask.build());
- if (result.getReturnStatus() == BaseResult.ReturnStatus.SUCCESS) {
- loadedTasks.add(result.getEntity());
- } else {
- // TODO: Consider performing incremental leasing of individual
tasks one at a time
- // instead of requiring all-or-none semantics for all the tasks we
think we listed,
- // or else contention could be very bad.
- ms.rollback();
- throw new RetryOnConcurrencyException(
- "Failed to lease available task with status %s, info: %s",
- result.getReturnStatus(), result.getExtraInformation());
- }
- });
+ List<PolarisBaseEntity> loadedTasks =
+ availableTasks.items().stream()
+ .map(
+ task -> {
+ PolarisBaseEntity.Builder updatedTask = new
PolarisBaseEntity.Builder(task);
+ Map<String, String> properties =
+ PolarisObjectMapperUtil.deserializeProperties(callCtx,
task.getProperties());
+
properties.put(PolarisTaskConstants.LAST_ATTEMPT_EXECUTOR_ID, executorId);
+ properties.put(
+ PolarisTaskConstants.LAST_ATTEMPT_START_TIME,
+ String.valueOf(callCtx.getClock().millis()));
+ properties.put(
+ PolarisTaskConstants.ATTEMPT_COUNT,
+ String.valueOf(
+ Integer.parseInt(
+
properties.getOrDefault(PolarisTaskConstants.ATTEMPT_COUNT, "0"))
+ + 1));
+ updatedTask.properties(
+ PolarisObjectMapperUtil.serializeProperties(callCtx,
properties));
+ EntityResult result =
+ updateEntityPropertiesIfNotChanged(callCtx, ms, null,
updatedTask.build());
+ if (result.getReturnStatus() ==
BaseResult.ReturnStatus.SUCCESS) {
+ return result.getEntity();
+ } else {
+ // TODO: Consider performing incremental leasing of
individual tasks one at a
+ // time
+ // instead of requiring all-or-none semantics for all the
tasks we think we
+ // listed,
+ // or else contention could be very bad.
+ ms.rollback();
+ throw new RetryOnConcurrencyException(
+ "Failed to lease available task with status %s, info:
%s",
+ result.getReturnStatus(),
result.getExtraInformation());
+ }
+ })
+ .collect(Collectors.toList());
return EntitiesResult.fromPage(Page.fromItems(loadedTasks));
}
diff --git
a/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/TreeMapTransactionalPersistenceImpl.java
b/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/TreeMapTransactionalPersistenceImpl.java
index 12907b08d..3bc7fd976 100644
---
a/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/TreeMapTransactionalPersistenceImpl.java
+++
b/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/TreeMapTransactionalPersistenceImpl.java
@@ -21,6 +21,7 @@ package org.apache.polaris.core.persistence.transactional;
import com.google.common.base.Predicates;
import jakarta.annotation.Nonnull;
import jakarta.annotation.Nullable;
+import java.util.Comparator;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;
@@ -44,7 +45,7 @@ import org.apache.polaris.core.entity.PolarisGrantRecord;
import org.apache.polaris.core.entity.PolarisPrincipalSecrets;
import org.apache.polaris.core.persistence.BaseMetaStoreManager;
import org.apache.polaris.core.persistence.PrincipalSecretsGenerator;
-import org.apache.polaris.core.persistence.pagination.HasPageSize;
+import org.apache.polaris.core.persistence.pagination.EntityIdToken;
import org.apache.polaris.core.persistence.pagination.Page;
import org.apache.polaris.core.persistence.pagination.PageToken;
import org.apache.polaris.core.policy.PolarisPolicyMappingRecord;
@@ -367,13 +368,23 @@ public class TreeMapTransactionalPersistenceImpl extends
AbstractTransactionalPe
.map(
nameRecord ->
this.lookupEntityInCurrentTxn(
- callCtx, catalogId, nameRecord.getId(),
entityType.getCode()))
- .filter(entityFilter);
- if (pageToken instanceof HasPageSize) {
- data = data.limit(((HasPageSize) pageToken).getPageSize());
- }
+ callCtx, catalogId, nameRecord.getId(),
entityType.getCode()));
+
+ Predicate<PolarisBaseEntity> tokenFilter =
+ pageToken
+ .valueAs(EntityIdToken.class)
+ .map(
+ entityIdToken -> {
+ var nextId = entityIdToken.entityId();
+ return (Predicate<PolarisBaseEntity>) e -> e.getId() >
nextId;
+ })
+ .orElse(e -> true);
+
+ data =
data.sorted(Comparator.comparingLong(PolarisEntityCore::getId)).filter(tokenFilter);
+
+ data = data.filter(entityFilter);
- return Page.fromItems(data.map(transformer).collect(Collectors.toList()));
+ return Page.mapped(pageToken, data, transformer,
EntityIdToken::fromEntity);
}
/** {@inheritDoc} */
diff --git
a/polaris-core/src/main/resources/META-INF/services/org.apache.polaris.core.persistence.pagination.Token$TokenType
b/polaris-core/src/main/resources/META-INF/services/org.apache.polaris.core.persistence.pagination.Token$TokenType
new file mode 100644
index 000000000..3579dd29b
--- /dev/null
+++
b/polaris-core/src/main/resources/META-INF/services/org.apache.polaris.core.persistence.pagination.Token$TokenType
@@ -0,0 +1,20 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+org.apache.polaris.core.persistence.pagination.EntityIdToken$EntityIdTokenType
diff --git
a/polaris-core/src/main/java/org/apache/polaris/core/persistence/pagination/LimitPageToken.java
b/polaris-core/src/test/java/org/apache/polaris/core/persistence/pagination/DummyTestToken.java
similarity index 54%
rename from
polaris-core/src/main/java/org/apache/polaris/core/persistence/pagination/LimitPageToken.java
rename to
polaris-core/src/test/java/org/apache/polaris/core/persistence/pagination/DummyTestToken.java
index 18586446c..5053ac3ce 100644
---
a/polaris-core/src/main/java/org/apache/polaris/core/persistence/pagination/LimitPageToken.java
+++
b/polaris-core/src/test/java/org/apache/polaris/core/persistence/pagination/DummyTestToken.java
@@ -18,35 +18,36 @@
*/
package org.apache.polaris.core.persistence.pagination;
-import java.util.List;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import java.util.Optional;
+import java.util.OptionalInt;
+import org.apache.polaris.immutables.PolarisImmutable;
-/**
- * A {@link PageToken} implementation that has a page size, but no start
offset. This can be used to
- * represent a `limit`. When updated, it returns {@link DonePageToken}. As
such it should never be
- * user-facing and doesn't truly paginate.
- */
-public class LimitPageToken extends PageToken implements HasPageSize {
-
- public static final String PREFIX = "limit";
+@PolarisImmutable
+@JsonSerialize(as = ImmutableDummyTestToken.class)
+@JsonDeserialize(as = ImmutableDummyTestToken.class)
+public interface DummyTestToken extends Token {
+ String ID = "test-dummy";
- private final int pageSize;
+ Optional<String> s();
- public LimitPageToken(int pageSize) {
- this.pageSize = pageSize;
- }
+ OptionalInt i();
@Override
- public int getPageSize() {
- return pageSize;
+ default String getT() {
+ return ID;
}
- @Override
- public String toTokenString() {
- return String.format("%s/%d", PREFIX, pageSize);
- }
+ final class DummyTestTokenType implements TokenType {
+ @Override
+ public String id() {
+ return ID;
+ }
- @Override
- protected PageToken updated(List<?> newData) {
- return new DonePageToken();
+ @Override
+ public Class<? extends Token> javaType() {
+ return DummyTestToken.class;
+ }
}
}
diff --git
a/polaris-core/src/test/java/org/apache/polaris/core/persistence/pagination/PageTokenTest.java
b/polaris-core/src/test/java/org/apache/polaris/core/persistence/pagination/PageTokenTest.java
new file mode 100644
index 000000000..338bbc53f
--- /dev/null
+++
b/polaris-core/src/test/java/org/apache/polaris/core/persistence/pagination/PageTokenTest.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.polaris.core.persistence.pagination;
+
+import static org.junit.jupiter.params.provider.Arguments.arguments;
+
+import java.util.OptionalInt;
+import java.util.function.Function;
+import java.util.stream.Stream;
+import org.assertj.core.api.SoftAssertions;
+import org.assertj.core.api.junit.jupiter.InjectSoftAssertions;
+import org.assertj.core.api.junit.jupiter.SoftAssertionsExtension;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+@ExtendWith(SoftAssertionsExtension.class)
+class PageTokenTest {
+ @InjectSoftAssertions SoftAssertions soft;
+
+ @Test
+ public void testReadEverything() {
+ PageToken r = PageToken.readEverything();
+ soft.assertThat(r.paginationRequested()).isFalse();
+ soft.assertThat(r.pageSize()).isEmpty();
+ soft.assertThat(r.value()).isEmpty();
+
+ Page<Integer> pageEverything =
+ Page.mapped(
+ r,
+ Stream.of(1, 2, 3, 4),
+ Function.identity(),
+ i -> i != null ? EntityIdToken.fromEntityId(i) : null);
+ soft.assertThat(pageEverything.encodedResponseToken()).isNull();
+ soft.assertThat(pageEverything.items()).containsExactly(1, 2, 3, 4);
+
+ r = PageToken.build(null, null);
+ soft.assertThat(r.paginationRequested()).isFalse();
+ soft.assertThat(r.pageSize()).isEmpty();
+ soft.assertThat(r.value()).isEmpty();
+ }
+
+ @Test
+ public void testLimit() {
+ PageToken r = PageToken.fromLimit(123);
+ soft.assertThat(r).isEqualTo(PageToken.build(null, 123));
+ soft.assertThat(r.paginationRequested()).isTrue();
+ soft.assertThat(r.pageSize()).isEqualTo(OptionalInt.of(123));
+ soft.assertThat(r.value()).isEmpty();
+ }
+
+ @Test
+ public void testTokenValueForPaging() {
+ PageToken r = PageToken.fromLimit(2);
+ soft.assertThat(r).isEqualTo(PageToken.build(null, 2));
+ Page<Integer> pageMoreData =
+ Page.mapped(
+ r,
+ Stream.of(1, 2, 3, 4),
+ Function.identity(),
+ i -> i != null ? EntityIdToken.fromEntityId(i) : null);
+ soft.assertThat(pageMoreData.encodedResponseToken()).isNotBlank();
+ soft.assertThat(pageMoreData.items()).containsExactly(1, 2);
+
+ // last page (no more data) - number of items is equal to the requested
page size
+ Page<Integer> lastPageSaturated =
+ Page.mapped(
+ r,
+ Stream.of(3, 4),
+ Function.identity(),
+ i -> i != null ? EntityIdToken.fromEntityId(i) : null);
+ // last page (no more data) - next-token must be null
+ soft.assertThat(lastPageSaturated.encodedResponseToken()).isNull();
+ soft.assertThat(lastPageSaturated.items()).containsExactly(3, 4);
+
+ // last page (no more data) - number of items is less than the requested
page size
+ Page<Integer> lastPageNotSaturated =
+ Page.mapped(
+ r,
+ Stream.of(3),
+ Function.identity(),
+ i -> i != null ? EntityIdToken.fromEntityId(i) : null);
+ soft.assertThat(lastPageNotSaturated.encodedResponseToken()).isNull();
+ soft.assertThat(lastPageNotSaturated.items()).containsExactly(3);
+
+ r = PageToken.fromLimit(200);
+ soft.assertThat(r).isEqualTo(PageToken.build(null, 200));
+ Page<Integer> page200 =
+ Page.mapped(
+ r,
+ Stream.of(1, 2, 3, 4),
+ Function.identity(),
+ i -> i != null ? EntityIdToken.fromEntityId(i) : null);
+ soft.assertThat(page200.encodedResponseToken()).isNull();
+ soft.assertThat(page200.items()).containsExactly(1, 2, 3, 4);
+ }
+
+ @ParameterizedTest
+ @MethodSource
+ public void testDeSer(Integer pageSize, String serializedPageToken,
PageToken expectedPageToken) {
+ soft.assertThat(PageTokenUtil.decodePageRequest(serializedPageToken,
pageSize))
+ .isEqualTo(expectedPageToken);
+ }
+
+ static Stream<Arguments> testDeSer() {
+ var entity42page123 =
+
ImmutablePageToken.builder().pageSize(123).value(EntityIdToken.fromEntityId(42)).build();
+ var entity42page123ser = PageTokenUtil.serializePageToken(entity42page123);
+ return Stream.of(
+ arguments(null, null, PageToken.readEverything()),
+ arguments(123, null, PageToken.fromLimit(123)),
+ arguments(123, entity42page123ser, entity42page123),
+ arguments(
+ 123,
+ PageTokenUtil.serializePageToken(
+ ImmutablePageToken.builder()
+ .pageSize(999999)
+ .value(EntityIdToken.fromEntityId(42))
+ .build()),
+ entity42page123));
+ }
+
+ @ParameterizedTest
+ @MethodSource
+ public void testApiRoundTrip(Token token) {
+ PageToken request = PageToken.build(null, 123);
+ Page<?> page = Page.mapped(request, Stream.of("i1"), Function.identity(),
x -> token);
+ soft.assertThat(page.encodedResponseToken()).isNotBlank();
+
+ PageToken r = PageToken.build(page.encodedResponseToken(), null);
+ soft.assertThat(r.value()).contains(token);
+ soft.assertThat(r.paginationRequested()).isTrue();
+ soft.assertThat(r.pageSize()).isEqualTo(OptionalInt.of(123));
+
+ r = PageToken.build(page.encodedResponseToken(), 456);
+ soft.assertThat(r.value()).contains(token);
+ soft.assertThat(r.paginationRequested()).isTrue();
+ soft.assertThat(r.pageSize()).isEqualTo(OptionalInt.of(456));
+ }
+
+ static Stream<Token> testApiRoundTrip() {
+ return Stream.of(
+ EntityIdToken.fromEntityId(123),
+ EntityIdToken.fromEntityId(456),
+ ImmutableDummyTestToken.builder().s("str").i(42).build(),
+ ImmutableDummyTestToken.builder().i(42).build(),
+ ImmutableDummyTestToken.builder().build());
+ }
+}
diff --git
a/polaris-core/src/test/resources/META-INF/services/org.apache.polaris.core.persistence.pagination.Token$TokenType
b/polaris-core/src/test/resources/META-INF/services/org.apache.polaris.core.persistence.pagination.Token$TokenType
new file mode 100644
index 000000000..26778107e
--- /dev/null
+++
b/polaris-core/src/test/resources/META-INF/services/org.apache.polaris.core.persistence.pagination.Token$TokenType
@@ -0,0 +1,20 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+org.apache.polaris.core.persistence.pagination.DummyTestToken$DummyTestTokenType
diff --git
a/runtime/service/src/test/java/org/apache/polaris/service/quarkus/catalog/IcebergCatalogTest.java
b/runtime/service/src/test/java/org/apache/polaris/service/quarkus/catalog/IcebergCatalogTest.java
index 30f5939ed..905934e67 100644
---
a/runtime/service/src/test/java/org/apache/polaris/service/quarkus/catalog/IcebergCatalogTest.java
+++
b/runtime/service/src/test/java/org/apache/polaris/service/quarkus/catalog/IcebergCatalogTest.java
@@ -116,6 +116,7 @@ import
org.apache.polaris.core.persistence.PolarisResolvedPathWrapper;
import org.apache.polaris.core.persistence.cache.InMemoryEntityCache;
import org.apache.polaris.core.persistence.dao.entity.BaseResult;
import org.apache.polaris.core.persistence.dao.entity.EntityResult;
+import org.apache.polaris.core.persistence.pagination.Page;
import org.apache.polaris.core.persistence.pagination.PageToken;
import org.apache.polaris.core.secrets.UserSecretsManager;
import org.apache.polaris.core.secrets.UserSecretsManagerFactory;
@@ -211,6 +212,8 @@ public abstract class IcebergCatalogTest extends
CatalogTests<IcebergCatalog> {
"test",
"polaris.readiness.ignore-severe-issues",
"true",
+ "LIST_PAGINATION_ENABLED",
+ "true",
"polaris.features.\"ALLOW_TABLE_LOCATION_OVERLAP\"",
"true");
}
@@ -2326,4 +2329,131 @@ public abstract class IcebergCatalogTest extends
CatalogTests<IcebergCatalog> {
Assertions.assertThat(afterTableEvent.base().properties().get(key)).isEqualTo(valOld);
Assertions.assertThat(afterTableEvent.metadata().properties().get(key)).isEqualTo(valNew);
}
+
+ private static PageToken nextRequest(Page<?> previousPage) {
+ return PageToken.build(previousPage.encodedResponseToken(), null);
+ }
+
+ @Test
+ public void testPaginatedListTables() {
+ Assumptions.assumeTrue(
+ requiresNamespaceCreate(),
+ "Only applicable if namespaces must be created before adding
children");
+
+ catalog.createNamespace(NS);
+
+ for (int i = 0; i < 5; i++) {
+ catalog.buildTable(TableIdentifier.of(NS, "pagination_table_" + i),
SCHEMA).create();
+ }
+
+ try {
+ // List without pagination
+ Assertions.assertThat(catalog.listTables(NS)).isNotNull().hasSize(5);
+
+ // List with a limit:
+ Page<?> firstListResult = catalog.listTables(NS, PageToken.fromLimit(2));
+ Assertions.assertThat(firstListResult.items().size()).isEqualTo(2);
+
Assertions.assertThat(firstListResult.encodedResponseToken()).isNotNull().isNotEmpty();
+
+ // List using the previously obtained token:
+ Page<?> secondListResult = catalog.listTables(NS,
nextRequest(firstListResult));
+ Assertions.assertThat(secondListResult.items().size()).isEqualTo(2);
+
Assertions.assertThat(secondListResult.encodedResponseToken()).isNotNull().isNotEmpty();
+
+ // List using the final token:
+ Page<?> finalListResult = catalog.listTables(NS,
nextRequest(secondListResult));
+ Assertions.assertThat(finalListResult.items().size()).isEqualTo(1);
+ Assertions.assertThat(finalListResult.encodedResponseToken()).isNull();
+ } finally {
+ for (int i = 0; i < 5; i++) {
+ catalog.dropTable(TableIdentifier.of(NS, "pagination_table_" + i));
+ }
+ }
+ }
+
+ @Test
+ public void testPaginatedListViews() {
+ Assumptions.assumeTrue(
+ requiresNamespaceCreate(),
+ "Only applicable if namespaces must be created before adding
children");
+
+ catalog.createNamespace(NS);
+
+ for (int i = 0; i < 5; i++) {
+ catalog
+ .buildView(TableIdentifier.of(NS, "pagination_view_" + i))
+ .withQuery("a_" + i, "SELECT 1 id")
+ .withSchema(SCHEMA)
+ .withDefaultNamespace(NS)
+ .create();
+ }
+
+ try {
+ // List without pagination
+ Assertions.assertThat(catalog.listViews(NS)).isNotNull().hasSize(5);
+
+ // List with a limit:
+ Page<?> firstListResult = catalog.listViews(NS, PageToken.fromLimit(2));
+ Assertions.assertThat(firstListResult.items().size()).isEqualTo(2);
+
Assertions.assertThat(firstListResult.encodedResponseToken()).isNotNull().isNotEmpty();
+
+ // List using the previously obtained token:
+ Page<?> secondListResult = catalog.listViews(NS,
nextRequest(firstListResult));
+ Assertions.assertThat(secondListResult.items().size()).isEqualTo(2);
+
Assertions.assertThat(secondListResult.encodedResponseToken()).isNotNull().isNotEmpty();
+
+ // List using the final token:
+ Page<?> finalListResult = catalog.listViews(NS,
nextRequest(secondListResult));
+ Assertions.assertThat(finalListResult.items().size()).isEqualTo(1);
+ Assertions.assertThat(finalListResult.encodedResponseToken()).isNull();
+ } finally {
+ for (int i = 0; i < 5; i++) {
+ catalog.dropTable(TableIdentifier.of(NS, "pagination_view_" + i));
+ }
+ }
+ }
+
+ @Test
+ public void testPaginatedListNamespaces() {
+ for (int i = 0; i < 5; i++) {
+ catalog.createNamespace(Namespace.of("pagination_namespace_" + i));
+ }
+
+ try {
+ // List without pagination
+ Assertions.assertThat(catalog.listNamespaces()).isNotNull().hasSize(5);
+
+ // List with a limit:
+ Page<?> firstListResult = catalog.listNamespaces(Namespace.empty(),
PageToken.fromLimit(2));
+ Assertions.assertThat(firstListResult.items().size()).isEqualTo(2);
+
Assertions.assertThat(firstListResult.encodedResponseToken()).isNotNull().isNotEmpty();
+
+ // List using the previously obtained token:
+ Page<?> secondListResult =
+ catalog.listNamespaces(Namespace.empty(),
nextRequest(firstListResult));
+ Assertions.assertThat(secondListResult.items().size()).isEqualTo(2);
+
Assertions.assertThat(secondListResult.encodedResponseToken()).isNotNull().isNotEmpty();
+
+ // List using the final token:
+ Page<?> finalListResult =
+ catalog.listNamespaces(Namespace.empty(),
nextRequest(secondListResult));
+ Assertions.assertThat(finalListResult.items().size()).isEqualTo(1);
+ Assertions.assertThat(finalListResult.encodedResponseToken()).isNull();
+
+ // List with page size matching the amount of data, no more pages
+ Page<?> firstExactListResult =
+ catalog.listNamespaces(Namespace.empty(), PageToken.fromLimit(5));
+ Assertions.assertThat(firstExactListResult.items().size()).isEqualTo(5);
+
Assertions.assertThat(firstExactListResult.encodedResponseToken()).isNull();
+
+ // List with huge page size:
+ Page<?> bigListResult = catalog.listNamespaces(Namespace.empty(),
PageToken.fromLimit(9999));
+ Assertions.assertThat(bigListResult.items().size()).isEqualTo(5);
+ Assertions.assertThat(bigListResult.encodedResponseToken()).isNull();
+ } finally {
+ for (int i = 0; i < 5; i++) {
+ catalog.dropNamespace(Namespace.of("pagination_namespace_" + i));
+ }
+ }
+ }
}
diff --git
a/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java
b/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java
index 1a8e76900..031c3882f 100644
---
a/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java
+++
b/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java
@@ -458,14 +458,10 @@ public class IcebergCatalog extends
BaseMetastoreViewCatalog
@Override
public List<TableIdentifier> listTables(Namespace namespace) {
- return listTables(namespace, PageToken.readEverything()).items;
+ return listTables(namespace, PageToken.readEverything()).items();
}
- public Page<TableIdentifier> listTables(Namespace namespace, String
pageToken, Integer pageSize) {
- return listTables(namespace, buildPageToken(pageToken, pageSize));
- }
-
- private Page<TableIdentifier> listTables(Namespace namespace, PageToken
pageToken) {
+ public Page<TableIdentifier> listTables(Namespace namespace, PageToken
pageToken) {
if (!namespaceExists(namespace)) {
throw new NoSuchNamespaceException(
"Cannot list tables for namespace. Namespace does not exist: '%s'",
namespace);
@@ -778,14 +774,10 @@ public class IcebergCatalog extends
BaseMetastoreViewCatalog
@Override
public List<Namespace> listNamespaces(Namespace namespace) throws
NoSuchNamespaceException {
- return listNamespaces(namespace, PageToken.readEverything()).items;
- }
-
- public Page<Namespace> listNamespaces(Namespace namespace, String pageToken,
Integer pageSize) {
- return listNamespaces(namespace, buildPageToken(pageToken, pageSize));
+ return listNamespaces(namespace, PageToken.readEverything()).items();
}
- private Page<Namespace> listNamespaces(Namespace namespace, PageToken
pageToken)
+ public Page<Namespace> listNamespaces(Namespace namespace, PageToken
pageToken)
throws NoSuchNamespaceException {
PolarisResolvedPathWrapper resolvedEntities =
resolvedEntityView.getResolvedPath(namespace);
if (resolvedEntities == null) {
@@ -801,13 +793,12 @@ public class IcebergCatalog extends
BaseMetastoreViewCatalog
PolarisEntityType.NAMESPACE,
PolarisEntitySubType.NULL_SUBTYPE,
pageToken);
- List<PolarisEntity.NameAndId> entities =
- PolarisEntity.toNameAndIdList(listResult.getEntities());
- List<Namespace> namespaces =
PolarisCatalogHelpers.nameAndIdToNamespaces(catalogPath, entities);
return listResult
- .getPageToken()
- .map(token -> new Page<>(token, namespaces))
- .orElseGet(() -> Page.fromItems(namespaces));
+ .getPage()
+ .map(
+ record ->
+ PolarisCatalogHelpers.nameAndIdToNamespace(
+ catalogPath, new PolarisEntity.NameAndId(record.getName(),
record.getId())));
}
@Override
@@ -819,14 +810,10 @@ public class IcebergCatalog extends
BaseMetastoreViewCatalog
@Override
public List<TableIdentifier> listViews(Namespace namespace) {
- return listViews(namespace, PageToken.readEverything()).items;
+ return listViews(namespace, PageToken.readEverything()).items();
}
- public Page<TableIdentifier> listViews(Namespace namespace, String
pageToken, Integer pageSize) {
- return listViews(namespace, buildPageToken(pageToken, pageSize));
- }
-
- private Page<TableIdentifier> listViews(Namespace namespace, PageToken
pageToken) {
+ public Page<TableIdentifier> listViews(Namespace namespace, PageToken
pageToken) {
if (!namespaceExists(namespace)) {
throw new NoSuchNamespaceException(
"Cannot list views for namespace. Namespace does not exist: '%s'",
namespace);
@@ -2596,15 +2583,11 @@ public class IcebergCatalog extends
BaseMetastoreViewCatalog
PolarisEntityType.TABLE_LIKE,
subType,
pageToken);
- List<PolarisEntity.NameAndId> entities =
- PolarisEntity.toNameAndIdList(listResult.getEntities());
- List<TableIdentifier> identifiers =
- PolarisCatalogHelpers.nameAndIdToTableIdentifiers(catalogPath,
entities);
+ Namespace parentNamespace =
PolarisCatalogHelpers.parentNamespace(catalogPath);
return listResult
- .getPageToken()
- .map(token -> new Page<>(token, identifiers))
- .orElseGet(() -> Page.fromItems(identifiers));
+ .getPage()
+ .map(record -> TableIdentifier.of(parentNamespace, record.getName()));
}
/**
@@ -2642,18 +2625,4 @@ public class IcebergCatalog extends
BaseMetastoreViewCatalog
.getRealmConfig()
.getConfig(FeatureConfiguration.MAX_METADATA_REFRESH_RETRIES);
}
-
- /** Build a {@link PageToken} from a string and page size. */
- private PageToken buildPageToken(@Nullable String tokenString, @Nullable
Integer pageSize) {
-
- boolean paginationEnabled =
- callContext
- .getRealmConfig()
- .getConfig(FeatureConfiguration.LIST_PAGINATION_ENABLED,
catalogEntity);
- if (!paginationEnabled) {
- return PageToken.readEverything();
- } else {
- return PageToken.build(tokenString, pageSize);
- }
- }
}
diff --git
a/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java
b/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java
index c0b0b3259..d8ceea08d 100644
---
a/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java
+++
b/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java
@@ -91,6 +91,7 @@ import
org.apache.polaris.core.persistence.TransactionWorkspaceMetaStoreManager;
import org.apache.polaris.core.persistence.dao.entity.EntitiesResult;
import org.apache.polaris.core.persistence.dao.entity.EntityWithPath;
import org.apache.polaris.core.persistence.pagination.Page;
+import org.apache.polaris.core.persistence.pagination.PageToken;
import org.apache.polaris.core.secrets.UserSecretsManager;
import org.apache.polaris.core.storage.AccessConfig;
import org.apache.polaris.core.storage.PolarisStorageActions;
@@ -183,10 +184,11 @@ public class IcebergCatalogHandler extends CatalogHandler
implements AutoCloseab
authorizeBasicNamespaceOperationOrThrow(op, parent);
if (baseCatalog instanceof IcebergCatalog polarisCatalog) {
- Page<Namespace> results = polarisCatalog.listNamespaces(parent,
pageToken, pageSize);
+ PageToken pageRequest = PageToken.build(pageToken, pageSize);
+ Page<Namespace> results = polarisCatalog.listNamespaces(parent,
pageRequest);
return ListNamespacesResponse.builder()
- .addAll(results.items)
- .nextPageToken(results.pageToken.toTokenString())
+ .addAll(results.items())
+ .nextPageToken(results.encodedResponseToken())
.build();
} else {
return catalogHandlerUtils.listNamespaces(namespaceCatalog, parent,
pageToken, pageSize);
@@ -343,10 +345,11 @@ public class IcebergCatalogHandler extends CatalogHandler
implements AutoCloseab
authorizeBasicNamespaceOperationOrThrow(op, namespace);
if (baseCatalog instanceof IcebergCatalog polarisCatalog) {
- Page<TableIdentifier> results = polarisCatalog.listTables(namespace,
pageToken, pageSize);
+ PageToken pageRequest = PageToken.build(pageToken, pageSize);
+ Page<TableIdentifier> results = polarisCatalog.listTables(namespace,
pageRequest);
return ListTablesResponse.builder()
- .addAll(results.items)
- .nextPageToken(results.pageToken.toTokenString())
+ .addAll(results.items())
+ .nextPageToken(results.encodedResponseToken())
.build();
} else {
return catalogHandlerUtils.listTables(baseCatalog, namespace, pageToken,
pageSize);
@@ -1005,10 +1008,11 @@ public class IcebergCatalogHandler extends
CatalogHandler implements AutoCloseab
authorizeBasicNamespaceOperationOrThrow(op, namespace);
if (baseCatalog instanceof IcebergCatalog polarisCatalog) {
- Page<TableIdentifier> results = polarisCatalog.listViews(namespace,
pageToken, pageSize);
+ PageToken pageRequest = PageToken.build(pageToken, pageSize);
+ Page<TableIdentifier> results = polarisCatalog.listViews(namespace,
pageRequest);
return ListTablesResponse.builder()
- .addAll(results.items)
- .nextPageToken(results.pageToken.toTokenString())
+ .addAll(results.items())
+ .nextPageToken(results.encodedResponseToken())
.build();
} else if (baseCatalog instanceof ViewCatalog viewCatalog) {
return catalogHandlerUtils.listViews(viewCatalog, namespace, pageToken,
pageSize);
diff --git
a/service/common/src/test/java/org/apache/polaris/service/persistence/pagination/PageTokenTest.java
b/service/common/src/test/java/org/apache/polaris/service/persistence/pagination/PageTokenTest.java
deleted file mode 100644
index 97e52fb84..000000000
---
a/service/common/src/test/java/org/apache/polaris/service/persistence/pagination/PageTokenTest.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.polaris.service.persistence.pagination;
-
-import org.apache.polaris.core.persistence.pagination.DonePageToken;
-import org.apache.polaris.core.persistence.pagination.HasPageSize;
-import org.apache.polaris.core.persistence.pagination.PageToken;
-import org.assertj.core.api.Assertions;
-import org.junit.jupiter.api.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class PageTokenTest {
- private static final Logger LOGGER =
LoggerFactory.getLogger(PageTokenTest.class);
-
- @Test
- void testDoneToken() {
- Assertions.assertThat(new DonePageToken()).doesNotReturn(null,
PageToken::toString);
- Assertions.assertThat(new DonePageToken()).returns(null,
PageToken::toTokenString);
- Assertions.assertThat(new DonePageToken()).isEqualTo(new DonePageToken());
- Assertions.assertThat(new DonePageToken().hashCode()).isEqualTo(new
DonePageToken().hashCode());
- }
-
- @Test
- void testReadEverythingPageToken() {
- PageToken token = PageToken.readEverything();
-
- Assertions.assertThat(token.toString()).isNotNull();
- Assertions.assertThat(token.toTokenString()).isNotNull();
- Assertions.assertThat(token).isNotInstanceOf(HasPageSize.class);
-
-
Assertions.assertThat(PageToken.readEverything()).isEqualTo(PageToken.readEverything());
- }
-}