This is an automated email from the ASF dual-hosted git repository.
etudenhoefner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new 72d5fd66c5 Core: Freshness-aware table loading in REST catalog (#14398)
72d5fd66c5 is described below
commit 72d5fd66c527c27c79880e051b7eaf46deb2fed6
Author: gaborkaszab <[email protected]>
AuthorDate: Sun Jan 18 08:46:08 2026 +0100
Core: Freshness-aware table loading in REST catalog (#14398)
This is the client-side part of freshness-aware table loading in the
REST catalog. The main design is as follows:
- REST server can send an ETag with the LoadTableResponse
- The client can use this ETag to populate the IF_NONE_MATCH header
of the next loadTable request
- The server can send a 304-NOT_MODIFIED response without a body if
the table has not been changed based on the ETag
- When the client receives a 304, it returns the latest table
object associated with the ETag from its cache
---
.../java/org/apache/iceberg/rest/HTTPClient.java | 10 +
.../java/org/apache/iceberg/rest/RESTCatalog.java | 4 +
.../apache/iceberg/rest/RESTCatalogProperties.java | 11 +
.../apache/iceberg/rest/RESTSessionCatalog.java | 168 ++++--
.../org/apache/iceberg/rest/RESTTableCache.java | 129 ++++
.../org/apache/iceberg/rest/TestRESTCatalog.java | 657 +++++++++++++++++++--
.../apache/iceberg/rest/TestRESTScanPlanning.java | 20 +
.../apache/iceberg/rest/TestableRESTCatalog.java | 52 ++
.../iceberg/rest/TestableRESTSessionCatalog.java | 44 ++
9 files changed, 1026 insertions(+), 69 deletions(-)
diff --git a/core/src/main/java/org/apache/iceberg/rest/HTTPClient.java
b/core/src/main/java/org/apache/iceberg/rest/HTTPClient.java
index b30caf1c7d..2fd3ec882a 100644
--- a/core/src/main/java/org/apache/iceberg/rest/HTTPClient.java
+++ b/core/src/main/java/org/apache/iceberg/rest/HTTPClient.java
@@ -326,6 +326,16 @@ public class HTTPClient extends BaseHTTPClient {
// Skip parsing the response stream for any successful request not expecting a response body
if (emptyBody(response, responseType)) {
+ if (response.getCode() == HttpStatus.SC_NOT_MODIFIED
+ && !req.headers().contains(HttpHeaders.IF_NONE_MATCH)) {
+ // 304-NOT_MODIFIED is used for freshness-aware loading and requires an ETag sent to the
+ // server via IF_NONE_MATCH header in the request. If no ETag was sent, we shouldn't
+ // receive a 304.
+ throw new RESTException(
+ "Invalid (NOT_MODIFIED) response for request: method=%s,
path=%s",
+ req.method(), req.path());
+ }
+
return null;
}
diff --git a/core/src/main/java/org/apache/iceberg/rest/RESTCatalog.java
b/core/src/main/java/org/apache/iceberg/rest/RESTCatalog.java
index aff8832c6b..f4c75d1050 100644
--- a/core/src/main/java/org/apache/iceberg/rest/RESTCatalog.java
+++ b/core/src/main/java/org/apache/iceberg/rest/RESTCatalog.java
@@ -96,6 +96,10 @@ public class RESTCatalog
sessionCatalog.initialize(name, props);
}
+ protected RESTSessionCatalog sessionCatalog() {
+ return sessionCatalog;
+ }
+
@Override
public String name() {
return sessionCatalog.name();
diff --git
a/core/src/main/java/org/apache/iceberg/rest/RESTCatalogProperties.java
b/core/src/main/java/org/apache/iceberg/rest/RESTCatalogProperties.java
index 72b09fa772..e294bcfebe 100644
--- a/core/src/main/java/org/apache/iceberg/rest/RESTCatalogProperties.java
+++ b/core/src/main/java/org/apache/iceberg/rest/RESTCatalogProperties.java
@@ -18,6 +18,8 @@
*/
package org.apache.iceberg.rest;
+import java.util.concurrent.TimeUnit;
+
public final class RESTCatalogProperties {
private RESTCatalogProperties() {}
@@ -43,6 +45,15 @@ public final class RESTCatalogProperties {
public static final String REST_SCAN_PLAN_ID = "rest-scan-plan-id";
+ // Properties that control the behaviour of the table cache used for freshness-aware table
+ // loading.
+ public static final String TABLE_CACHE_EXPIRE_AFTER_WRITE_MS =
+ "rest-table-cache.expire-after-write-ms";
+ public static final long TABLE_CACHE_EXPIRE_AFTER_WRITE_MS_DEFAULT =
TimeUnit.MINUTES.toMillis(5);
+
+ public static final String TABLE_CACHE_MAX_ENTRIES =
"rest-table-cache.max-entries";
+ public static final int TABLE_CACHE_MAX_ENTRIES_DEFAULT = 100;
+
public enum SnapshotMode {
ALL,
REFS
diff --git a/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java
b/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java
index 4ff13ac824..0c4f8a39bf 100644
--- a/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java
+++ b/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java
@@ -26,9 +26,11 @@ import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.function.BiFunction;
+import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
+import org.apache.hc.core5.http.HttpHeaders;
import org.apache.iceberg.BaseTable;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.CatalogUtil;
@@ -60,6 +62,7 @@ import org.apache.iceberg.io.FileIOTracker;
import org.apache.iceberg.io.StorageCredential;
import org.apache.iceberg.metrics.MetricsReporter;
import org.apache.iceberg.metrics.MetricsReporters;
+import
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
@@ -67,6 +70,7 @@ import
org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.rest.RESTCatalogProperties.SnapshotMode;
+import org.apache.iceberg.rest.RESTTableCache.TableWithETag;
import org.apache.iceberg.rest.auth.AuthManager;
import org.apache.iceberg.rest.auth.AuthManagers;
import org.apache.iceberg.rest.auth.AuthSession;
@@ -165,6 +169,8 @@ public class RESTSessionCatalog extends
BaseViewSessionCatalog
private Supplier<Map<String, String>> mutationHeaders = Map::of;
private String namespaceSeparator = null;
+ private RESTTableCache tableCache;
+
public RESTSessionCatalog() {
this(
config ->
@@ -277,9 +283,22 @@ public class RESTSessionCatalog extends
BaseViewSessionCatalog
mergedProps,
RESTCatalogProperties.REST_SCAN_PLANNING_ENABLED,
RESTCatalogProperties.REST_SCAN_PLANNING_ENABLED_DEFAULT);
+
+ this.tableCache = createTableCache(mergedProps);
+ this.closeables.addCloseable(this.tableCache);
+
super.initialize(name, mergedProps);
}
+ protected RESTTableCache createTableCache(Map<String, String> props) {
+ return new RESTTableCache(props);
+ }
+
+ @VisibleForTesting
+ RESTTableCache tableCache() {
+ return tableCache;
+ }
+
@Override
public void setConf(Object newConf) {
this.conf = newConf;
@@ -332,6 +351,8 @@ public class RESTSessionCatalog extends
BaseViewSessionCatalog
return true;
} catch (NoSuchTableException e) {
return false;
+ } finally {
+ invalidateTable(context, identifier);
}
}
@@ -353,6 +374,8 @@ public class RESTSessionCatalog extends
BaseViewSessionCatalog
return true;
} catch (NoSuchTableException e) {
return false;
+ } finally {
+ invalidateTable(context, identifier);
}
}
@@ -370,6 +393,8 @@ public class RESTSessionCatalog extends
BaseViewSessionCatalog
client
.withAuthSession(contextualSession)
.post(paths.rename(), request, null, mutationHeaders,
ErrorHandlers.tableErrorHandler());
+
+ invalidateTable(context, from);
}
@Override
@@ -384,9 +409,15 @@ public class RESTSessionCatalog extends
BaseViewSessionCatalog
return true;
} else {
// fallback in order to work with 1.7.x and older servers
- return super.tableExists(context, identifier);
+ if (!super.tableExists(context, identifier)) {
+ invalidateTable(context, identifier);
+ return false;
+ }
+
+ return true;
}
} catch (NoSuchTableException e) {
+ invalidateTable(context, identifier);
return false;
}
}
@@ -396,7 +427,11 @@ public class RESTSessionCatalog extends
BaseViewSessionCatalog
}
private LoadTableResponse loadInternal(
- SessionContext context, TableIdentifier identifier, SnapshotMode mode) {
+ SessionContext context,
+ TableIdentifier identifier,
+ SnapshotMode mode,
+ Map<String, String> headers,
+ Consumer<Map<String, String>> responseHeaders) {
Endpoint.check(endpoints, Endpoint.V1_LOAD_TABLE);
AuthSession contextualSession = authManager.contextualSession(context,
catalogAuth);
return client
@@ -405,8 +440,9 @@ public class RESTSessionCatalog extends
BaseViewSessionCatalog
paths.table(identifier),
snapshotModeToParam(mode),
LoadTableResponse.class,
- Map.of(),
- ErrorHandlers.tableErrorHandler());
+ headers,
+ ErrorHandlers.tableErrorHandler(),
+ responseHeaders);
}
@Override
@@ -424,8 +460,25 @@ public class RESTSessionCatalog extends
BaseViewSessionCatalog
MetadataTableType metadataType;
LoadTableResponse response;
TableIdentifier loadedIdent;
+
+ Map<String, String> responseHeaders = Maps.newHashMap();
+ TableWithETag cachedTable = tableCache.getIfPresent(context.sessionId(),
identifier);
+
try {
- response = loadInternal(context, identifier, snapshotMode);
+ response =
+ loadInternal(
+ context,
+ identifier,
+ snapshotMode,
+ headersForLoadTable(cachedTable),
+ responseHeaders::putAll);
+
+ if (response == null) {
+ Preconditions.checkNotNull(cachedTable, "Invalid load table response:
null");
+
+ return cachedTable.supplier().get();
+ }
+
loadedIdent = identifier;
metadataType = null;
@@ -435,14 +488,33 @@ public class RESTSessionCatalog extends
BaseViewSessionCatalog
// attempt to load a metadata table using the identifier's namespace as the base table
TableIdentifier baseIdent =
TableIdentifier.of(identifier.namespace().levels());
try {
- response = loadInternal(context, baseIdent, snapshotMode);
+ responseHeaders.clear();
+ cachedTable = tableCache.getIfPresent(context.sessionId(),
baseIdent);
+
+ response =
+ loadInternal(
+ context,
+ baseIdent,
+ snapshotMode,
+ headersForLoadTable(cachedTable),
+ responseHeaders::putAll);
+
+ if (response == null) {
+ Preconditions.checkNotNull(cachedTable, "Invalid load table
response: null");
+
+ return MetadataTableUtils.createMetadataTableInstance(
+ cachedTable.supplier().get(), metadataType);
+ }
+
loadedIdent = baseIdent;
} catch (NoSuchTableException ignored) {
// the base table does not exist
+ invalidateTable(context, baseIdent);
throw original;
}
} else {
// name is not a metadata table
+ invalidateTable(context, identifier);
throw original;
}
}
@@ -461,7 +533,7 @@ public class RESTSessionCatalog extends
BaseViewSessionCatalog
.setPreviousFileLocation(null)
.setSnapshotsSupplier(
() ->
- loadInternal(context, finalIdentifier, SnapshotMode.ALL)
+ loadInternal(context, finalIdentifier, SnapshotMode.ALL,
Map.of(), h -> {})
.tableMetadata()
.snapshots())
.discardChanges()
@@ -470,38 +542,52 @@ public class RESTSessionCatalog extends
BaseViewSessionCatalog
tableMetadata = response.tableMetadata();
}
+ List<Credential> credentials = response.credentials();
RESTClient tableClient = client.withAuthSession(tableSession);
- RESTTableOperations ops =
- newTableOps(
- tableClient,
- paths.table(finalIdentifier),
- Map::of,
- mutationHeaders,
- tableFileIO(context, tableConf, response.credentials()),
- tableMetadata,
- endpoints);
+ Supplier<BaseTable> tableSupplier =
+ createTableSupplier(
+ finalIdentifier, tableMetadata, context, tableClient, tableConf,
credentials);
- trackFileIO(ops);
-
- // RestTable should only be returned for non-metadata tables, because client would
- // not have access to metadata files for example manifests, since all it needs is catalog.
- if (metadataType == null) {
- RESTTable restTable = restTableForScanPlanning(ops, finalIdentifier,
tableClient);
- if (restTable != null) {
- return restTable;
- }
+ String eTag = responseHeaders.getOrDefault(HttpHeaders.ETAG, null);
+ if (eTag != null) {
+ tableCache.put(context.sessionId(), finalIdentifier, tableSupplier,
eTag);
}
- BaseTable table =
- new BaseTable(
- ops,
- fullTableName(finalIdentifier),
- metricsReporter(paths.metrics(finalIdentifier), tableClient));
if (metadataType != null) {
- return MetadataTableUtils.createMetadataTableInstance(table,
metadataType);
+ return
MetadataTableUtils.createMetadataTableInstance(tableSupplier.get(),
metadataType);
}
- return table;
+ return tableSupplier.get();
+ }
+
+ private Supplier<BaseTable> createTableSupplier(
+ TableIdentifier identifier,
+ TableMetadata tableMetadata,
+ SessionContext context,
+ RESTClient tableClient,
+ Map<String, String> tableConf,
+ List<Credential> credentials) {
+ return () -> {
+ RESTTableOperations ops =
+ newTableOps(
+ tableClient,
+ paths.table(identifier),
+ Map::of,
+ mutationHeaders,
+ tableFileIO(context, tableConf, credentials),
+ tableMetadata,
+ endpoints);
+
+ trackFileIO(ops);
+
+ RESTTable table = restTableForScanPlanning(ops, identifier, tableClient);
+ if (table != null) {
+ return table;
+ }
+
+ return new BaseTable(
+ ops, fullTableName(identifier),
metricsReporter(paths.metrics(identifier), tableClient));
+ };
}
private RESTTable restTableForScanPlanning(
@@ -546,7 +632,9 @@ public class RESTSessionCatalog extends
BaseViewSessionCatalog
}
@Override
- public void invalidateTable(SessionContext context, TableIdentifier ident) {}
+ public void invalidateTable(SessionContext context, TableIdentifier ident) {
+ tableCache.invalidate(context.sessionId(), ident);
+ }
@Override
public Table registerTable(
@@ -906,7 +994,8 @@ public class RESTSessionCatalog extends
BaseViewSessionCatalog
throw new AlreadyExistsException("View with same name already exists:
%s", ident);
}
- LoadTableResponse response = loadInternal(context, ident, snapshotMode);
+ LoadTableResponse response = loadInternal(context, ident, snapshotMode,
Map.of(), h -> {});
+
String fullName = fullTableName(ident);
Map<String, String> tableConf = response.config();
@@ -1368,6 +1457,17 @@ public class RESTSessionCatalog extends
BaseViewSessionCatalog
.post(paths.renameView(), request, null, mutationHeaders,
ErrorHandlers.viewErrorHandler());
}
+ private static Map<String, String> headersForLoadTable(TableWithETag
tableWithETag) {
+ if (tableWithETag == null) {
+ return Map.of();
+ }
+
+ String eTag = tableWithETag.eTag();
+ Preconditions.checkArgument(eTag != null, "Invalid ETag: null");
+
+ return Map.of(HttpHeaders.IF_NONE_MATCH, eTag);
+ }
+
private class RESTViewBuilder implements ViewBuilder {
private final SessionContext context;
private final TableIdentifier identifier;
diff --git a/core/src/main/java/org/apache/iceberg/rest/RESTTableCache.java
b/core/src/main/java/org/apache/iceberg/rest/RESTTableCache.java
new file mode 100644
index 0000000000..912173e9eb
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/rest/RESTTableCache.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.rest;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.Ticker;
+import java.io.Closeable;
+import java.time.Duration;
+import java.util.Map;
+import java.util.function.Supplier;
+import org.apache.iceberg.BaseTable;
+import org.apache.iceberg.catalog.TableIdentifier;
+import
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.util.PropertyUtil;
+import org.immutables.value.Value;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class RESTTableCache implements Closeable {
+ private static final Logger LOG =
LoggerFactory.getLogger(RESTTableCache.class);
+
+ private final Cache<SessionIdTableId, TableWithETag> tableCache;
+
+ RESTTableCache(Map<String, String> props) {
+ this(props, Ticker.systemTicker());
+ }
+
+ @VisibleForTesting
+ RESTTableCache(Map<String, String> props, Ticker ticker) {
+ long expireAfterWriteMS =
+ PropertyUtil.propertyAsLong(
+ props,
+ RESTCatalogProperties.TABLE_CACHE_EXPIRE_AFTER_WRITE_MS,
+ RESTCatalogProperties.TABLE_CACHE_EXPIRE_AFTER_WRITE_MS_DEFAULT);
+ Preconditions.checkArgument(
+ expireAfterWriteMS > 0, "Invalid expire after write: zero or
negative");
+
+ long numEntries =
+ PropertyUtil.propertyAsLong(
+ props,
+ RESTCatalogProperties.TABLE_CACHE_MAX_ENTRIES,
+ RESTCatalogProperties.TABLE_CACHE_MAX_ENTRIES_DEFAULT);
+ Preconditions.checkArgument(numEntries >= 0, "Invalid max entries:
negative");
+
+ tableCache =
+ Caffeine.newBuilder()
+ .maximumSize(numEntries)
+ .expireAfterWrite(Duration.ofMillis(expireAfterWriteMS))
+ .removalListener(
+ (compositeKey, table, cause) ->
+ LOG.debug("Evicted {} from table cache ({})",
compositeKey, cause))
+ .recordStats()
+ .ticker(ticker)
+ .build();
+ }
+
+ public TableWithETag getIfPresent(String sessionId, TableIdentifier
identifier) {
+ SessionIdTableId cacheKey = SessionIdTableId.of(sessionId, identifier);
+ return tableCache.getIfPresent(cacheKey);
+ }
+
+ public void put(
+ String sessionId,
+ TableIdentifier identifier,
+ Supplier<BaseTable> tableSupplier,
+ String eTag) {
+ tableCache.put(
+ SessionIdTableId.of(sessionId, identifier),
TableWithETag.of(tableSupplier, eTag));
+ }
+
+ public void invalidate(String sessionId, TableIdentifier identifier) {
+ SessionIdTableId cacheKey = SessionIdTableId.of(sessionId, identifier);
+ tableCache.invalidate(cacheKey);
+ }
+
+ @VisibleForTesting
+ Cache<SessionIdTableId, TableWithETag> cache() {
+ return tableCache;
+ }
+
+ @Override
+ public void close() {
+ tableCache.invalidateAll();
+ tableCache.cleanUp();
+ }
+
+ @Value.Immutable
+ interface SessionIdTableId {
+ String sessionId();
+
+ TableIdentifier tableIdentifier();
+
+ static SessionIdTableId of(String sessionId, TableIdentifier ident) {
+ return ImmutableSessionIdTableId.builder()
+ .sessionId(sessionId)
+ .tableIdentifier(ident)
+ .build();
+ }
+ }
+
+ @Value.Immutable
+ interface TableWithETag {
+ Supplier<BaseTable> supplier();
+
+ String eTag();
+
+ static TableWithETag of(Supplier<BaseTable> tableSupplier, String eTag) {
+ return
ImmutableTableWithETag.builder().supplier(tableSupplier).eTag(eTag).build();
+ }
+ }
+}
diff --git a/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java
b/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java
index 40dc050311..6b981c493d 100644
--- a/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java
+++ b/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java
@@ -33,11 +33,13 @@ import static org.mockito.Mockito.verify;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.github.benmanes.caffeine.cache.Cache;
import java.io.File;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.file.Path;
+import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
@@ -55,6 +57,7 @@ import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.http.HttpHeaders;
+import org.apache.iceberg.BaseMetadataTable;
import org.apache.iceberg.BaseTable;
import org.apache.iceberg.BaseTransaction;
import org.apache.iceberg.CatalogProperties;
@@ -79,6 +82,7 @@ import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.exceptions.AlreadyExistsException;
import org.apache.iceberg.exceptions.CommitFailedException;
import org.apache.iceberg.exceptions.CommitStateUnknownException;
+import org.apache.iceberg.exceptions.NoSuchTableException;
import org.apache.iceberg.exceptions.NotAuthorizedException;
import org.apache.iceberg.exceptions.NotFoundException;
import org.apache.iceberg.exceptions.RESTException;
@@ -93,6 +97,8 @@ import
org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.rest.HTTPRequest.HTTPMethod;
import org.apache.iceberg.rest.RESTCatalogProperties.SnapshotMode;
+import org.apache.iceberg.rest.RESTTableCache.SessionIdTableId;
+import org.apache.iceberg.rest.RESTTableCache.TableWithETag;
import org.apache.iceberg.rest.auth.AuthManager;
import org.apache.iceberg.rest.auth.AuthManagers;
import org.apache.iceberg.rest.auth.AuthSession;
@@ -109,6 +115,7 @@ import org.apache.iceberg.rest.responses.ListTablesResponse;
import org.apache.iceberg.rest.responses.LoadTableResponse;
import org.apache.iceberg.rest.responses.OAuthTokenResponse;
import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.FakeTicker;
import org.apache.iceberg.util.Pair;
import org.assertj.core.api.InstanceOfAssertFactories;
import org.awaitility.Awaitility;
@@ -133,6 +140,16 @@ public class TestRESTCatalog extends
CatalogTests<RESTCatalog> {
ImmutableMap.of(
RESTCatalogProperties.NAMESPACE_SEPARATOR,
RESTCatalogAdapter.NAMESPACE_SEPARATOR_URLENCODED_UTF_8));
+ private static final Duration TABLE_EXPIRATION =
+
Duration.ofMillis(RESTCatalogProperties.TABLE_CACHE_EXPIRE_AFTER_WRITE_MS_DEFAULT);
+ private static final Duration HALF_OF_TABLE_EXPIRATION =
TABLE_EXPIRATION.dividedBy(2);
+
+ private static final SessionCatalog.SessionContext DEFAULT_SESSION_CONTEXT =
+ new SessionCatalog.SessionContext(
+ UUID.randomUUID().toString(),
+ "user",
+ ImmutableMap.of("credential", "user:12345"),
+ ImmutableMap.of());
private static final class IdempotentEnv {
private final TableIdentifier ident;
@@ -306,16 +323,10 @@ public class TestRESTCatalog extends
CatalogTests<RESTCatalog> {
@Override
protected RESTCatalog initCatalog(String catalogName, Map<String, String>
additionalProperties) {
Configuration conf = new Configuration();
- SessionCatalog.SessionContext context =
- new SessionCatalog.SessionContext(
- UUID.randomUUID().toString(),
- "user",
- ImmutableMap.of("credential", "user:12345"),
- ImmutableMap.of());
RESTCatalog catalog =
new RESTCatalog(
- context,
+ DEFAULT_SESSION_CONTEXT,
(config) ->
HTTPClient.builder(config)
.uri(config.get(CatalogProperties.URI))
@@ -3009,12 +3020,13 @@ public class TestRESTCatalog extends
CatalogTests<RESTCatalog> {
assertThat(respHeaders).containsEntry(HttpHeaders.ETAG, eTag);
}
- @SuppressWarnings("checkstyle:AssertThatThrownByWithMessageCheck")
@Test
public void testNotModified() {
catalog().createNamespace(TABLE.namespace());
- Table table = catalog().createTable(TABLE, SCHEMA);
+ catalog().createTable(TABLE, SCHEMA);
+
+ Table table = catalog().loadTable(TABLE);
String eTag =
ETagProvider.of(((BaseTable)
table).operations().current().metadataFileLocation());
@@ -3023,26 +3035,21 @@ public class TestRESTCatalog extends
CatalogTests<RESTCatalog> {
invocation -> {
HTTPRequest originalRequest = invocation.getArgument(0);
- HTTPHeaders extendedHeaders =
- ImmutableHTTPHeaders.copyOf(originalRequest.headers())
- .putIfAbsent(
- ImmutableHTTPHeader.builder()
- .name(HttpHeaders.IF_NONE_MATCH)
- .value(eTag)
- .build());
-
- ImmutableHTTPRequest extendedRequest =
- ImmutableHTTPRequest.builder()
- .from(originalRequest)
- .headers(extendedHeaders)
- .build();
-
- return adapterForRESTServer.execute(
- extendedRequest,
- LoadTableResponse.class,
- invocation.getArgument(2),
- invocation.getArgument(3),
- ParserContext.builder().build());
+
assertThat(originalRequest.headers().contains(HttpHeaders.IF_NONE_MATCH));
+ assertThat(
+
originalRequest.headers().firstEntry(HttpHeaders.IF_NONE_MATCH).get().value())
+ .isEqualTo(eTag);
+
+ assertThat(
+ adapterForRESTServer.execute(
+ originalRequest,
+ LoadTableResponse.class,
+ invocation.getArgument(2),
+ invocation.getArgument(3),
+ ParserContext.builder().build()))
+ .isNull();
+
+ return null;
})
.when(adapterForRESTServer)
.execute(
@@ -3051,17 +3058,14 @@ public class TestRESTCatalog extends
CatalogTests<RESTCatalog> {
any(),
any());
- // TODO: This won't throw when client side of freshness-aware loading is implemented
- assertThatThrownBy(() ->
catalog().loadTable(TABLE)).isInstanceOf(NullPointerException.class);
+ catalog().loadTable(TABLE);
TableIdentifier metadataTableIdentifier =
TableIdentifier.of(NS.toString(), TABLE.name(), "partitions");
- // TODO: This won't throw when client side of freshness-aware loading is implemented
- assertThatThrownBy(() -> catalog().loadTable(metadataTableIdentifier))
- .isInstanceOf(NullPointerException.class);
+ catalog().loadTable(metadataTableIdentifier);
- Mockito.verify(adapterForRESTServer, times(2))
+ Mockito.verify(adapterForRESTServer, times(3))
.execute(
reqMatcher(HTTPMethod.GET, RESOURCE_PATHS.table(TABLE)),
eq(LoadTableResponse.class),
@@ -3378,6 +3382,60 @@ public class TestRESTCatalog extends
CatalogTests<RESTCatalog> {
}
}
+ @Test
+ public void testCustomTableOperationsWithFreshnessAwareLoading() {
+ class CustomTableOps extends RESTTableOperations {
+ CustomTableOps(
+ RESTClient client,
+ String path,
+ Supplier<Map<String, String>> readHeaders,
+ Supplier<Map<String, String>> mutationHeaders,
+ FileIO io,
+ TableMetadata current,
+ Set<Endpoint> endpoints) {
+ super(client, path, readHeaders, mutationHeaders, io, current,
endpoints);
+ }
+ }
+
+ class CustomRESTSessionCatalog extends RESTSessionCatalog {
+ CustomRESTSessionCatalog(
+ Function<Map<String, String>, RESTClient> clientBuilder,
+ BiFunction<SessionCatalog.SessionContext, Map<String, String>,
FileIO> ioBuilder) {
+ super(clientBuilder, ioBuilder);
+ }
+
+ @Override
+ protected RESTTableOperations newTableOps(
+ RESTClient restClient,
+ String path,
+ Supplier<Map<String, String>> readHeaders,
+ Supplier<Map<String, String>> mutationHeaders,
+ FileIO fileIO,
+ TableMetadata current,
+ Set<Endpoint> supportedEndpoints) {
+ return new CustomTableOps(
+ restClient, path, readHeaders, mutationHeaders, fileIO, current,
supportedEndpoints);
+ }
+ }
+
+ RESTCatalogAdapter adapter = Mockito.spy(new
RESTCatalogAdapter(backendCatalog));
+ RESTCatalog catalog =
+ catalog(adapter, clientBuilder -> new
CustomRESTSessionCatalog(clientBuilder, null));
+
+ catalog.createNamespace(NS);
+
+ catalog.createTable(TABLE, SCHEMA);
+
+ expectFullTableLoadForLoadTable(TABLE, adapter);
+ BaseTable table = (BaseTable) catalog.loadTable(TABLE);
+ assertThat(table.operations()).isInstanceOf(CustomTableOps.class);
+
+ // When answering loadTable from table cache we still get the injected ops.
+ expectNotModifiedResponseForLoadTable(TABLE, adapter);
+ table = (BaseTable) catalog.loadTable(TABLE);
+ assertThat(table.operations()).isInstanceOf(CustomTableOps.class);
+ }
+
@Test
public void testClientAutoSendsIdempotencyWhenServerAdvertises() {
ConfigResponse cfgWithIdem =
@@ -3778,6 +3836,496 @@ public class TestRESTCatalog extends
CatalogTests<RESTCatalog> {
.hasMessageContaining("No in-memory file found for location: " +
metadataFileLocation);
}
+ @Test
+ public void testInvalidTableCacheParameters() {
+ RESTCatalog catalog = new RESTCatalog(config -> new
RESTCatalogAdapter(backendCatalog));
+
+ assertThatThrownBy(
+ () ->
+ catalog.initialize(
+ "test",
Map.of(RESTCatalogProperties.TABLE_CACHE_EXPIRE_AFTER_WRITE_MS, "0")))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessage("Invalid expire after write: zero or negative");
+
+ assertThatThrownBy(
+ () ->
+ catalog.initialize(
+ "test",
Map.of(RESTCatalogProperties.TABLE_CACHE_EXPIRE_AFTER_WRITE_MS, "-1")))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessage("Invalid expire after write: zero or negative");
+
+ assertThatThrownBy(
+ () ->
+ catalog.initialize(
+ "test",
Map.of(RESTCatalogProperties.TABLE_CACHE_MAX_ENTRIES, "-1")))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessage("Invalid max entries: negative");
+ }
+
+ @Test
+ public void testFreshnessAwareLoading() {
+ catalog().createNamespace(TABLE.namespace());
+
+ catalog().createTable(TABLE, SCHEMA);
+
+ Cache<SessionIdTableId, TableWithETag> tableCache =
+ restCatalog.sessionCatalog().tableCache().cache();
+ assertThat(tableCache.estimatedSize()).isZero();
+
+ expectFullTableLoadForLoadTable(TABLE, adapterForRESTServer);
+
+ BaseTable tableAfterFirstLoad = (BaseTable) catalog().loadTable(TABLE);
+
+ assertThat(tableCache.stats().hitCount()).isZero();
+ assertThat(tableCache.asMap())
+
.containsOnlyKeys(SessionIdTableId.of(DEFAULT_SESSION_CONTEXT.sessionId(),
TABLE));
+
+ expectNotModifiedResponseForLoadTable(TABLE, adapterForRESTServer);
+
+ BaseTable tableAfterSecondLoad = (BaseTable) catalog().loadTable(TABLE);
+
+ assertThat(tableAfterFirstLoad).isNotEqualTo(tableAfterSecondLoad);
+ assertThat(tableAfterFirstLoad.operations().current().location())
+ .isEqualTo(tableAfterSecondLoad.operations().current().location());
+ assertThat(
+ tableCache
+ .asMap()
+ .get(SessionIdTableId.of(DEFAULT_SESSION_CONTEXT.sessionId(),
TABLE))
+ .supplier()
+ .get()
+ .operations()
+ .current()
+ .metadataFileLocation())
+
.isEqualTo(tableAfterFirstLoad.operations().current().metadataFileLocation());
+
+ Mockito.verify(adapterForRESTServer, times(2))
+ .execute(reqMatcher(HTTPMethod.GET, RESOURCE_PATHS.table(TABLE)),
any(), any(), any());
+ }
+
+ @Test
+ public void testFreshnessAwareLoadingMetadataTables() {
+ catalog().createNamespace(TABLE.namespace());
+
+ catalog().createTable(TABLE, SCHEMA);
+
+ Cache<SessionIdTableId, TableWithETag> tableCache =
+ restCatalog.sessionCatalog().tableCache().cache();
+ assertThat(tableCache.estimatedSize()).isZero();
+
+ BaseTable table = (BaseTable) catalog().loadTable(TABLE);
+
+ assertThat(tableCache.stats().hitCount()).isZero();
+ assertThat(tableCache.asMap())
+
.containsOnlyKeys(SessionIdTableId.of(DEFAULT_SESSION_CONTEXT.sessionId(),
TABLE));
+
+ TableIdentifier metadataTableIdentifier =
+ TableIdentifier.of(TABLE.namespace().toString(), TABLE.name(),
"partitions");
+
+ BaseMetadataTable metadataTable =
+ (BaseMetadataTable) catalog().loadTable(metadataTableIdentifier);
+
+ assertThat(tableCache.stats().hitCount()).isEqualTo(1);
+ assertThat(tableCache.asMap())
+
.containsOnlyKeys(SessionIdTableId.of(DEFAULT_SESSION_CONTEXT.sessionId(),
TABLE));
+
+ assertThat(table).isNotEqualTo(metadataTable.table());
+ assertThat(table.operations().current().metadataFileLocation())
+
.isEqualTo(metadataTable.table().operations().current().metadataFileLocation());
+
+ ResourcePaths paths =
+ ResourcePaths.forCatalogProperties(
+ ImmutableMap.of(RESTCatalogProperties.NAMESPACE_SEPARATOR, "%2E"));
+
+ Mockito.verify(adapterForRESTServer, times(2))
+ .execute(reqMatcher(HTTPMethod.GET, paths.table(TABLE)), any(), any(),
any());
+
+ Mockito.verify(adapterForRESTServer)
+ .execute(
+ reqMatcher(HTTPMethod.GET, paths.table(metadataTableIdentifier)),
any(), any(), any());
+ }
+
+ @Test
+ public void testRenameTableInvalidatesTable() {
+ runTableInvalidationTest(
+ restCatalog,
+ adapterForRESTServer,
+ (catalog) ->
+ catalog.renameTable(TABLE, TableIdentifier.of(TABLE.namespace(),
"other_table")),
+ 0);
+ }
+
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ public void testDropTableInvalidatesTable(boolean purge) {
+ runTableInvalidationTest(
+ restCatalog, adapterForRESTServer, (catalog) ->
catalog.dropTable(TABLE, purge), 0);
+ }
+
+ @Test
+ public void testTableExistViaHeadRequestInvalidatesTable() {
+ runTableInvalidationTest(
+ restCatalog,
+ adapterForRESTServer,
+ ((catalog) -> {
+ // Use a different catalog to drop the table
+ catalog(new RESTCatalogAdapter(backendCatalog)).dropTable(TABLE,
true);
+
+ // The main catalog still has the table in cache
+ assertThat(catalog.sessionCatalog().tableCache().cache().asMap())
+
.containsOnlyKeys(SessionIdTableId.of(DEFAULT_SESSION_CONTEXT.sessionId(),
TABLE));
+
+ catalog.tableExists(TABLE);
+ }),
+ 0);
+ }
+
+ @Test
+ public void testTableExistViaGetRequestInvalidatesTable() {
+ RESTCatalogAdapter adapter = Mockito.spy(new
RESTCatalogAdapter(backendCatalog));
+
+ // Configure REST server to answer tableExists query via GET
+ Mockito.doAnswer(
+ invocation ->
+ ConfigResponse.builder()
+ .withEndpoints(
+ ImmutableList.of(
+ Endpoint.V1_LOAD_TABLE,
+ Endpoint.V1_CREATE_NAMESPACE,
+ Endpoint.V1_CREATE_TABLE))
+ .build())
+ .when(adapter)
+ .execute(
+ reqMatcher(HTTPMethod.GET, ResourcePaths.config()),
+ eq(ConfigResponse.class),
+ any(),
+ any());
+
+ RESTCatalog catalog = new RESTCatalog(DEFAULT_SESSION_CONTEXT, config ->
adapter);
+ catalog.initialize(
+ "catalog",
+ ImmutableMap.of(
+ CatalogProperties.FILE_IO_IMPL,
"org.apache.iceberg.inmemory.InMemoryFileIO"));
+
+ runTableInvalidationTest(
+ catalog,
+ adapter,
+ (cat) -> {
+ // Use a different catalog to drop the table
+ catalog(new RESTCatalogAdapter(backendCatalog)).dropTable(TABLE,
true);
+
+ // The main catalog still has the table in cache
+ assertThat(cat.sessionCatalog().tableCache().cache().asMap())
+
.containsOnlyKeys(SessionIdTableId.of(DEFAULT_SESSION_CONTEXT.sessionId(),
TABLE));
+
+ cat.tableExists(TABLE);
+ },
+ 1);
+ }
+
+ @Test
+ public void testLoadTableInvalidatesCache() {
+ runTableInvalidationTest(
+ restCatalog,
+ adapterForRESTServer,
+ (catalog) -> {
+ // Use a different catalog to drop the table
+ catalog(new RESTCatalogAdapter(backendCatalog)).dropTable(TABLE,
true);
+
+ // The main catalog still has the table in cache
+ assertThat(catalog.sessionCatalog().tableCache().cache().asMap())
+
.containsOnlyKeys(SessionIdTableId.of(DEFAULT_SESSION_CONTEXT.sessionId(),
TABLE));
+
+ assertThatThrownBy(() -> catalog.loadTable(TABLE))
+ .isInstanceOf(NoSuchTableException.class)
+ .hasMessage("Table does not exist: %s", TABLE);
+ },
+ 1);
+ }
+
+ @Test
+ public void testLoadTableWithMetadataTableNameInvalidatesCache() {
+ TableIdentifier metadataTableIdentifier =
+ TableIdentifier.of(TABLE.namespace().toString(), TABLE.name(),
"partitions");
+
+ runTableInvalidationTest(
+ restCatalog,
+ adapterForRESTServer,
+ (catalog) -> {
+ // Use a different catalog to drop the table
+ catalog(new RESTCatalogAdapter(backendCatalog)).dropTable(TABLE,
true);
+
+ // The main catalog still has the table in cache
+ assertThat(catalog.sessionCatalog().tableCache().cache().asMap())
+
.containsOnlyKeys(SessionIdTableId.of(DEFAULT_SESSION_CONTEXT.sessionId(),
TABLE));
+
+ assertThatThrownBy(() -> catalog.loadTable(metadataTableIdentifier))
+ .isInstanceOf(NoSuchTableException.class)
+ .hasMessage("Table does not exist: %s", TABLE);
+ },
+ 1);
+
+ ResourcePaths paths =
+ ResourcePaths.forCatalogProperties(
+ ImmutableMap.of(RESTCatalogProperties.NAMESPACE_SEPARATOR, "%2E"));
+
+ Mockito.verify(adapterForRESTServer)
+ .execute(
+ reqMatcher(HTTPMethod.GET, paths.table(metadataTableIdentifier)),
any(), any(), any());
+ }
+
+ /**
+ * Shared scenario for tests asserting that an operation evicts a table from the table cache.
+ *
+ * <p>Creates the namespace and {@code TABLE}, loads the table once so that it is the only cache
+ * entry, runs {@code action}, and then checks that the cache is empty and that loading the table
+ * fails with {@link NoSuchTableException}. Afterwards the table is re-created and loaded again;
+ * the reloaded table must differ from the original one and point to a different metadata file.
+ *
+ * @param catalog catalog under test; its session catalog's table cache is inspected
+ * @param adapterToVerify Mockito-instrumented adapter used by {@code catalog}, on which the
+ *     number of GET requests for the table path is verified
+ * @param action operation expected to leave the table cache empty
+ * @param loadTableCountFromAction expected cache hit count after the scenario; also added to the
+ *     three loadTable calls issued by this helper itself when verifying the GET request count
+ */
+ private void runTableInvalidationTest(
+ RESTCatalog catalog,
+ RESTCatalogAdapter adapterToVerify,
+ Consumer<RESTCatalog> action,
+ int loadTableCountFromAction) {
+ catalog.createNamespace(TABLE.namespace());
+
+ catalog.createTable(TABLE, SCHEMA);
+
+ BaseTable originalTable = (BaseTable) catalog.loadTable(TABLE);
+
+ Cache<SessionIdTableId, TableWithETag> tableCache =
+ catalog.sessionCatalog().tableCache().cache();
+ assertThat(tableCache.stats().hitCount()).isZero();
+ assertThat(tableCache.asMap())
+
.containsOnlyKeys(SessionIdTableId.of(DEFAULT_SESSION_CONTEXT.sessionId(),
TABLE));
+
+ action.accept(catalog);
+
+ // Check that 'action' invalidates cache
+ assertThat(tableCache.estimatedSize()).isZero();
+
+ assertThatThrownBy(() -> catalog.loadTable(TABLE))
+ .isInstanceOf(NoSuchTableException.class)
+ .hasMessageContaining("Table does not exist: %s", TABLE);
+
+ catalog.createTable(TABLE, SCHEMA);
+
+ expectFullTableLoadForLoadTable(TABLE, adapterToVerify);
+
+ BaseTable newTableWithSameName = (BaseTable) catalog.loadTable(TABLE);
+
+
assertThat(tableCache.stats().hitCount()).isEqualTo(loadTableCountFromAction);
+ assertThat(tableCache.asMap())
+
.containsOnlyKeys(SessionIdTableId.of(DEFAULT_SESSION_CONTEXT.sessionId(),
TABLE));
+
+ assertThat(newTableWithSameName).isNotEqualTo(originalTable);
+
assertThat(newTableWithSameName.operations().current().metadataFileLocation())
+
.isNotEqualTo(originalTable.operations().current().metadataFileLocation());
+
+ // Three loadTable calls come from this helper (initial load, failing load, reload).
+ Mockito.verify(adapterToVerify, times(3 + loadTableCountFromAction))
+ .execute(reqMatcher(HTTPMethod.GET, RESOURCE_PATHS.table(TABLE)),
any(), any(), any());
+ }
+
+ @Test
+ public void testTableCacheWithMultiSessions() {
+ RESTCatalogAdapter adapter = Mockito.spy(new
RESTCatalogAdapter(backendCatalog));
+
+ RESTSessionCatalog sessionCatalog = new RESTSessionCatalog(config ->
adapter, null);
+ sessionCatalog.initialize("test_session_catalog", Map.of());
+
+ SessionCatalog.SessionContext otherSessionContext =
+ new SessionCatalog.SessionContext(
+ "session_id_2", "user", ImmutableMap.of("credential",
"user:12345"), ImmutableMap.of());
+
+ sessionCatalog.createNamespace(DEFAULT_SESSION_CONTEXT, TABLE.namespace());
+
+ sessionCatalog.buildTable(DEFAULT_SESSION_CONTEXT, TABLE, SCHEMA).create();
+
+ expectFullTableLoadForLoadTable(TABLE, adapter);
+
+ sessionCatalog.loadTable(DEFAULT_SESSION_CONTEXT, TABLE);
+
+ Cache<SessionIdTableId, TableWithETag> tableCache =
sessionCatalog.tableCache().cache();
+ assertThat(tableCache.stats().hitCount()).isZero();
+ assertThat(tableCache.asMap())
+
.containsOnlyKeys(SessionIdTableId.of(DEFAULT_SESSION_CONTEXT.sessionId(),
TABLE));
+
+ expectFullTableLoadForLoadTable(TABLE, adapter);
+
+ sessionCatalog.loadTable(otherSessionContext, TABLE);
+
+ assertThat(tableCache.asMap())
+ .containsOnlyKeys(
+ SessionIdTableId.of(DEFAULT_SESSION_CONTEXT.sessionId(), TABLE),
+ SessionIdTableId.of(otherSessionContext.sessionId(), TABLE));
+ }
+
+ // A body-less (null) LoadTableResponse is only acceptable when the table is already cached;
+ // with an empty cache the client must fail with a RESTException.
+ @Test
+ public void test304NotModifiedResponseWithEmptyTableCache() {
+ // Make every loadTable GET for TABLE return null, i.e. behave like a response without a body.
+ Mockito.doAnswer(invocation -> null)
+ .when(adapterForRESTServer)
+ .execute(
+ reqMatcher(HTTPMethod.GET, RESOURCE_PATHS.table(TABLE)),
+ eq(LoadTableResponse.class),
+ any(),
+ any());
+
+ catalog().createNamespace(TABLE.namespace());
+
+ catalog().createTable(TABLE, SCHEMA);
+
+ catalog().invalidateTable(TABLE);
+
+ // Table is not in the cache and null LoadTableResponse is received
+ assertThatThrownBy(() -> catalog().loadTable(TABLE))
+ .isInstanceOf(RESTException.class)
+ .hasMessage(
+ "Invalid (NOT_MODIFIED) response for request: method=%s, path=%s",
+ HTTPMethod.GET, RESOURCE_PATHS.table(TABLE));
+ }
+
+ // When the client never sees an ETag response header, a loaded table must not be cached.
+ @Test
+ public void testTableCacheNotUpdatedWithoutETag() {
+ RESTCatalogAdapter adapter =
+ Mockito.spy(
+ new RESTCatalogAdapter(backendCatalog) {
+ @Override
+ public <T extends RESTResponse> T execute(
+ HTTPRequest request,
+ Class<T> responseType,
+ Consumer<ErrorResponse> errorHandler,
+ Consumer<Map<String, String>> responseHeaders) {
+ // Wrap the original responseHeaders consumer so that a header map containing an
+ // ETag is never forwarded; header maps without an ETag pass through unchanged.
+ Consumer<Map<String, String>> noETagConsumer =
+ headers -> {
+ if (!headers.containsKey(HttpHeaders.ETAG)) {
+ responseHeaders.accept(headers);
+ }
+ };
+ return super.execute(request, responseType, errorHandler,
noETagConsumer);
+ }
+ });
+
+ RESTCatalog catalog = new RESTCatalog(DEFAULT_SESSION_CONTEXT, config ->
adapter);
+ catalog.initialize(
+ "catalog",
+ ImmutableMap.of(
+ CatalogProperties.FILE_IO_IMPL,
"org.apache.iceberg.inmemory.InMemoryFileIO"));
+
+ catalog.createNamespace(TABLE.namespace());
+
+ catalog.createTable(TABLE, SCHEMA);
+
+ catalog.loadTable(TABLE);
+
+
assertThat(catalog.sessionCatalog().tableCache().cache().estimatedSize()).isZero();
+ }
+
+ @Test
+ public void testTableCacheIsDisabled() {
+ RESTCatalogAdapter adapter = Mockito.spy(new
RESTCatalogAdapter(backendCatalog));
+
+ RESTCatalog catalog = new RESTCatalog(DEFAULT_SESSION_CONTEXT, config ->
adapter);
+ catalog.initialize(
+ "catalog",
+ ImmutableMap.of(
+ CatalogProperties.FILE_IO_IMPL,
+ "org.apache.iceberg.inmemory.InMemoryFileIO",
+ RESTCatalogProperties.TABLE_CACHE_MAX_ENTRIES,
+ "0"));
+
+ catalog.createNamespace(TABLE.namespace());
+
+ catalog.createTable(TABLE, SCHEMA);
+
+
assertThat(catalog.sessionCatalog().tableCache().cache().estimatedSize()).isZero();
+
+ expectFullTableLoadForLoadTable(TABLE, adapter);
+
+ catalog.loadTable(TABLE);
+
+ catalog.sessionCatalog().tableCache().cache().cleanUp();
+
+
assertThat(catalog.sessionCatalog().tableCache().cache().estimatedSize()).isZero();
+ }
+
+ // Drives the table cache with a fake ticker: the entry ages with the clock, disappears once the
+ // clock has moved past the expiration, and the next loadTable is a full load that re-inserts it.
+ @Test
+ public void testFullTableLoadAfterExpiryFromCache() {
+ RESTCatalogAdapter adapter = Mockito.spy(new RESTCatalogAdapter(backendCatalog));
+
+ FakeTicker ticker = new FakeTicker();
+
+ TestableRESTCatalog catalog =
+ new TestableRESTCatalog(DEFAULT_SESSION_CONTEXT, config -> adapter, ticker);
+ catalog.initialize("catalog", Map.of());
+
+ catalog.createNamespace(TABLE.namespace());
+
+ catalog.createTable(TABLE, SCHEMA);
+
+ catalog.loadTable(TABLE);
+
+ Cache<SessionIdTableId, TableWithETag> tableCache =
+ catalog.sessionCatalog().tableCache().cache();
+ SessionIdTableId tableCacheKey =
+ SessionIdTableId.of(DEFAULT_SESSION_CONTEXT.sessionId(), TABLE);
+
+ // Right after the first load the entry is present with age zero.
+ assertThat(tableCache.asMap()).containsOnlyKeys(tableCacheKey);
+ assertThat(tableCache.policy().expireAfterWrite().get().ageOf(tableCacheKey))
+ .isPresent()
+ .get()
+ .isEqualTo(Duration.ZERO);
+
+ ticker.advance(HALF_OF_TABLE_EXPIRATION);
+
+ // The entry is still cached and its age follows the ticker.
+ assertThat(tableCache.asMap()).containsOnlyKeys(tableCacheKey);
+ assertThat(tableCache.policy().expireAfterWrite().get().ageOf(tableCacheKey))
+ .isPresent()
+ .get()
+ .isEqualTo(HALF_OF_TABLE_EXPIRATION);
+
+ // Advance by another HALF_OF_TABLE_EXPIRATION plus 10 seconds; the entry must be gone.
+ ticker.advance(HALF_OF_TABLE_EXPIRATION.plus(Duration.ofSeconds(10)));
+
+ assertThat(tableCache.asMap()).doesNotContainKey(tableCacheKey);
+
+ expectFullTableLoadForLoadTable(TABLE, adapter);
+
+ catalog.loadTable(TABLE);
+
+ // The reload was not served from the cache and created a fresh entry with age zero.
+ assertThat(tableCache.stats().hitCount()).isZero();
+ assertThat(tableCache.asMap()).containsOnlyKeys(tableCacheKey);
+ assertThat(tableCache.policy().expireAfterWrite().get().ageOf(tableCacheKey))
+ .isPresent()
+ .get()
+ .isEqualTo(Duration.ZERO);
+ }
+
+ // The age of a cache entry is measured from when it was written: a second loadTable of the same
+ // table must leave the entry's age unchanged.
+ @Test
+ public void testTableCacheAgeDoesNotRefreshesAfterAccess() {
+ FakeTicker ticker = new FakeTicker();
+
+ TestableRESTCatalog catalog =
+ new TestableRESTCatalog(
+ DEFAULT_SESSION_CONTEXT, config -> new
RESTCatalogAdapter(backendCatalog), ticker);
+ catalog.initialize("catalog", Map.of());
+
+ catalog.createNamespace(TABLE.namespace());
+
+ catalog.createTable(TABLE, SCHEMA);
+
+ catalog.loadTable(TABLE);
+
+ ticker.advance(HALF_OF_TABLE_EXPIRATION);
+
+ Cache<SessionIdTableId, TableWithETag> tableCache =
+ catalog.sessionCatalog().tableCache().cache();
+ SessionIdTableId tableCacheKey =
+ SessionIdTableId.of(DEFAULT_SESSION_CONTEXT.sessionId(), TABLE);
+
+
assertThat(tableCache.policy().expireAfterWrite().get().ageOf(tableCacheKey))
+ .isPresent()
+ .get()
+ .isEqualTo(HALF_OF_TABLE_EXPIRATION);
+
+ // Access the table again without advancing the ticker.
+ catalog.loadTable(TABLE);
+
+
assertThat(tableCache.policy().expireAfterWrite().get().ageOf(tableCacheKey))
+ .isPresent()
+ .get()
+ .isEqualTo(HALF_OF_TABLE_EXPIRATION);
+ }
+
private RESTCatalog catalog(RESTCatalogAdapter adapter) {
RESTCatalog catalog =
new RESTCatalog(SessionCatalog.SessionContext.createEmpty(), (config)
-> adapter);
@@ -3807,6 +4355,45 @@ public class TestRESTCatalog extends
CatalogTests<RESTCatalog> {
return catalog;
}
+ /**
+ * Stubs {@code adapter} so that loadTable GET requests for {@code ident} are forwarded to the
+ * real implementation and the returned {@link LoadTableResponse} is asserted to be non-null,
+ * i.e. the table was fully loaded.
+ *
+ * @param ident identifier of the table whose load request is intercepted
+ * @param adapter adapter to stub; it is passed to Mockito's {@code when}, so it has to be a
+ *     Mockito spy or mock
+ */
+ private void expectFullTableLoadForLoadTable(TableIdentifier ident, RESTCatalogAdapter adapter) {
+ Answer<LoadTableResponse> invocationAssertsFullLoad =
+ invocation -> {
+ LoadTableResponse response = (LoadTableResponse) invocation.callRealMethod();
+
+ // A full load always carries a response body.
+ assertThat(response).isNotNull();
+
+ return response;
+ };
+
+ Mockito.doAnswer(invocationAssertsFullLoad)
+ .when(adapter)
+ .execute(
+ reqMatcher(HTTPMethod.GET, RESOURCE_PATHS.table(ident)),
+ eq(LoadTableResponse.class),
+ any(),
+ any());
+ }
+
+ /**
+ * Stubs {@code adapter} so that loadTable GET requests for {@code ident} are forwarded to the
+ * real implementation and the returned {@link LoadTableResponse} is asserted to be null, i.e.
+ * the request was answered without a response body.
+ *
+ * @param ident identifier of the table whose load request is intercepted
+ * @param adapter adapter to stub; it is passed to Mockito's {@code when}, so it has to be a
+ *     Mockito spy or mock
+ */
+ private void expectNotModifiedResponseForLoadTable(
+ TableIdentifier ident, RESTCatalogAdapter adapter) {
+ Answer<LoadTableResponse> invocationAssertsNotModified =
+ invocation -> {
+ LoadTableResponse response = (LoadTableResponse) invocation.callRealMethod();
+
+ // A not-modified answer has no body, which surfaces here as a null response.
+ assertThat(response).isNull();
+
+ return response;
+ };
+
+ Mockito.doAnswer(invocationAssertsNotModified)
+ .when(adapter)
+ .execute(
+ reqMatcher(HTTPMethod.GET, RESOURCE_PATHS.table(ident)),
+ eq(LoadTableResponse.class),
+ any(),
+ any());
+ }
+
static HTTPRequest reqMatcher(HTTPMethod method) {
return argThat(req -> req.method() == method);
}
diff --git
a/core/src/test/java/org/apache/iceberg/rest/TestRESTScanPlanning.java
b/core/src/test/java/org/apache/iceberg/rest/TestRESTScanPlanning.java
index f84197b0f1..0b1453682b 100644
--- a/core/src/test/java/org/apache/iceberg/rest/TestRESTScanPlanning.java
+++ b/core/src/test/java/org/apache/iceberg/rest/TestRESTScanPlanning.java
@@ -859,6 +859,26 @@ public class TestRESTScanPlanning {
}
}
+ @Test
+ void remoteScanPlanningWithFreshnessAwareLoading() throws IOException {
+ RESTCatalog catalog = scanPlanningCatalog();
+
+ TableIdentifier tableIdentifier = TableIdentifier.of(NS,
"freshness_aware_loading_test");
+ restTableFor(catalog, tableIdentifier.name());
+
+
assertThat(catalog.sessionCatalog().tableCache().cache().estimatedSize()).isZero();
+
+ // Table is cached with the first loadTable
+ catalog.loadTable(tableIdentifier);
+
assertThat(catalog.sessionCatalog().tableCache().cache().estimatedSize()).isOne();
+
+ // Second loadTable is answered from cache
+ Table table = catalog.loadTable(tableIdentifier);
+
+ // Verify table is RESTTable and newScan() returns RESTTableScan
+ restTableScanFor(table);
+ }
+
// ==================== Endpoint Support Tests ====================
/** Helper class to hold catalog and adapter for endpoint support tests. */
diff --git
a/core/src/test/java/org/apache/iceberg/rest/TestableRESTCatalog.java
b/core/src/test/java/org/apache/iceberg/rest/TestableRESTCatalog.java
new file mode 100644
index 0000000000..cf66c04ade
--- /dev/null
+++ b/core/src/test/java/org/apache/iceberg/rest/TestableRESTCatalog.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.rest;
+
+import com.github.benmanes.caffeine.cache.Ticker;
+import java.util.Map;
+import java.util.function.Function;
+import org.apache.iceberg.catalog.SessionCatalog;
+
+/**
+ * Test-only {@link RESTCatalog} that creates a {@link TestableRESTSessionCatalog} and hands it a
+ * caller-supplied {@link Ticker} right before initialization.
+ */
+class TestableRESTCatalog extends RESTCatalog {
+ private final Ticker ticker;
+
+ TestableRESTCatalog(
+ SessionCatalog.SessionContext context,
+ Function<Map<String, String>, RESTClient> clientBuilder,
+ Ticker ticker) {
+ super(context, clientBuilder);
+
+ this.ticker = ticker;
+ }
+
+ @Override
+ protected RESTSessionCatalog newSessionCatalog(
+ Function<Map<String, String>, RESTClient> clientBuilder) {
+ // This is called from RESTCatalog's constructor, 'ticker' member is not yet set, we have to
+ // defer passing it to the session catalog.
+ return new TestableRESTSessionCatalog(clientBuilder, null);
+ }
+
+ @Override
+ public void initialize(String name, Map<String, String> props) {
+ // Hand the ticker over before super.initialize() runs.
+ ((TestableRESTSessionCatalog) sessionCatalog()).setTicker(ticker);
+
+ super.initialize(name, props);
+ }
+}
diff --git
a/core/src/test/java/org/apache/iceberg/rest/TestableRESTSessionCatalog.java
b/core/src/test/java/org/apache/iceberg/rest/TestableRESTSessionCatalog.java
new file mode 100644
index 0000000000..492fd998e8
--- /dev/null
+++ b/core/src/test/java/org/apache/iceberg/rest/TestableRESTSessionCatalog.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.rest;
+
+import com.github.benmanes.caffeine.cache.Ticker;
+import java.util.Map;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+import org.apache.iceberg.io.FileIO;
+
+/**
+ * Test-only {@link RESTSessionCatalog} that builds its {@link RESTTableCache} with an injectable
+ * {@link Ticker}.
+ */
+class TestableRESTSessionCatalog extends RESTSessionCatalog {
+ // Stays null until setTicker() is called; createTableCache() passes it on as-is.
+ private Ticker ticker;
+
+ TestableRESTSessionCatalog(
+ Function<Map<String, String>, RESTClient> clientBuilder,
+ BiFunction<SessionContext, Map<String, String>, FileIO> ioBuilder) {
+ super(clientBuilder, ioBuilder);
+ }
+
+ // Sets the ticker used by createTableCache(). NOTE(review): presumably this has to happen
+ // before the catalog is initialized for the ticker to take effect -- confirm against
+ // RESTSessionCatalog.
+ public void setTicker(Ticker newTicker) {
+ this.ticker = newTicker;
+ }
+
+ @Override
+ protected RESTTableCache createTableCache(Map<String, String> props) {
+ return new RESTTableCache(props, ticker);
+ }
+}