This is an automated email from the ASF dual-hosted git repository.

etudenhoefner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new 72d5fd66c5 Core: Freshness-aware table loading in REST catalog (#14398)
72d5fd66c5 is described below

commit 72d5fd66c527c27c79880e051b7eaf46deb2fed6
Author: gaborkaszab <[email protected]>
AuthorDate: Sun Jan 18 08:46:08 2026 +0100

    Core: Freshness-aware table loading in REST catalog (#14398)
    
    This is the client-side implementation of freshness-aware table
    loading in the REST catalog. The design is as follows:
     - The REST server can send an ETag with the LoadTableResponse
     - The client can use this ETag to populate the IF_NONE_MATCH header
       of the next loadTable request
     - The server can send a 304-NOT_MODIFIED response without a body if
       the table has not been changed based on the ETag
     - When the client receives a 304, it returns the latest table
       object associated with the ETag from its cache
---
 .../java/org/apache/iceberg/rest/HTTPClient.java   |  10 +
 .../java/org/apache/iceberg/rest/RESTCatalog.java  |   4 +
 .../apache/iceberg/rest/RESTCatalogProperties.java |  11 +
 .../apache/iceberg/rest/RESTSessionCatalog.java    | 168 ++++--
 .../org/apache/iceberg/rest/RESTTableCache.java    | 129 ++++
 .../org/apache/iceberg/rest/TestRESTCatalog.java   | 657 +++++++++++++++++++--
 .../apache/iceberg/rest/TestRESTScanPlanning.java  |  20 +
 .../apache/iceberg/rest/TestableRESTCatalog.java   |  52 ++
 .../iceberg/rest/TestableRESTSessionCatalog.java   |  44 ++
 9 files changed, 1026 insertions(+), 69 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/rest/HTTPClient.java 
b/core/src/main/java/org/apache/iceberg/rest/HTTPClient.java
index b30caf1c7d..2fd3ec882a 100644
--- a/core/src/main/java/org/apache/iceberg/rest/HTTPClient.java
+++ b/core/src/main/java/org/apache/iceberg/rest/HTTPClient.java
@@ -326,6 +326,16 @@ public class HTTPClient extends BaseHTTPClient {
 
       // Skip parsing the response stream for any successful request not 
expecting a response body
       if (emptyBody(response, responseType)) {
+        if (response.getCode() == HttpStatus.SC_NOT_MODIFIED
+            && !req.headers().contains(HttpHeaders.IF_NONE_MATCH)) {
+          // 304-NOT_MODIFIED is used for freshness-aware loading and requires 
an ETag sent to the
+          // server via IF_NONE_MATCH header in the request. If no ETag was 
sent, we shouldn't
+          // receive a 304.
+          throw new RESTException(
+              "Invalid (NOT_MODIFIED) response for request: method=%s, 
path=%s",
+              req.method(), req.path());
+        }
+
         return null;
       }
 
diff --git a/core/src/main/java/org/apache/iceberg/rest/RESTCatalog.java 
b/core/src/main/java/org/apache/iceberg/rest/RESTCatalog.java
index aff8832c6b..f4c75d1050 100644
--- a/core/src/main/java/org/apache/iceberg/rest/RESTCatalog.java
+++ b/core/src/main/java/org/apache/iceberg/rest/RESTCatalog.java
@@ -96,6 +96,10 @@ public class RESTCatalog
     sessionCatalog.initialize(name, props);
   }
 
+  protected RESTSessionCatalog sessionCatalog() {
+    return sessionCatalog;
+  }
+
   @Override
   public String name() {
     return sessionCatalog.name();
diff --git 
a/core/src/main/java/org/apache/iceberg/rest/RESTCatalogProperties.java 
b/core/src/main/java/org/apache/iceberg/rest/RESTCatalogProperties.java
index 72b09fa772..e294bcfebe 100644
--- a/core/src/main/java/org/apache/iceberg/rest/RESTCatalogProperties.java
+++ b/core/src/main/java/org/apache/iceberg/rest/RESTCatalogProperties.java
@@ -18,6 +18,8 @@
  */
 package org.apache.iceberg.rest;
 
+import java.util.concurrent.TimeUnit;
+
 public final class RESTCatalogProperties {
 
   private RESTCatalogProperties() {}
@@ -43,6 +45,15 @@ public final class RESTCatalogProperties {
 
   public static final String REST_SCAN_PLAN_ID = "rest-scan-plan-id";
 
+  // Properties that control the behaviour of the table cache used for 
freshness-aware table
+  // loading.
+  public static final String TABLE_CACHE_EXPIRE_AFTER_WRITE_MS =
+      "rest-table-cache.expire-after-write-ms";
+  public static final long TABLE_CACHE_EXPIRE_AFTER_WRITE_MS_DEFAULT = 
TimeUnit.MINUTES.toMillis(5);
+
+  public static final String TABLE_CACHE_MAX_ENTRIES = 
"rest-table-cache.max-entries";
+  public static final int TABLE_CACHE_MAX_ENTRIES_DEFAULT = 100;
+
   public enum SnapshotMode {
     ALL,
     REFS
diff --git a/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java 
b/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java
index 4ff13ac824..0c4f8a39bf 100644
--- a/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java
+++ b/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java
@@ -26,9 +26,11 @@ import java.util.Locale;
 import java.util.Map;
 import java.util.Set;
 import java.util.function.BiFunction;
+import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
+import org.apache.hc.core5.http.HttpHeaders;
 import org.apache.iceberg.BaseTable;
 import org.apache.iceberg.CatalogProperties;
 import org.apache.iceberg.CatalogUtil;
@@ -60,6 +62,7 @@ import org.apache.iceberg.io.FileIOTracker;
 import org.apache.iceberg.io.StorageCredential;
 import org.apache.iceberg.metrics.MetricsReporter;
 import org.apache.iceberg.metrics.MetricsReporters;
+import 
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
@@ -67,6 +70,7 @@ import 
org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.rest.RESTCatalogProperties.SnapshotMode;
+import org.apache.iceberg.rest.RESTTableCache.TableWithETag;
 import org.apache.iceberg.rest.auth.AuthManager;
 import org.apache.iceberg.rest.auth.AuthManagers;
 import org.apache.iceberg.rest.auth.AuthSession;
@@ -165,6 +169,8 @@ public class RESTSessionCatalog extends 
BaseViewSessionCatalog
   private Supplier<Map<String, String>> mutationHeaders = Map::of;
   private String namespaceSeparator = null;
 
+  private RESTTableCache tableCache;
+
   public RESTSessionCatalog() {
     this(
         config ->
@@ -277,9 +283,22 @@ public class RESTSessionCatalog extends 
BaseViewSessionCatalog
             mergedProps,
             RESTCatalogProperties.REST_SCAN_PLANNING_ENABLED,
             RESTCatalogProperties.REST_SCAN_PLANNING_ENABLED_DEFAULT);
+
+    this.tableCache = createTableCache(mergedProps);
+    this.closeables.addCloseable(this.tableCache);
+
     super.initialize(name, mergedProps);
   }
 
+  protected RESTTableCache createTableCache(Map<String, String> props) {
+    return new RESTTableCache(props);
+  }
+
+  @VisibleForTesting
+  RESTTableCache tableCache() {
+    return tableCache;
+  }
+
   @Override
   public void setConf(Object newConf) {
     this.conf = newConf;
@@ -332,6 +351,8 @@ public class RESTSessionCatalog extends 
BaseViewSessionCatalog
       return true;
     } catch (NoSuchTableException e) {
       return false;
+    } finally {
+      invalidateTable(context, identifier);
     }
   }
 
@@ -353,6 +374,8 @@ public class RESTSessionCatalog extends 
BaseViewSessionCatalog
       return true;
     } catch (NoSuchTableException e) {
       return false;
+    } finally {
+      invalidateTable(context, identifier);
     }
   }
 
@@ -370,6 +393,8 @@ public class RESTSessionCatalog extends 
BaseViewSessionCatalog
     client
         .withAuthSession(contextualSession)
         .post(paths.rename(), request, null, mutationHeaders, 
ErrorHandlers.tableErrorHandler());
+
+    invalidateTable(context, from);
   }
 
   @Override
@@ -384,9 +409,15 @@ public class RESTSessionCatalog extends 
BaseViewSessionCatalog
         return true;
       } else {
         // fallback in order to work with 1.7.x and older servers
-        return super.tableExists(context, identifier);
+        if (!super.tableExists(context, identifier)) {
+          invalidateTable(context, identifier);
+          return false;
+        }
+
+        return true;
       }
     } catch (NoSuchTableException e) {
+      invalidateTable(context, identifier);
       return false;
     }
   }
@@ -396,7 +427,11 @@ public class RESTSessionCatalog extends 
BaseViewSessionCatalog
   }
 
   private LoadTableResponse loadInternal(
-      SessionContext context, TableIdentifier identifier, SnapshotMode mode) {
+      SessionContext context,
+      TableIdentifier identifier,
+      SnapshotMode mode,
+      Map<String, String> headers,
+      Consumer<Map<String, String>> responseHeaders) {
     Endpoint.check(endpoints, Endpoint.V1_LOAD_TABLE);
     AuthSession contextualSession = authManager.contextualSession(context, 
catalogAuth);
     return client
@@ -405,8 +440,9 @@ public class RESTSessionCatalog extends 
BaseViewSessionCatalog
             paths.table(identifier),
             snapshotModeToParam(mode),
             LoadTableResponse.class,
-            Map.of(),
-            ErrorHandlers.tableErrorHandler());
+            headers,
+            ErrorHandlers.tableErrorHandler(),
+            responseHeaders);
   }
 
   @Override
@@ -424,8 +460,25 @@ public class RESTSessionCatalog extends 
BaseViewSessionCatalog
     MetadataTableType metadataType;
     LoadTableResponse response;
     TableIdentifier loadedIdent;
+
+    Map<String, String> responseHeaders = Maps.newHashMap();
+    TableWithETag cachedTable = tableCache.getIfPresent(context.sessionId(), 
identifier);
+
     try {
-      response = loadInternal(context, identifier, snapshotMode);
+      response =
+          loadInternal(
+              context,
+              identifier,
+              snapshotMode,
+              headersForLoadTable(cachedTable),
+              responseHeaders::putAll);
+
+      if (response == null) {
+        Preconditions.checkNotNull(cachedTable, "Invalid load table response: 
null");
+
+        return cachedTable.supplier().get();
+      }
+
       loadedIdent = identifier;
       metadataType = null;
 
@@ -435,14 +488,33 @@ public class RESTSessionCatalog extends 
BaseViewSessionCatalog
         // attempt to load a metadata table using the identifier's namespace 
as the base table
         TableIdentifier baseIdent = 
TableIdentifier.of(identifier.namespace().levels());
         try {
-          response = loadInternal(context, baseIdent, snapshotMode);
+          responseHeaders.clear();
+          cachedTable = tableCache.getIfPresent(context.sessionId(), 
baseIdent);
+
+          response =
+              loadInternal(
+                  context,
+                  baseIdent,
+                  snapshotMode,
+                  headersForLoadTable(cachedTable),
+                  responseHeaders::putAll);
+
+          if (response == null) {
+            Preconditions.checkNotNull(cachedTable, "Invalid load table 
response: null");
+
+            return MetadataTableUtils.createMetadataTableInstance(
+                cachedTable.supplier().get(), metadataType);
+          }
+
           loadedIdent = baseIdent;
         } catch (NoSuchTableException ignored) {
           // the base table does not exist
+          invalidateTable(context, baseIdent);
           throw original;
         }
       } else {
         // name is not a metadata table
+        invalidateTable(context, identifier);
         throw original;
       }
     }
@@ -461,7 +533,7 @@ public class RESTSessionCatalog extends 
BaseViewSessionCatalog
               .setPreviousFileLocation(null)
               .setSnapshotsSupplier(
                   () ->
-                      loadInternal(context, finalIdentifier, SnapshotMode.ALL)
+                      loadInternal(context, finalIdentifier, SnapshotMode.ALL, 
Map.of(), h -> {})
                           .tableMetadata()
                           .snapshots())
               .discardChanges()
@@ -470,38 +542,52 @@ public class RESTSessionCatalog extends 
BaseViewSessionCatalog
       tableMetadata = response.tableMetadata();
     }
 
+    List<Credential> credentials = response.credentials();
     RESTClient tableClient = client.withAuthSession(tableSession);
-    RESTTableOperations ops =
-        newTableOps(
-            tableClient,
-            paths.table(finalIdentifier),
-            Map::of,
-            mutationHeaders,
-            tableFileIO(context, tableConf, response.credentials()),
-            tableMetadata,
-            endpoints);
+    Supplier<BaseTable> tableSupplier =
+        createTableSupplier(
+            finalIdentifier, tableMetadata, context, tableClient, tableConf, 
credentials);
 
-    trackFileIO(ops);
-
-    // RestTable should only be returned for non-metadata tables, because 
client would
-    // not have access to metadata files for example manifests, since all it 
needs is catalog.
-    if (metadataType == null) {
-      RESTTable restTable = restTableForScanPlanning(ops, finalIdentifier, 
tableClient);
-      if (restTable != null) {
-        return restTable;
-      }
+    String eTag = responseHeaders.getOrDefault(HttpHeaders.ETAG, null);
+    if (eTag != null) {
+      tableCache.put(context.sessionId(), finalIdentifier, tableSupplier, 
eTag);
     }
 
-    BaseTable table =
-        new BaseTable(
-            ops,
-            fullTableName(finalIdentifier),
-            metricsReporter(paths.metrics(finalIdentifier), tableClient));
     if (metadataType != null) {
-      return MetadataTableUtils.createMetadataTableInstance(table, 
metadataType);
+      return 
MetadataTableUtils.createMetadataTableInstance(tableSupplier.get(), 
metadataType);
     }
 
-    return table;
+    return tableSupplier.get();
+  }
+
+  private Supplier<BaseTable> createTableSupplier(
+      TableIdentifier identifier,
+      TableMetadata tableMetadata,
+      SessionContext context,
+      RESTClient tableClient,
+      Map<String, String> tableConf,
+      List<Credential> credentials) {
+    return () -> {
+      RESTTableOperations ops =
+          newTableOps(
+              tableClient,
+              paths.table(identifier),
+              Map::of,
+              mutationHeaders,
+              tableFileIO(context, tableConf, credentials),
+              tableMetadata,
+              endpoints);
+
+      trackFileIO(ops);
+
+      RESTTable table = restTableForScanPlanning(ops, identifier, tableClient);
+      if (table != null) {
+        return table;
+      }
+
+      return new BaseTable(
+          ops, fullTableName(identifier), 
metricsReporter(paths.metrics(identifier), tableClient));
+    };
   }
 
   private RESTTable restTableForScanPlanning(
@@ -546,7 +632,9 @@ public class RESTSessionCatalog extends 
BaseViewSessionCatalog
   }
 
   @Override
-  public void invalidateTable(SessionContext context, TableIdentifier ident) {}
+  public void invalidateTable(SessionContext context, TableIdentifier ident) {
+    tableCache.invalidate(context.sessionId(), ident);
+  }
 
   @Override
   public Table registerTable(
@@ -906,7 +994,8 @@ public class RESTSessionCatalog extends 
BaseViewSessionCatalog
         throw new AlreadyExistsException("View with same name already exists: 
%s", ident);
       }
 
-      LoadTableResponse response = loadInternal(context, ident, snapshotMode);
+      LoadTableResponse response = loadInternal(context, ident, snapshotMode, 
Map.of(), h -> {});
+
       String fullName = fullTableName(ident);
 
       Map<String, String> tableConf = response.config();
@@ -1368,6 +1457,17 @@ public class RESTSessionCatalog extends 
BaseViewSessionCatalog
         .post(paths.renameView(), request, null, mutationHeaders, 
ErrorHandlers.viewErrorHandler());
   }
 
+  private static Map<String, String> headersForLoadTable(TableWithETag 
tableWithETag) {
+    if (tableWithETag == null) {
+      return Map.of();
+    }
+
+    String eTag = tableWithETag.eTag();
+    Preconditions.checkArgument(eTag != null, "Invalid ETag: null");
+
+    return Map.of(HttpHeaders.IF_NONE_MATCH, eTag);
+  }
+
   private class RESTViewBuilder implements ViewBuilder {
     private final SessionContext context;
     private final TableIdentifier identifier;
diff --git a/core/src/main/java/org/apache/iceberg/rest/RESTTableCache.java 
b/core/src/main/java/org/apache/iceberg/rest/RESTTableCache.java
new file mode 100644
index 0000000000..912173e9eb
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/rest/RESTTableCache.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.rest;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.Ticker;
+import java.io.Closeable;
+import java.time.Duration;
+import java.util.Map;
+import java.util.function.Supplier;
+import org.apache.iceberg.BaseTable;
+import org.apache.iceberg.catalog.TableIdentifier;
+import 
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.util.PropertyUtil;
+import org.immutables.value.Value;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class RESTTableCache implements Closeable {
+  private static final Logger LOG = 
LoggerFactory.getLogger(RESTTableCache.class);
+
+  private final Cache<SessionIdTableId, TableWithETag> tableCache;
+
+  RESTTableCache(Map<String, String> props) {
+    this(props, Ticker.systemTicker());
+  }
+
+  @VisibleForTesting
+  RESTTableCache(Map<String, String> props, Ticker ticker) {
+    long expireAfterWriteMS =
+        PropertyUtil.propertyAsLong(
+            props,
+            RESTCatalogProperties.TABLE_CACHE_EXPIRE_AFTER_WRITE_MS,
+            RESTCatalogProperties.TABLE_CACHE_EXPIRE_AFTER_WRITE_MS_DEFAULT);
+    Preconditions.checkArgument(
+        expireAfterWriteMS > 0, "Invalid expire after write: zero or 
negative");
+
+    long numEntries =
+        PropertyUtil.propertyAsLong(
+            props,
+            RESTCatalogProperties.TABLE_CACHE_MAX_ENTRIES,
+            RESTCatalogProperties.TABLE_CACHE_MAX_ENTRIES_DEFAULT);
+    Preconditions.checkArgument(numEntries >= 0, "Invalid max entries: 
negative");
+
+    tableCache =
+        Caffeine.newBuilder()
+            .maximumSize(numEntries)
+            .expireAfterWrite(Duration.ofMillis(expireAfterWriteMS))
+            .removalListener(
+                (compositeKey, table, cause) ->
+                    LOG.debug("Evicted {} from table cache ({})", 
compositeKey, cause))
+            .recordStats()
+            .ticker(ticker)
+            .build();
+  }
+
+  public TableWithETag getIfPresent(String sessionId, TableIdentifier 
identifier) {
+    SessionIdTableId cacheKey = SessionIdTableId.of(sessionId, identifier);
+    return tableCache.getIfPresent(cacheKey);
+  }
+
+  public void put(
+      String sessionId,
+      TableIdentifier identifier,
+      Supplier<BaseTable> tableSupplier,
+      String eTag) {
+    tableCache.put(
+        SessionIdTableId.of(sessionId, identifier), 
TableWithETag.of(tableSupplier, eTag));
+  }
+
+  public void invalidate(String sessionId, TableIdentifier identifier) {
+    SessionIdTableId cacheKey = SessionIdTableId.of(sessionId, identifier);
+    tableCache.invalidate(cacheKey);
+  }
+
+  @VisibleForTesting
+  Cache<SessionIdTableId, TableWithETag> cache() {
+    return tableCache;
+  }
+
+  @Override
+  public void close() {
+    tableCache.invalidateAll();
+    tableCache.cleanUp();
+  }
+
+  @Value.Immutable
+  interface SessionIdTableId {
+    String sessionId();
+
+    TableIdentifier tableIdentifier();
+
+    static SessionIdTableId of(String sessionId, TableIdentifier ident) {
+      return ImmutableSessionIdTableId.builder()
+          .sessionId(sessionId)
+          .tableIdentifier(ident)
+          .build();
+    }
+  }
+
+  @Value.Immutable
+  interface TableWithETag {
+    Supplier<BaseTable> supplier();
+
+    String eTag();
+
+    static TableWithETag of(Supplier<BaseTable> tableSupplier, String eTag) {
+      return 
ImmutableTableWithETag.builder().supplier(tableSupplier).eTag(eTag).build();
+    }
+  }
+}
diff --git a/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java 
b/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java
index 40dc050311..6b981c493d 100644
--- a/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java
+++ b/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java
@@ -33,11 +33,13 @@ import static org.mockito.Mockito.verify;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.github.benmanes.caffeine.cache.Cache;
 import java.io.File;
 import java.io.IOException;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.nio.file.Path;
+import java.time.Duration;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
@@ -55,6 +57,7 @@ import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.http.HttpHeaders;
+import org.apache.iceberg.BaseMetadataTable;
 import org.apache.iceberg.BaseTable;
 import org.apache.iceberg.BaseTransaction;
 import org.apache.iceberg.CatalogProperties;
@@ -79,6 +82,7 @@ import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.exceptions.AlreadyExistsException;
 import org.apache.iceberg.exceptions.CommitFailedException;
 import org.apache.iceberg.exceptions.CommitStateUnknownException;
+import org.apache.iceberg.exceptions.NoSuchTableException;
 import org.apache.iceberg.exceptions.NotAuthorizedException;
 import org.apache.iceberg.exceptions.NotFoundException;
 import org.apache.iceberg.exceptions.RESTException;
@@ -93,6 +97,8 @@ import 
org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.rest.HTTPRequest.HTTPMethod;
 import org.apache.iceberg.rest.RESTCatalogProperties.SnapshotMode;
+import org.apache.iceberg.rest.RESTTableCache.SessionIdTableId;
+import org.apache.iceberg.rest.RESTTableCache.TableWithETag;
 import org.apache.iceberg.rest.auth.AuthManager;
 import org.apache.iceberg.rest.auth.AuthManagers;
 import org.apache.iceberg.rest.auth.AuthSession;
@@ -109,6 +115,7 @@ import org.apache.iceberg.rest.responses.ListTablesResponse;
 import org.apache.iceberg.rest.responses.LoadTableResponse;
 import org.apache.iceberg.rest.responses.OAuthTokenResponse;
 import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.FakeTicker;
 import org.apache.iceberg.util.Pair;
 import org.assertj.core.api.InstanceOfAssertFactories;
 import org.awaitility.Awaitility;
@@ -133,6 +140,16 @@ public class TestRESTCatalog extends 
CatalogTests<RESTCatalog> {
           ImmutableMap.of(
               RESTCatalogProperties.NAMESPACE_SEPARATOR,
               RESTCatalogAdapter.NAMESPACE_SEPARATOR_URLENCODED_UTF_8));
+  private static final Duration TABLE_EXPIRATION =
+      
Duration.ofMillis(RESTCatalogProperties.TABLE_CACHE_EXPIRE_AFTER_WRITE_MS_DEFAULT);
+  private static final Duration HALF_OF_TABLE_EXPIRATION = 
TABLE_EXPIRATION.dividedBy(2);
+
+  private static final SessionCatalog.SessionContext DEFAULT_SESSION_CONTEXT =
+      new SessionCatalog.SessionContext(
+          UUID.randomUUID().toString(),
+          "user",
+          ImmutableMap.of("credential", "user:12345"),
+          ImmutableMap.of());
 
   private static final class IdempotentEnv {
     private final TableIdentifier ident;
@@ -306,16 +323,10 @@ public class TestRESTCatalog extends 
CatalogTests<RESTCatalog> {
   @Override
   protected RESTCatalog initCatalog(String catalogName, Map<String, String> 
additionalProperties) {
     Configuration conf = new Configuration();
-    SessionCatalog.SessionContext context =
-        new SessionCatalog.SessionContext(
-            UUID.randomUUID().toString(),
-            "user",
-            ImmutableMap.of("credential", "user:12345"),
-            ImmutableMap.of());
 
     RESTCatalog catalog =
         new RESTCatalog(
-            context,
+            DEFAULT_SESSION_CONTEXT,
             (config) ->
                 HTTPClient.builder(config)
                     .uri(config.get(CatalogProperties.URI))
@@ -3009,12 +3020,13 @@ public class TestRESTCatalog extends 
CatalogTests<RESTCatalog> {
     assertThat(respHeaders).containsEntry(HttpHeaders.ETAG, eTag);
   }
 
-  @SuppressWarnings("checkstyle:AssertThatThrownByWithMessageCheck")
   @Test
   public void testNotModified() {
     catalog().createNamespace(TABLE.namespace());
 
-    Table table = catalog().createTable(TABLE, SCHEMA);
+    catalog().createTable(TABLE, SCHEMA);
+
+    Table table = catalog().loadTable(TABLE);
 
     String eTag =
         ETagProvider.of(((BaseTable) 
table).operations().current().metadataFileLocation());
@@ -3023,26 +3035,21 @@ public class TestRESTCatalog extends 
CatalogTests<RESTCatalog> {
             invocation -> {
               HTTPRequest originalRequest = invocation.getArgument(0);
 
-              HTTPHeaders extendedHeaders =
-                  ImmutableHTTPHeaders.copyOf(originalRequest.headers())
-                      .putIfAbsent(
-                          ImmutableHTTPHeader.builder()
-                              .name(HttpHeaders.IF_NONE_MATCH)
-                              .value(eTag)
-                              .build());
-
-              ImmutableHTTPRequest extendedRequest =
-                  ImmutableHTTPRequest.builder()
-                      .from(originalRequest)
-                      .headers(extendedHeaders)
-                      .build();
-
-              return adapterForRESTServer.execute(
-                  extendedRequest,
-                  LoadTableResponse.class,
-                  invocation.getArgument(2),
-                  invocation.getArgument(3),
-                  ParserContext.builder().build());
+              
assertThat(originalRequest.headers().contains(HttpHeaders.IF_NONE_MATCH));
+              assertThat(
+                      
originalRequest.headers().firstEntry(HttpHeaders.IF_NONE_MATCH).get().value())
+                  .isEqualTo(eTag);
+
+              assertThat(
+                      adapterForRESTServer.execute(
+                          originalRequest,
+                          LoadTableResponse.class,
+                          invocation.getArgument(2),
+                          invocation.getArgument(3),
+                          ParserContext.builder().build()))
+                  .isNull();
+
+              return null;
             })
         .when(adapterForRESTServer)
         .execute(
@@ -3051,17 +3058,14 @@ public class TestRESTCatalog extends 
CatalogTests<RESTCatalog> {
             any(),
             any());
 
-    // TODO: This won't throw when client side of freshness-aware loading is 
implemented
-    assertThatThrownBy(() -> 
catalog().loadTable(TABLE)).isInstanceOf(NullPointerException.class);
+    catalog().loadTable(TABLE);
 
     TableIdentifier metadataTableIdentifier =
         TableIdentifier.of(NS.toString(), TABLE.name(), "partitions");
 
-    // TODO: This won't throw when client side of freshness-aware loading is 
implemented
-    assertThatThrownBy(() -> catalog().loadTable(metadataTableIdentifier))
-        .isInstanceOf(NullPointerException.class);
+    catalog().loadTable(metadataTableIdentifier);
 
-    Mockito.verify(adapterForRESTServer, times(2))
+    Mockito.verify(adapterForRESTServer, times(3))
         .execute(
             reqMatcher(HTTPMethod.GET, RESOURCE_PATHS.table(TABLE)),
             eq(LoadTableResponse.class),
@@ -3378,6 +3382,60 @@ public class TestRESTCatalog extends 
CatalogTests<RESTCatalog> {
     }
   }
 
+  @Test
+  public void testCustomTableOperationsWithFreshnessAwareLoading() {
+    class CustomTableOps extends RESTTableOperations {
+      CustomTableOps(
+          RESTClient client,
+          String path,
+          Supplier<Map<String, String>> readHeaders,
+          Supplier<Map<String, String>> mutationHeaders,
+          FileIO io,
+          TableMetadata current,
+          Set<Endpoint> endpoints) {
+        super(client, path, readHeaders, mutationHeaders, io, current, 
endpoints);
+      }
+    }
+
+    class CustomRESTSessionCatalog extends RESTSessionCatalog {
+      CustomRESTSessionCatalog(
+          Function<Map<String, String>, RESTClient> clientBuilder,
+          BiFunction<SessionCatalog.SessionContext, Map<String, String>, 
FileIO> ioBuilder) {
+        super(clientBuilder, ioBuilder);
+      }
+
+      @Override
+      protected RESTTableOperations newTableOps(
+          RESTClient restClient,
+          String path,
+          Supplier<Map<String, String>> readHeaders,
+          Supplier<Map<String, String>> mutationHeaders,
+          FileIO fileIO,
+          TableMetadata current,
+          Set<Endpoint> supportedEndpoints) {
+        return new CustomTableOps(
+            restClient, path, readHeaders, mutationHeaders, fileIO, current, 
supportedEndpoints);
+      }
+    }
+
+    RESTCatalogAdapter adapter = Mockito.spy(new 
RESTCatalogAdapter(backendCatalog));
+    RESTCatalog catalog =
+        catalog(adapter, clientBuilder -> new 
CustomRESTSessionCatalog(clientBuilder, null));
+
+    catalog.createNamespace(NS);
+
+    catalog.createTable(TABLE, SCHEMA);
+
+    expectFullTableLoadForLoadTable(TABLE, adapter);
+    BaseTable table = (BaseTable) catalog.loadTable(TABLE);
+    assertThat(table.operations()).isInstanceOf(CustomTableOps.class);
+
+    // When answering loadTable from table cache we still get the injected ops.
+    expectNotModifiedResponseForLoadTable(TABLE, adapter);
+    table = (BaseTable) catalog.loadTable(TABLE);
+    assertThat(table.operations()).isInstanceOf(CustomTableOps.class);
+  }
+
   @Test
   public void testClientAutoSendsIdempotencyWhenServerAdvertises() {
     ConfigResponse cfgWithIdem =
@@ -3778,6 +3836,496 @@ public class TestRESTCatalog extends 
CatalogTests<RESTCatalog> {
         .hasMessageContaining("No in-memory file found for location: " + 
metadataFileLocation);
   }
 
+  @Test
+  public void testInvalidTableCacheParameters() {
+    RESTCatalog catalog = new RESTCatalog(config -> new RESTCatalogAdapter(backendCatalog));
+
+    // A zero or negative expire-after-write value is rejected when the catalog is initialized.
+    for (String invalidExpiry : new String[] {"0", "-1"}) {
+      Map<String, String> props =
+          Map.of(RESTCatalogProperties.TABLE_CACHE_EXPIRE_AFTER_WRITE_MS, invalidExpiry);
+
+      assertThatThrownBy(() -> catalog.initialize("test", props))
+          .isInstanceOf(IllegalArgumentException.class)
+          .hasMessage("Invalid expire after write: zero or negative");
+    }
+
+    // A negative max-entries value is rejected as well.
+    Map<String, String> negativeMaxEntries =
+        Map.of(RESTCatalogProperties.TABLE_CACHE_MAX_ENTRIES, "-1");
+
+    assertThatThrownBy(() -> catalog.initialize("test", negativeMaxEntries))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Invalid max entries: negative");
+  }
+
+  // Loads the same table twice: the first load must return a full response and populate the
+  // cache, the second must be answered by a not-modified (null) response. Both loads still
+  // issue a GET against the server.
+  @Test
+  public void testFreshnessAwareLoading() {
+    catalog().createNamespace(TABLE.namespace());
+
+    catalog().createTable(TABLE, SCHEMA);
+
+    Cache<SessionIdTableId, TableWithETag> tableCache =
+        restCatalog.sessionCatalog().tableCache().cache();
+    assertThat(tableCache.estimatedSize()).isZero();
+
+    expectFullTableLoadForLoadTable(TABLE, adapterForRESTServer);
+
+    BaseTable tableAfterFirstLoad = (BaseTable) catalog().loadTable(TABLE);
+
+    assertThat(tableCache.stats().hitCount()).isZero();
+    assertThat(tableCache.asMap())
+        
.containsOnlyKeys(SessionIdTableId.of(DEFAULT_SESSION_CONTEXT.sessionId(), 
TABLE));
+
+    expectNotModifiedResponseForLoadTable(TABLE, adapterForRESTServer);
+
+    BaseTable tableAfterSecondLoad = (BaseTable) catalog().loadTable(TABLE);
+
+    // The two loads yield distinct table objects that agree on the table location.
+    assertThat(tableAfterFirstLoad).isNotEqualTo(tableAfterSecondLoad);
+    assertThat(tableAfterFirstLoad.operations().current().location())
+        .isEqualTo(tableAfterSecondLoad.operations().current().location());
+    // The cached entry's supplier produces a table with the same metadata file location.
+    assertThat(
+            tableCache
+                .asMap()
+                .get(SessionIdTableId.of(DEFAULT_SESSION_CONTEXT.sessionId(), 
TABLE))
+                .supplier()
+                .get()
+                .operations()
+                .current()
+                .metadataFileLocation())
+        
.isEqualTo(tableAfterFirstLoad.operations().current().metadataFileLocation());
+
+    Mockito.verify(adapterForRESTServer, times(2))
+        .execute(reqMatcher(HTTPMethod.GET, RESOURCE_PATHS.table(TABLE)), 
any(), any(), any());
+  }
+
+  // Loads a table and then its "partitions" metadata table. The metadata table load must
+  // register one cache hit while the cache keeps only the base table's key.
+  @Test
+  public void testFreshnessAwareLoadingMetadataTables() {
+    catalog().createNamespace(TABLE.namespace());
+
+    catalog().createTable(TABLE, SCHEMA);
+
+    Cache<SessionIdTableId, TableWithETag> tableCache =
+        restCatalog.sessionCatalog().tableCache().cache();
+    assertThat(tableCache.estimatedSize()).isZero();
+
+    BaseTable table = (BaseTable) catalog().loadTable(TABLE);
+
+    assertThat(tableCache.stats().hitCount()).isZero();
+    assertThat(tableCache.asMap())
+        
.containsOnlyKeys(SessionIdTableId.of(DEFAULT_SESSION_CONTEXT.sessionId(), 
TABLE));
+
+    TableIdentifier metadataTableIdentifier =
+        TableIdentifier.of(TABLE.namespace().toString(), TABLE.name(), 
"partitions");
+
+    BaseMetadataTable metadataTable =
+        (BaseMetadataTable) catalog().loadTable(metadataTableIdentifier);
+
+    assertThat(tableCache.stats().hitCount()).isEqualTo(1);
+    assertThat(tableCache.asMap())
+        
.containsOnlyKeys(SessionIdTableId.of(DEFAULT_SESSION_CONTEXT.sessionId(), 
TABLE));
+
+    // The metadata table wraps a different table object backed by the same metadata file.
+    assertThat(table).isNotEqualTo(metadataTable.table());
+    assertThat(table.operations().current().metadataFileLocation())
+        
.isEqualTo(metadataTable.table().operations().current().metadataFileLocation());
+
+    ResourcePaths paths =
+        ResourcePaths.forCatalogProperties(
+            ImmutableMap.of(RESTCatalogProperties.NAMESPACE_SEPARATOR, "%2E"));
+
+    // Two GETs are expected for the base table path and one for the metadata table identifier.
+    Mockito.verify(adapterForRESTServer, times(2))
+        .execute(reqMatcher(HTTPMethod.GET, paths.table(TABLE)), any(), any(), 
any());
+
+    Mockito.verify(adapterForRESTServer)
+        .execute(
+            reqMatcher(HTTPMethod.GET, paths.table(metadataTableIdentifier)), 
any(), any(), any());
+  }
+
+  @Test
+  public void testRenameTableInvalidatesTable() {
+    runTableInvalidationTest(
+        restCatalog,
+        adapterForRESTServer,
+        (catalog) ->
+            catalog.renameTable(TABLE, TableIdentifier.of(TABLE.namespace(), 
"other_table")),
+        0);
+  }
+
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  public void testDropTableInvalidatesTable(boolean purge) {
+    // Runs once with purge enabled and once without; the drop is the action under test.
+    Consumer<RESTCatalog> dropAction = catalog -> catalog.dropTable(TABLE, purge);
+
+    runTableInvalidationTest(restCatalog, adapterForRESTServer, dropAction, 0);
+  }
+
+  // Drops the table through a second catalog, then calls tableExists on the main catalog and
+  // expects the stale cache entry to be gone afterwards. No loadTable is attributed to the
+  // action (count 0); per the test name the existence check is presumably a HEAD request.
+  @Test
+  public void testTableExistViaHeadRequestInvalidatesTable() {
+    runTableInvalidationTest(
+        restCatalog,
+        adapterForRESTServer,
+        ((catalog) -> {
+          // Use a different catalog to drop the table
+          catalog(new RESTCatalogAdapter(backendCatalog)).dropTable(TABLE, 
true);
+
+          // The main catalog still has the table in cache
+          assertThat(catalog.sessionCatalog().tableCache().cache().asMap())
+              
.containsOnlyKeys(SessionIdTableId.of(DEFAULT_SESSION_CONTEXT.sessionId(), 
TABLE));
+
+          catalog.tableExists(TABLE);
+        }),
+        0);
+  }
+
+  // Same scenario as the HEAD-based variant, but the stubbed config response advertises only
+  // the load-table, create-namespace and create-table endpoints, so tableExists is answered
+  // through a GET. That GET is counted as one loadTable issued by the action.
+  @Test
+  public void testTableExistViaGetRequestInvalidatesTable() {
+    RESTCatalogAdapter adapter = Mockito.spy(new 
RESTCatalogAdapter(backendCatalog));
+
+    // Configure REST server to answer tableExists query via GET
+    Mockito.doAnswer(
+            invocation ->
+                ConfigResponse.builder()
+                    .withEndpoints(
+                        ImmutableList.of(
+                            Endpoint.V1_LOAD_TABLE,
+                            Endpoint.V1_CREATE_NAMESPACE,
+                            Endpoint.V1_CREATE_TABLE))
+                    .build())
+        .when(adapter)
+        .execute(
+            reqMatcher(HTTPMethod.GET, ResourcePaths.config()),
+            eq(ConfigResponse.class),
+            any(),
+            any());
+
+    RESTCatalog catalog = new RESTCatalog(DEFAULT_SESSION_CONTEXT, config -> 
adapter);
+    catalog.initialize(
+        "catalog",
+        ImmutableMap.of(
+            CatalogProperties.FILE_IO_IMPL, 
"org.apache.iceberg.inmemory.InMemoryFileIO"));
+
+    runTableInvalidationTest(
+        catalog,
+        adapter,
+        (cat) -> {
+          // Use a different catalog to drop the table
+          catalog(new RESTCatalogAdapter(backendCatalog)).dropTable(TABLE, 
true);
+
+          // The main catalog still has the table in cache
+          assertThat(cat.sessionCatalog().tableCache().cache().asMap())
+              
.containsOnlyKeys(SessionIdTableId.of(DEFAULT_SESSION_CONTEXT.sessionId(), 
TABLE));
+
+          cat.tableExists(TABLE);
+        },
+        1);
+  }
+
+  // Drops the table through a second catalog, then expects loadTable on the main catalog to
+  // fail with NoSuchTableException and to leave the cache empty. The failing loadTable is
+  // counted as one loadTable issued by the action.
+  @Test
+  public void testLoadTableInvalidatesCache() {
+    runTableInvalidationTest(
+        restCatalog,
+        adapterForRESTServer,
+        (catalog) -> {
+          // Use a different catalog to drop the table
+          catalog(new RESTCatalogAdapter(backendCatalog)).dropTable(TABLE, 
true);
+
+          // The main catalog still has the table in cache
+          assertThat(catalog.sessionCatalog().tableCache().cache().asMap())
+              
.containsOnlyKeys(SessionIdTableId.of(DEFAULT_SESSION_CONTEXT.sessionId(), 
TABLE));
+
+          assertThatThrownBy(() -> catalog.loadTable(TABLE))
+              .isInstanceOf(NoSuchTableException.class)
+              .hasMessage("Table does not exist: %s", TABLE);
+        },
+        1);
+  }
+
+  // Like testLoadTableInvalidatesCache, but the failing load uses the "partitions" metadata
+  // table identifier. The reported missing table is still the base table, and exactly one GET
+  // for the metadata table identifier is expected.
+  @Test
+  public void testLoadTableWithMetadataTableNameInvalidatesCache() {
+    TableIdentifier metadataTableIdentifier =
+        TableIdentifier.of(TABLE.namespace().toString(), TABLE.name(), 
"partitions");
+
+    runTableInvalidationTest(
+        restCatalog,
+        adapterForRESTServer,
+        (catalog) -> {
+          // Use a different catalog to drop the table
+          catalog(new RESTCatalogAdapter(backendCatalog)).dropTable(TABLE, 
true);
+
+          // The main catalog still has the table in cache
+          assertThat(catalog.sessionCatalog().tableCache().cache().asMap())
+              
.containsOnlyKeys(SessionIdTableId.of(DEFAULT_SESSION_CONTEXT.sessionId(), 
TABLE));
+
+          assertThatThrownBy(() -> catalog.loadTable(metadataTableIdentifier))
+              .isInstanceOf(NoSuchTableException.class)
+              .hasMessage("Table does not exist: %s", TABLE);
+        },
+        1);
+
+    ResourcePaths paths =
+        ResourcePaths.forCatalogProperties(
+            ImmutableMap.of(RESTCatalogProperties.NAMESPACE_SEPARATOR, "%2E"));
+
+    Mockito.verify(adapterForRESTServer)
+        .execute(
+            reqMatcher(HTTPMethod.GET, paths.table(metadataTableIdentifier)), 
any(), any(), any());
+  }
+
+  /**
+   * Shared scenario for tests that check an operation evicts a table from the table cache.
+   *
+   * <p>Creates and loads {@code TABLE} so that it is cached, runs {@code action}, asserts the
+   * cache is empty and the table can no longer be loaded, then recreates and reloads the table
+   * and asserts the new table has a different metadata file location.
+   *
+   * @param catalog catalog under test; its session catalog's table cache is inspected
+   * @param adapterToVerify adapter (expected to be a Mockito spy) whose GET requests are counted
+   * @param action operation that is expected to leave the cache empty
+   * @param loadTableCountFromAction number of loadTable GETs issued by {@code action}; it is
+   *     used both as the expected cache hit count and as an addition to the three GETs this
+   *     method issues itself
+   */
+  private void runTableInvalidationTest(
+      RESTCatalog catalog,
+      RESTCatalogAdapter adapterToVerify,
+      Consumer<RESTCatalog> action,
+      int loadTableCountFromAction) {
+    catalog.createNamespace(TABLE.namespace());
+
+    catalog.createTable(TABLE, SCHEMA);
+
+    BaseTable originalTable = (BaseTable) catalog.loadTable(TABLE);
+
+    Cache<SessionIdTableId, TableWithETag> tableCache =
+        catalog.sessionCatalog().tableCache().cache();
+    assertThat(tableCache.stats().hitCount()).isZero();
+    assertThat(tableCache.asMap())
+        
.containsOnlyKeys(SessionIdTableId.of(DEFAULT_SESSION_CONTEXT.sessionId(), 
TABLE));
+
+    action.accept(catalog);
+
+    // Check that 'action' invalidates cache
+    assertThat(tableCache.estimatedSize()).isZero();
+
+    assertThatThrownBy(() -> catalog.loadTable(TABLE))
+        .isInstanceOf(NoSuchTableException.class)
+        .hasMessageContaining("Table does not exist: %s", TABLE);
+
+    catalog.createTable(TABLE, SCHEMA);
+
+    expectFullTableLoadForLoadTable(TABLE, adapterToVerify);
+
+    BaseTable newTableWithSameName = (BaseTable) catalog.loadTable(TABLE);
+
+    
assertThat(tableCache.stats().hitCount()).isEqualTo(loadTableCountFromAction);
+    assertThat(tableCache.asMap())
+        
.containsOnlyKeys(SessionIdTableId.of(DEFAULT_SESSION_CONTEXT.sessionId(), 
TABLE));
+
+    assertThat(newTableWithSameName).isNotEqualTo(originalTable);
+    
assertThat(newTableWithSameName.operations().current().metadataFileLocation())
+        
.isNotEqualTo(originalTable.operations().current().metadataFileLocation());
+
+    // Three GETs come from this method: initial load, failed load, and reload after recreate.
+    Mockito.verify(adapterToVerify, times(3 + loadTableCountFromAction))
+        .execute(reqMatcher(HTTPMethod.GET, RESOURCE_PATHS.table(TABLE)), 
any(), any(), any());
+  }
+
+  // Loads the same table under two session contexts and expects a full load for each, ending
+  // with one cache key per session id.
+  @Test
+  public void testTableCacheWithMultiSessions() {
+    RESTCatalogAdapter adapter = Mockito.spy(new 
RESTCatalogAdapter(backendCatalog));
+
+    RESTSessionCatalog sessionCatalog = new RESTSessionCatalog(config -> 
adapter, null);
+    sessionCatalog.initialize("test_session_catalog", Map.of());
+
+    SessionCatalog.SessionContext otherSessionContext =
+        new SessionCatalog.SessionContext(
+            "session_id_2", "user", ImmutableMap.of("credential", 
"user:12345"), ImmutableMap.of());
+
+    sessionCatalog.createNamespace(DEFAULT_SESSION_CONTEXT, TABLE.namespace());
+
+    sessionCatalog.buildTable(DEFAULT_SESSION_CONTEXT, TABLE, SCHEMA).create();
+
+    expectFullTableLoadForLoadTable(TABLE, adapter);
+
+    sessionCatalog.loadTable(DEFAULT_SESSION_CONTEXT, TABLE);
+
+    Cache<SessionIdTableId, TableWithETag> tableCache = 
sessionCatalog.tableCache().cache();
+    assertThat(tableCache.stats().hitCount()).isZero();
+    assertThat(tableCache.asMap())
+        
.containsOnlyKeys(SessionIdTableId.of(DEFAULT_SESSION_CONTEXT.sessionId(), 
TABLE));
+
+    // The second session must not be served from the first session's entry.
+    expectFullTableLoadForLoadTable(TABLE, adapter);
+
+    sessionCatalog.loadTable(otherSessionContext, TABLE);
+
+    assertThat(tableCache.asMap())
+        .containsOnlyKeys(
+            SessionIdTableId.of(DEFAULT_SESSION_CONTEXT.sessionId(), TABLE),
+            SessionIdTableId.of(otherSessionContext.sessionId(), TABLE));
+  }
+
+  // Stubs the loadTable GET to return null (how a not-modified response reaches the client)
+  // while nothing is cached for the table, and expects loadTable to fail with a RESTException.
+  @Test
+  public void test304NotModifiedResponseWithEmptyTableCache() {
+    Mockito.doAnswer(invocation -> null)
+        .when(adapterForRESTServer)
+        .execute(
+            reqMatcher(HTTPMethod.GET, RESOURCE_PATHS.table(TABLE)),
+            eq(LoadTableResponse.class),
+            any(),
+            any());
+
+    catalog().createNamespace(TABLE.namespace());
+
+    catalog().createTable(TABLE, SCHEMA);
+
+    catalog().invalidateTable(TABLE);
+
+    // Table is not in the cache and null LoadTableResponse is received
+    assertThatThrownBy(() -> catalog().loadTable(TABLE))
+        .isInstanceOf(RESTException.class)
+        .hasMessage(
+            "Invalid (NOT_MODIFIED) response for request: method=%s, path=%s",
+            HTTPMethod.GET, RESOURCE_PATHS.table(TABLE));
+  }
+
+  // Uses an adapter that withholds response headers containing an ETag from the client and
+  // expects that a loaded table is then not put into the table cache.
+  @Test
+  public void testTableCacheNotUpdatedWithoutETag() {
+    RESTCatalogAdapter adapter =
+        Mockito.spy(
+            new RESTCatalogAdapter(backendCatalog) {
+              @Override
+              public <T extends RESTResponse> T execute(
+                  HTTPRequest request,
+                  Class<T> responseType,
+                  Consumer<ErrorResponse> errorHandler,
+                  Consumer<Map<String, String>> responseHeaders) {
+                // Wrap the original responseHeaders to not accept ETag.
+                // Note: a header map that contains an ETag is dropped as a whole.
+                Consumer<Map<String, String>> noETagConsumer =
+                    headers -> {
+                      if (!headers.containsKey(HttpHeaders.ETAG)) {
+                        responseHeaders.accept(headers);
+                      }
+                    };
+                return super.execute(request, responseType, errorHandler, 
noETagConsumer);
+              }
+            });
+
+    RESTCatalog catalog = new RESTCatalog(DEFAULT_SESSION_CONTEXT, config -> 
adapter);
+    catalog.initialize(
+        "catalog",
+        ImmutableMap.of(
+            CatalogProperties.FILE_IO_IMPL, 
"org.apache.iceberg.inmemory.InMemoryFileIO"));
+
+    catalog.createNamespace(TABLE.namespace());
+
+    catalog.createTable(TABLE, SCHEMA);
+
+    catalog.loadTable(TABLE);
+
+    
assertThat(catalog.sessionCatalog().tableCache().cache().estimatedSize()).isZero();
+  }
+
+  // Initializes the catalog with a max-entries value of "0" and expects the table cache to
+  // stay empty after a full table load.
+  @Test
+  public void testTableCacheIsDisabled() {
+    RESTCatalogAdapter adapter = Mockito.spy(new 
RESTCatalogAdapter(backendCatalog));
+
+    RESTCatalog catalog = new RESTCatalog(DEFAULT_SESSION_CONTEXT, config -> 
adapter);
+    catalog.initialize(
+        "catalog",
+        ImmutableMap.of(
+            CatalogProperties.FILE_IO_IMPL,
+            "org.apache.iceberg.inmemory.InMemoryFileIO",
+            RESTCatalogProperties.TABLE_CACHE_MAX_ENTRIES,
+            "0"));
+
+    catalog.createNamespace(TABLE.namespace());
+
+    catalog.createTable(TABLE, SCHEMA);
+
+    
assertThat(catalog.sessionCatalog().tableCache().cache().estimatedSize()).isZero();
+
+    expectFullTableLoadForLoadTable(TABLE, adapter);
+
+    catalog.loadTable(TABLE);
+
+    // Run pending cache maintenance before reading the size estimate.
+    catalog.sessionCatalog().tableCache().cache().cleanUp();
+
+    
assertThat(catalog.sessionCatalog().tableCache().cache().estimatedSize()).isZero();
+  }
+
+  // Drives the cache clock with a FakeTicker: the entry is still present at half of the
+  // expiration time, is gone once the clock passes the full expiration time, and the next
+  // loadTable is a full load that re-creates the entry with age zero.
+  @Test
+  public void testFullTableLoadAfterExpiryFromCache() {
+    RESTCatalogAdapter adapter = Mockito.spy(new RESTCatalogAdapter(backendCatalog));
+
+    FakeTicker ticker = new FakeTicker();
+
+    TestableRESTCatalog catalog =
+        new TestableRESTCatalog(DEFAULT_SESSION_CONTEXT, config -> adapter, ticker);
+    catalog.initialize("catalog", Map.of());
+
+    catalog.createNamespace(TABLE.namespace());
+
+    catalog.createTable(TABLE, SCHEMA);
+
+    catalog.loadTable(TABLE);
+
+    Cache<SessionIdTableId, TableWithETag> tableCache =
+        catalog.sessionCatalog().tableCache().cache();
+    SessionIdTableId tableCacheKey =
+        SessionIdTableId.of(DEFAULT_SESSION_CONTEXT.sessionId(), TABLE);
+
+    // Freshly cached: the entry exists and has not aged yet.
+    assertThat(tableCache.asMap()).containsOnlyKeys(tableCacheKey);
+    assertThat(tableCache.policy().expireAfterWrite().get().ageOf(tableCacheKey))
+        .isPresent()
+        .get()
+        .isEqualTo(Duration.ZERO);
+
+    ticker.advance(HALF_OF_TABLE_EXPIRATION);
+
+    // Halfway to expiry: the entry is still there and its age follows the ticker.
+    assertThat(tableCache.asMap()).containsOnlyKeys(tableCacheKey);
+    assertThat(tableCache.policy().expireAfterWrite().get().ageOf(tableCacheKey))
+        .isPresent()
+        .get()
+        .isEqualTo(HALF_OF_TABLE_EXPIRATION);
+
+    // Move the clock past the full expiration time.
+    ticker.advance(HALF_OF_TABLE_EXPIRATION.plus(Duration.ofSeconds(10)));
+
+    assertThat(tableCache.asMap()).doesNotContainKey(tableCacheKey);
+
+    expectFullTableLoadForLoadTable(TABLE, adapter);
+
+    catalog.loadTable(TABLE);
+
+    // The reload was not a cache hit and produced a brand-new entry.
+    assertThat(tableCache.stats().hitCount()).isZero();
+    assertThat(tableCache.asMap()).containsOnlyKeys(tableCacheKey);
+    assertThat(tableCache.policy().expireAfterWrite().get().ageOf(tableCacheKey))
+        .isPresent()
+        .get()
+        .isEqualTo(Duration.ZERO);
+  }
+
+  // Checks that loading a cached table again does not reset the age of its cache entry: the
+  // age equals the advanced ticker time both before and after the second loadTable.
+  @Test
+  public void testTableCacheAgeDoesNotRefreshesAfterAccess() {
+    FakeTicker ticker = new FakeTicker();
+
+    TestableRESTCatalog catalog =
+        new TestableRESTCatalog(
+            DEFAULT_SESSION_CONTEXT, config -> new 
RESTCatalogAdapter(backendCatalog), ticker);
+    catalog.initialize("catalog", Map.of());
+
+    catalog.createNamespace(TABLE.namespace());
+
+    catalog.createTable(TABLE, SCHEMA);
+
+    catalog.loadTable(TABLE);
+
+    ticker.advance(HALF_OF_TABLE_EXPIRATION);
+
+    Cache<SessionIdTableId, TableWithETag> tableCache =
+        catalog.sessionCatalog().tableCache().cache();
+    SessionIdTableId tableCacheKey =
+        SessionIdTableId.of(DEFAULT_SESSION_CONTEXT.sessionId(), TABLE);
+
+    
assertThat(tableCache.policy().expireAfterWrite().get().ageOf(tableCacheKey))
+        .isPresent()
+        .get()
+        .isEqualTo(HALF_OF_TABLE_EXPIRATION);
+
+    catalog.loadTable(TABLE);
+
+    
assertThat(tableCache.policy().expireAfterWrite().get().ageOf(tableCacheKey))
+        .isPresent()
+        .get()
+        .isEqualTo(HALF_OF_TABLE_EXPIRATION);
+  }
+
   private RESTCatalog catalog(RESTCatalogAdapter adapter) {
     RESTCatalog catalog =
         new RESTCatalog(SessionCatalog.SessionContext.createEmpty(), (config) 
-> adapter);
@@ -3807,6 +4355,45 @@ public class TestRESTCatalog extends 
CatalogTests<RESTCatalog> {
     return catalog;
   }
 
+  /**
+   * Stubs the loadTable GET for {@code ident} so that the real adapter call is still made, but
+   * the invocation fails unless a non-null {@link LoadTableResponse} comes back.
+   *
+   * @param ident table whose loadTable request is intercepted
+   * @param adapter adapter to stub; it has to be a Mockito spy since the real method is called
+   */
+  private void expectFullTableLoadForLoadTable(TableIdentifier ident, RESTCatalogAdapter adapter) {
+    Answer<LoadTableResponse> invocationAssertsFullLoad =
+        invocation -> {
+          LoadTableResponse response = (LoadTableResponse) invocation.callRealMethod();
+
+          assertThat(response).isNotNull();
+
+          return response;
+        };
+
+    Mockito.doAnswer(invocationAssertsFullLoad)
+        .when(adapter)
+        .execute(
+            reqMatcher(HTTPMethod.GET, RESOURCE_PATHS.table(ident)),
+            eq(LoadTableResponse.class),
+            any(),
+            any());
+  }
+
+  /**
+   * Stubs the loadTable GET for {@code ident} so that the real adapter call is still made, but
+   * the invocation fails unless the response is null, which is how a not-modified answer
+   * without a body reaches the caller.
+   *
+   * @param ident table whose loadTable request is intercepted
+   * @param adapter adapter to stub; it has to be a Mockito spy since the real method is called
+   */
+  private void expectNotModifiedResponseForLoadTable(
+      TableIdentifier ident, RESTCatalogAdapter adapter) {
+    Answer<LoadTableResponse> invocationAssertsNotModified =
+        invocation -> {
+          LoadTableResponse response = (LoadTableResponse) invocation.callRealMethod();
+
+          assertThat(response).isNull();
+
+          return response;
+        };
+
+    Mockito.doAnswer(invocationAssertsNotModified)
+        .when(adapter)
+        .execute(
+            reqMatcher(HTTPMethod.GET, RESOURCE_PATHS.table(ident)),
+            eq(LoadTableResponse.class),
+            any(),
+            any());
+  }
+
   // Argument matcher that accepts any request whose HTTP method equals the given one.
   static HTTPRequest reqMatcher(HTTPMethod method) {
     return argThat(req -> req.method() == method);
   }
diff --git 
a/core/src/test/java/org/apache/iceberg/rest/TestRESTScanPlanning.java 
b/core/src/test/java/org/apache/iceberg/rest/TestRESTScanPlanning.java
index f84197b0f1..0b1453682b 100644
--- a/core/src/test/java/org/apache/iceberg/rest/TestRESTScanPlanning.java
+++ b/core/src/test/java/org/apache/iceberg/rest/TestRESTScanPlanning.java
@@ -859,6 +859,26 @@ public class TestRESTScanPlanning {
     }
   }
 
+  // Loads the same table twice through a scan-planning catalog and passes the table from the
+  // second load to the restTableScanFor helper.
+  @Test
+  void remoteScanPlanningWithFreshnessAwareLoading() throws IOException {
+    RESTCatalog catalog = scanPlanningCatalog();
+
+    TableIdentifier tableIdentifier = TableIdentifier.of(NS, 
"freshness_aware_loading_test");
+    restTableFor(catalog, tableIdentifier.name());
+
+    
assertThat(catalog.sessionCatalog().tableCache().cache().estimatedSize()).isZero();
+
+    // Table is cached with the first loadTable
+    catalog.loadTable(tableIdentifier);
+    
assertThat(catalog.sessionCatalog().tableCache().cache().estimatedSize()).isOne();
+
+    // Second loadTable is answered from cache
+    Table table = catalog.loadTable(tableIdentifier);
+
+    // Verify table is RESTTable and newScan() returns RESTTableScan
+    restTableScanFor(table);
+  }
+
   // ==================== Endpoint Support Tests ====================
 
   /** Helper class to hold catalog and adapter for endpoint support tests. */
diff --git 
a/core/src/test/java/org/apache/iceberg/rest/TestableRESTCatalog.java 
b/core/src/test/java/org/apache/iceberg/rest/TestableRESTCatalog.java
new file mode 100644
index 0000000000..cf66c04ade
--- /dev/null
+++ b/core/src/test/java/org/apache/iceberg/rest/TestableRESTCatalog.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.rest;
+
+import com.github.benmanes.caffeine.cache.Ticker;
+import java.util.Map;
+import java.util.function.Function;
+import org.apache.iceberg.catalog.SessionCatalog;
+
+/**
+ * {@link RESTCatalog} for tests that creates a {@link TestableRESTSessionCatalog} and hands it a
+ * caller-supplied {@link Ticker} before initialization.
+ */
+class TestableRESTCatalog extends RESTCatalog {
+  // Assigned only after the super constructor returns; see newSessionCatalog.
+  private final Ticker ticker;
+
+  TestableRESTCatalog(
+      SessionCatalog.SessionContext context,
+      Function<Map<String, String>, RESTClient> clientBuilder,
+      Ticker ticker) {
+    super(context, clientBuilder);
+
+    this.ticker = ticker;
+  }
+
+  @Override
+  protected RESTSessionCatalog newSessionCatalog(
+      Function<Map<String, String>, RESTClient> clientBuilder) {
+    // This is called from RESTCatalog's constructor, 'ticker' member is not 
yet set, we have to
+    // defer passing it to the session catalog.
+    return new TestableRESTSessionCatalog(clientBuilder, null);
+  }
+
+  // Passes the ticker to the session catalog first, then runs the regular initialization.
+  // NOTE(review): presumably the table cache is created during initialize — confirm.
+  @Override
+  public void initialize(String name, Map<String, String> props) {
+    ((TestableRESTSessionCatalog) sessionCatalog()).setTicker(ticker);
+
+    super.initialize(name, props);
+  }
+}
diff --git 
a/core/src/test/java/org/apache/iceberg/rest/TestableRESTSessionCatalog.java 
b/core/src/test/java/org/apache/iceberg/rest/TestableRESTSessionCatalog.java
new file mode 100644
index 0000000000..492fd998e8
--- /dev/null
+++ b/core/src/test/java/org/apache/iceberg/rest/TestableRESTSessionCatalog.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.rest;
+
+import com.github.benmanes.caffeine.cache.Ticker;
+import java.util.Map;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+import org.apache.iceberg.io.FileIO;
+
+/**
+ * {@link RESTSessionCatalog} for tests that builds its {@link RESTTableCache} with a ticker
+ * supplied through {@link #setTicker(Ticker)}.
+ */
+class TestableRESTSessionCatalog extends RESTSessionCatalog {
+  // Stays null until setTicker is called.
+  private Ticker ticker;
+
+  TestableRESTSessionCatalog(
+      Function<Map<String, String>, RESTClient> clientBuilder,
+      BiFunction<SessionContext, Map<String, String>, FileIO> ioBuilder) {
+    super(clientBuilder, ioBuilder);
+  }
+
+  // Stores the ticker that createTableCache passes to the table cache.
+  public void setTicker(Ticker newTicker) {
+    this.ticker = newTicker;
+  }
+
+  // NOTE(review): if this runs before setTicker, a null ticker is passed on — confirm that
+  // RESTTableCache tolerates that or that callers always set the ticker first.
+  @Override
+  protected RESTTableCache createTableCache(Map<String, String> props) {
+    return new RESTTableCache(props, ticker);
+  }
+}

Reply via email to