This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 6cc4a198c5 Spark: Add Spark catalog for loading tables from cache 
(#5247)
6cc4a198c5 is described below

commit 6cc4a198c56c05ff103e6ecdf75fe50004af19da
Author: Anton Okolnychyi <[email protected]>
AuthorDate: Mon Jul 11 17:12:59 2022 -0700

    Spark: Add Spark catalog for loading tables from cache (#5247)
---
 .../apache/iceberg/actions/ActionsProvider.java    |  11 --
 .../extensions/TestRewriteDataFilesProcedure.java  |  98 +++++++++++-
 .../action/IcebergSortCompactionBenchmark.java     |  29 ++--
 .../iceberg/spark/SparkCachedTableCatalog.java     | 158 ++++++++++++++++++
 .../org/apache/iceberg/spark/SparkTableCache.java  |  55 +++++++
 .../actions/BaseRewriteDataFilesSparkAction.java   |  11 +-
 .../apache/iceberg/spark/actions/SparkActions.java |   6 -
 .../spark/actions/SparkBinPackStrategy.java        |  19 +--
 .../iceberg/spark/actions/SparkSortStrategy.java   |  12 +-
 .../iceberg/spark/actions/SparkZOrderStrategy.java |   6 +-
 .../procedures/RewriteDataFilesProcedure.java      |   4 +-
 .../apache/iceberg/spark/source/IcebergSource.java |  47 ++++--
 .../iceberg/spark/TestSparkCachedTableCatalog.java |  88 ++++++++++
 .../spark/actions/TestRewriteDataFilesAction.java  |  12 +-
 .../extensions/TestRewriteDataFilesProcedure.java  |  98 +++++++++++-
 .../action/IcebergSortCompactionBenchmark.java     |  29 ++--
 .../iceberg/spark/SparkCachedTableCatalog.java     | 178 +++++++++++++++++++++
 .../org/apache/iceberg/spark/SparkTableCache.java  |  55 +++++++
 .../actions/BaseRewriteDataFilesSparkAction.java   |  11 +-
 .../apache/iceberg/spark/actions/SparkActions.java |   6 -
 .../spark/actions/SparkBinPackStrategy.java        |  19 +--
 .../iceberg/spark/actions/SparkSortStrategy.java   |  12 +-
 .../iceberg/spark/actions/SparkZOrderStrategy.java |   6 +-
 .../procedures/RewriteDataFilesProcedure.java      |   4 +-
 .../apache/iceberg/spark/source/IcebergSource.java |  47 ++++--
 .../iceberg/spark/TestSparkCachedTableCatalog.java |  88 ++++++++++
 .../spark/actions/TestRewriteDataFilesAction.java  |  12 +-
 27 files changed, 964 insertions(+), 157 deletions(-)

diff --git a/api/src/main/java/org/apache/iceberg/actions/ActionsProvider.java 
b/api/src/main/java/org/apache/iceberg/actions/ActionsProvider.java
index 2a73809d60..f2564ddb70 100644
--- a/api/src/main/java/org/apache/iceberg/actions/ActionsProvider.java
+++ b/api/src/main/java/org/apache/iceberg/actions/ActionsProvider.java
@@ -56,22 +56,11 @@ public interface ActionsProvider {
 
   /**
    * Instantiates an action to rewrite data files.
-   * @deprecated please use {@link #rewriteDataFiles(Table, String)}
    */
-  @Deprecated
   default RewriteDataFiles rewriteDataFiles(Table table) {
     throw new UnsupportedOperationException(this.getClass().getName() + " does 
not implement rewriteDataFiles");
   }
 
-  /**
-   * Instantiates an action to rewrite data files.
-   */
-  default RewriteDataFiles rewriteDataFiles(Table table, String 
fullIdentifier) {
-    throw new UnsupportedOperationException(
-        this.getClass().getName() + " does not implement 
rewriteDataFiles(Table, String)"
-    );
-  }
-
   /**
    * Instantiates an action to expire snapshots.
    */
diff --git 
a/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
 
b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
index 63a63ddccb..9fdb0b1ed5 100644
--- 
a/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
+++ 
b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
@@ -29,6 +29,8 @@ import org.apache.iceberg.expressions.Zorder;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.spark.ExtendedParser;
+import org.apache.iceberg.spark.SparkCatalogConfig;
+import org.apache.iceberg.spark.SparkTableCache;
 import org.apache.iceberg.spark.source.ThreeColumnRecord;
 import org.apache.spark.sql.AnalysisException;
 import org.apache.spark.sql.Dataset;
@@ -36,11 +38,14 @@ import org.apache.spark.sql.Row;
 import org.apache.spark.sql.catalyst.analysis.NoSuchProcedureException;
 import org.junit.After;
 import org.junit.Assert;
+import org.junit.Assume;
 import org.junit.Test;
 
 
 public class TestRewriteDataFilesProcedure extends SparkExtensionsTestBase {
 
+  private static final String QUOTED_SPECIAL_CHARS_TABLE_NAME = 
"`table:with.special:chars`";
+
   public TestRewriteDataFilesProcedure(String catalogName, String 
implementation, Map<String, String> config) {
     super(catalogName, implementation, config);
   }
@@ -48,6 +53,7 @@ public class TestRewriteDataFilesProcedure extends 
SparkExtensionsTestBase {
   @After
   public void removeTable() {
     sql("DROP TABLE IF EXISTS %s", tableName);
+    sql("DROP TABLE IF EXISTS %s", tableName(QUOTED_SPECIAL_CHARS_TABLE_NAME));
   }
 
   @Test
@@ -376,6 +382,86 @@ public class TestRewriteDataFilesProcedure extends 
SparkExtensionsTestBase {
         () -> sql("CALL %s.system.rewrite_data_files('')", catalogName));
   }
 
+  @Test
+  public void testBinPackTableWithSpecialChars() {
+    
Assume.assumeTrue(catalogName.equals(SparkCatalogConfig.HADOOP.catalogName()));
+
+    sql("CREATE TABLE %s (c1 int, c2 string, c3 string) USING iceberg", 
tableName(QUOTED_SPECIAL_CHARS_TABLE_NAME));
+
+    insertData(tableName(QUOTED_SPECIAL_CHARS_TABLE_NAME), 10);
+
+    List<Object[]> expectedRecords = 
currentData(tableName(QUOTED_SPECIAL_CHARS_TABLE_NAME));
+
+    List<Object[]> output = sql(
+        "CALL %s.system.rewrite_data_files(table => '%s', where => 'c2 is not 
null')",
+        catalogName, tableName(QUOTED_SPECIAL_CHARS_TABLE_NAME));
+
+    assertEquals("Action should rewrite 10 data files and add 1 data file",
+        ImmutableList.of(row(10, 1)),
+        output);
+
+    List<Object[]> actualRecords = 
currentData(tableName(QUOTED_SPECIAL_CHARS_TABLE_NAME));
+    assertEquals("Data after compaction should not change", expectedRecords, 
actualRecords);
+
+    Assert.assertEquals("Table cache must be empty", 0, 
SparkTableCache.get().size());
+  }
+
+  @Test
+  public void testSortTableWithSpecialChars() {
+    
Assume.assumeTrue(catalogName.equals(SparkCatalogConfig.HADOOP.catalogName()));
+
+    sql("CREATE TABLE %s (c1 int, c2 string, c3 string) USING iceberg", 
tableName(QUOTED_SPECIAL_CHARS_TABLE_NAME));
+
+    insertData(tableName(QUOTED_SPECIAL_CHARS_TABLE_NAME), 10);
+
+    List<Object[]> expectedRecords = 
currentData(tableName(QUOTED_SPECIAL_CHARS_TABLE_NAME));
+
+    List<Object[]> output = sql(
+        "CALL %s.system.rewrite_data_files(" +
+            "  table => '%s'," +
+            "  strategy => 'sort'," +
+            "  sort_order => 'c1'," +
+            "  where => 'c2 is not null')",
+        catalogName, tableName(QUOTED_SPECIAL_CHARS_TABLE_NAME));
+
+    assertEquals("Action should rewrite 10 data files and add 1 data file",
+        ImmutableList.of(row(10, 1)),
+        output);
+
+    List<Object[]> actualRecords = 
currentData(tableName(QUOTED_SPECIAL_CHARS_TABLE_NAME));
+    assertEquals("Data after compaction should not change", expectedRecords, 
actualRecords);
+
+    Assert.assertEquals("Table cache must be empty", 0, 
SparkTableCache.get().size());
+  }
+
+  @Test
+  public void testZOrderTableWithSpecialChars() {
+    
Assume.assumeTrue(catalogName.equals(SparkCatalogConfig.HADOOP.catalogName()));
+
+    sql("CREATE TABLE %s (c1 int, c2 string, c3 string) USING iceberg", 
tableName(QUOTED_SPECIAL_CHARS_TABLE_NAME));
+
+    insertData(tableName(QUOTED_SPECIAL_CHARS_TABLE_NAME), 10);
+
+    List<Object[]> expectedRecords = 
currentData(tableName(QUOTED_SPECIAL_CHARS_TABLE_NAME));
+
+    List<Object[]> output = sql(
+        "CALL %s.system.rewrite_data_files(" +
+            "  table => '%s'," +
+            "  strategy => 'sort'," +
+            "  sort_order => 'zorder(c1, c2)'," +
+            "  where => 'c2 is not null')",
+        catalogName, tableName(QUOTED_SPECIAL_CHARS_TABLE_NAME));
+
+    assertEquals("Action should rewrite 10 data files and add 1 data file",
+        ImmutableList.of(row(10, 1)),
+        output);
+
+    List<Object[]> actualRecords = 
currentData(tableName(QUOTED_SPECIAL_CHARS_TABLE_NAME));
+    assertEquals("Data after compaction should not change", expectedRecords, 
actualRecords);
+
+    Assert.assertEquals("Table cache must be empty", 0, 
SparkTableCache.get().size());
+  }
+
   private void createTable() {
     sql("CREATE TABLE %s (c1 int, c2 string, c3 string) USING iceberg", 
tableName);
   }
@@ -385,6 +471,10 @@ public class TestRewriteDataFilesProcedure extends 
SparkExtensionsTestBase {
   }
 
   private void insertData(int filesCount) {
+    insertData(tableName, filesCount);
+  }
+
+  private void insertData(String table, int filesCount) {
     ThreeColumnRecord record1 = new ThreeColumnRecord(1, "foo", null);
     ThreeColumnRecord record2 = new ThreeColumnRecord(2, "bar", null);
 
@@ -396,13 +486,17 @@ public class TestRewriteDataFilesProcedure extends 
SparkExtensionsTestBase {
 
     Dataset<Row> df = spark.createDataFrame(records, 
ThreeColumnRecord.class).repartition(filesCount);
     try {
-      df.writeTo(tableName).append();
+      df.writeTo(table).append();
     } catch (org.apache.spark.sql.catalyst.analysis.NoSuchTableException e) {
       throw new RuntimeException(e);
     }
   }
 
   private List<Object[]> currentData() {
-    return rowsToJava(spark.sql("SELECT * FROM " + tableName + " order by c1, 
c2, c3").collectAsList());
+    return currentData(tableName);
+  }
+
+  private List<Object[]> currentData(String table) {
+    return rowsToJava(spark.sql("SELECT * FROM " + table + " order by c1, c2, 
c3").collectAsList());
   }
 }
diff --git 
a/spark/v3.2/spark/src/jmh/java/org/apache/iceberg/spark/action/IcebergSortCompactionBenchmark.java
 
b/spark/v3.2/spark/src/jmh/java/org/apache/iceberg/spark/action/IcebergSortCompactionBenchmark.java
index 17f9e23862..8c205037f5 100644
--- 
a/spark/v3.2/spark/src/jmh/java/org/apache/iceberg/spark/action/IcebergSortCompactionBenchmark.java
+++ 
b/spark/v3.2/spark/src/jmh/java/org/apache/iceberg/spark/action/IcebergSortCompactionBenchmark.java
@@ -74,7 +74,6 @@ public class IcebergSortCompactionBenchmark {
   private static final String[] NAMESPACE = new String[] {"default"};
   private static final String NAME = "sortbench";
   private static final Identifier IDENT = Identifier.of(NAMESPACE, NAME);
-  private static final String FULL_IDENT = 
Spark3Util.quotedFullIdentifier("spark_catalog", IDENT);
   private static final int NUM_FILES = 8;
   private static final long NUM_ROWS = 7500000L;
   private static final long UNIQUE_VALUES = NUM_ROWS / 4;
@@ -107,7 +106,7 @@ public class IcebergSortCompactionBenchmark {
   @Threads(1)
   public void sortInt() {
     SparkActions.get()
-        .rewriteDataFiles(table(), FULL_IDENT)
+        .rewriteDataFiles(table())
         .option(BinPackStrategy.REWRITE_ALL, "true")
         .sort(SortOrder
             .builderFor(table().schema())
@@ -120,7 +119,7 @@ public class IcebergSortCompactionBenchmark {
   @Threads(1)
   public void sortInt2() {
     SparkActions.get()
-        .rewriteDataFiles(table(), FULL_IDENT)
+        .rewriteDataFiles(table())
         .option(BinPackStrategy.REWRITE_ALL, "true")
         .sort(SortOrder
             .builderFor(table().schema())
@@ -134,7 +133,7 @@ public class IcebergSortCompactionBenchmark {
   @Threads(1)
   public void sortInt3() {
     SparkActions.get()
-        .rewriteDataFiles(table(), FULL_IDENT)
+        .rewriteDataFiles(table())
         .option(BinPackStrategy.REWRITE_ALL, "true")
         .sort(SortOrder
             .builderFor(table().schema())
@@ -150,7 +149,7 @@ public class IcebergSortCompactionBenchmark {
   @Threads(1)
   public void sortInt4() {
     SparkActions.get()
-        .rewriteDataFiles(table(), FULL_IDENT)
+        .rewriteDataFiles(table())
         .option(BinPackStrategy.REWRITE_ALL, "true")
         .sort(SortOrder
             .builderFor(table().schema())
@@ -166,7 +165,7 @@ public class IcebergSortCompactionBenchmark {
   @Threads(1)
   public void sortString() {
     SparkActions.get()
-        .rewriteDataFiles(table(), FULL_IDENT)
+        .rewriteDataFiles(table())
         .option(BinPackStrategy.REWRITE_ALL, "true")
         .sort(SortOrder
             .builderFor(table().schema())
@@ -179,7 +178,7 @@ public class IcebergSortCompactionBenchmark {
   @Threads(1)
   public void sortFourColumns() {
     SparkActions.get()
-        .rewriteDataFiles(table(), FULL_IDENT)
+        .rewriteDataFiles(table())
         .option(BinPackStrategy.REWRITE_ALL, "true")
         .sort(SortOrder
             .builderFor(table().schema())
@@ -195,7 +194,7 @@ public class IcebergSortCompactionBenchmark {
   @Threads(1)
   public void sortSixColumns() {
     SparkActions.get()
-        .rewriteDataFiles(table(), FULL_IDENT)
+        .rewriteDataFiles(table())
         .option(BinPackStrategy.REWRITE_ALL, "true")
         .sort(SortOrder
             .builderFor(table().schema())
@@ -213,7 +212,7 @@ public class IcebergSortCompactionBenchmark {
   @Threads(1)
   public void zSortInt() {
     SparkActions.get()
-        .rewriteDataFiles(table(), FULL_IDENT)
+        .rewriteDataFiles(table())
         .option(BinPackStrategy.REWRITE_ALL, "true")
         .zOrder("intCol")
         .execute();
@@ -223,7 +222,7 @@ public class IcebergSortCompactionBenchmark {
   @Threads(1)
   public void zSortInt2() {
     SparkActions.get()
-        .rewriteDataFiles(table(), FULL_IDENT)
+        .rewriteDataFiles(table())
         .option(BinPackStrategy.REWRITE_ALL, "true")
         .zOrder("intCol", "intCol2")
         .execute();
@@ -233,7 +232,7 @@ public class IcebergSortCompactionBenchmark {
   @Threads(1)
   public void zSortInt3() {
     SparkActions.get()
-        .rewriteDataFiles(table(), FULL_IDENT)
+        .rewriteDataFiles(table())
         .option(BinPackStrategy.REWRITE_ALL, "true")
         .zOrder("intCol", "intCol2", "intCol3")
         .execute();
@@ -243,7 +242,7 @@ public class IcebergSortCompactionBenchmark {
   @Threads(1)
   public void zSortInt4() {
     SparkActions.get()
-        .rewriteDataFiles(table(), FULL_IDENT)
+        .rewriteDataFiles(table())
         .option(BinPackStrategy.REWRITE_ALL, "true")
         .zOrder("intCol", "intCol2", "intCol3", "intCol4")
         .execute();
@@ -253,7 +252,7 @@ public class IcebergSortCompactionBenchmark {
   @Threads(1)
   public void zSortString() {
     SparkActions.get()
-        .rewriteDataFiles(table(), FULL_IDENT)
+        .rewriteDataFiles(table())
         .option(BinPackStrategy.REWRITE_ALL, "true")
         .zOrder("stringCol")
         .execute();
@@ -263,7 +262,7 @@ public class IcebergSortCompactionBenchmark {
   @Threads(1)
   public void zSortFourColumns() {
     SparkActions.get()
-        .rewriteDataFiles(table(), FULL_IDENT)
+        .rewriteDataFiles(table())
         .option(BinPackStrategy.REWRITE_ALL, "true")
         .zOrder("stringCol", "intCol", "dateCol", "doubleCol")
         .execute();
@@ -273,7 +272,7 @@ public class IcebergSortCompactionBenchmark {
   @Threads(1)
   public void zSortSixColumns() {
     SparkActions.get()
-        .rewriteDataFiles(table(), FULL_IDENT)
+        .rewriteDataFiles(table())
         .option(BinPackStrategy.REWRITE_ALL, "true")
         .zOrder("stringCol", "intCol", "dateCol", "timestampCol", "doubleCol", 
"longCol")
         .execute();
diff --git 
a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkCachedTableCatalog.java
 
b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkCachedTableCatalog.java
new file mode 100644
index 0000000000..ff4081764b
--- /dev/null
+++ 
b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkCachedTableCatalog.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark;
+
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.base.Splitter;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.spark.source.SparkTable;
+import org.apache.iceberg.util.Pair;
+import org.apache.iceberg.util.SnapshotUtil;
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
+import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.connector.catalog.TableChange;
+import org.apache.spark.sql.connector.expressions.Transform;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+
+/**
+ * An internal table catalog that is capable of loading tables from a cache.
+ */
+public class SparkCachedTableCatalog implements TableCatalog {
+
+  private static final String CLASS_NAME = 
SparkCachedTableCatalog.class.getName();
+  private static final Splitter COMMA = Splitter.on(",");
+  private static final Pattern AT_TIMESTAMP = 
Pattern.compile("at_timestamp_(\\d+)");
+  private static final Pattern SNAPSHOT_ID = 
Pattern.compile("snapshot_id_(\\d+)");
+
+  private static final SparkTableCache TABLE_CACHE = SparkTableCache.get();
+
+  private String name = null;
+
+  @Override
+  public Identifier[] listTables(String[] namespace) {
+    throw new UnsupportedOperationException(CLASS_NAME + " does not support 
listing tables");
+  }
+
+  @Override
+  public SparkTable loadTable(Identifier ident) throws NoSuchTableException {
+    Pair<Table, Long> table = load(ident);
+    return new SparkTable(table.first(), table.second(), false /* refresh 
eagerly */);
+  }
+
+  @Override
+  public void invalidateTable(Identifier ident) {
+    throw new UnsupportedOperationException(CLASS_NAME + " does not support 
table invalidation");
+  }
+
+  @Override
+  public SparkTable createTable(Identifier ident, StructType schema, 
Transform[] partitions,
+                                Map<String, String> properties) throws 
TableAlreadyExistsException {
+    throw new UnsupportedOperationException(CLASS_NAME + " does not support 
creating tables");
+  }
+
+  @Override
+  public SparkTable alterTable(Identifier ident, TableChange... changes) {
+    throw new UnsupportedOperationException(CLASS_NAME + " does not support 
altering tables");
+  }
+
+  @Override
+  public boolean dropTable(Identifier ident) {
+    throw new UnsupportedOperationException(CLASS_NAME + " does not support 
dropping tables");
+  }
+
+  @Override
+  public boolean purgeTable(Identifier ident) throws 
UnsupportedOperationException {
+    throw new UnsupportedOperationException(CLASS_NAME + " does not support 
purging tables");
+  }
+
+  @Override
+  public void renameTable(Identifier oldIdent, Identifier newIdent) {
+    throw new UnsupportedOperationException(CLASS_NAME + " does not support 
renaming tables");
+  }
+
+  @Override
+  public void initialize(String catalogName, CaseInsensitiveStringMap options) 
{
+    this.name = catalogName;
+  }
+
+  @Override
+  public String name() {
+    return name;
+  }
+
+  private Pair<Table, Long> load(Identifier ident) throws NoSuchTableException 
{
+    Preconditions.checkArgument(ident.namespace().length == 0, CLASS_NAME + " 
does not support namespaces");
+
+    Pair<String, List<String>> parsedIdent = parseIdent(ident);
+    String key = parsedIdent.first();
+    List<String> metadata = parsedIdent.second();
+
+    Long asOfTimestamp = null;
+    Long snapshotId = null;
+    for (String meta : metadata) {
+      Matcher timeBasedMatcher = AT_TIMESTAMP.matcher(meta);
+      if (timeBasedMatcher.matches()) {
+        asOfTimestamp = Long.parseLong(timeBasedMatcher.group(1));
+        continue;
+      }
+
+      Matcher snapshotBasedMatcher = SNAPSHOT_ID.matcher(meta);
+      if (snapshotBasedMatcher.matches()) {
+        snapshotId = Long.parseLong(snapshotBasedMatcher.group(1));
+      }
+    }
+
+    Preconditions.checkArgument(asOfTimestamp == null || snapshotId == null,
+        "Cannot specify both snapshot and timestamp for time travel: %s", 
ident);
+
+    Table table = TABLE_CACHE.get(key);
+
+    if (table == null) {
+      throw new NoSuchTableException(ident);
+    }
+
+    if (snapshotId != null) {
+      return Pair.of(table, snapshotId);
+    } else if (asOfTimestamp != null) {
+      return Pair.of(table, SnapshotUtil.snapshotIdAsOfTime(table, 
asOfTimestamp));
+    } else {
+      return Pair.of(table, null);
+    }
+  }
+
+  private Pair<String, List<String>> parseIdent(Identifier ident) {
+    int hashIndex = ident.name().lastIndexOf('#');
+    if (hashIndex != -1 && !ident.name().endsWith("#")) {
+      String key = ident.name().substring(0, hashIndex);
+      List<String> metadata = 
COMMA.splitToList(ident.name().substring(hashIndex + 1));
+      return Pair.of(key, metadata);
+    } else {
+      return Pair.of(ident.name(), ImmutableList.of());
+    }
+  }
+}
diff --git 
a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkTableCache.java 
b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkTableCache.java
new file mode 100644
index 0000000000..ec587c529f
--- /dev/null
+++ 
b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkTableCache.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark;
+
+import java.util.Map;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+
+public class SparkTableCache {
+
+  private static final SparkTableCache INSTANCE = new SparkTableCache();
+
+  private final Map<String, Table> cache = Maps.newConcurrentMap();
+
+  public static SparkTableCache get() {
+    return INSTANCE;
+  }
+
+  public int size() {
+    return cache.size();
+  }
+
+  public void add(String key, Table table) {
+    cache.put(key, table);
+  }
+
+  public boolean contains(String key) {
+    return cache.containsKey(key);
+  }
+
+  public Table get(String key) {
+    return cache.get(key);
+  }
+
+  public Table remove(String key) {
+    return cache.remove(key);
+  }
+}
diff --git 
a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteDataFilesSparkAction.java
 
b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteDataFilesSparkAction.java
index 5ab2539c2c..d1937fcbcd 100644
--- 
a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteDataFilesSparkAction.java
+++ 
b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteDataFilesSparkAction.java
@@ -87,7 +87,6 @@ public class BaseRewriteDataFilesSparkAction
   );
 
   private final Table table;
-  private final String fullIdentifier;
 
   private Expression filter = Expressions.alwaysTrue();
   private int maxConcurrentFileGroupRewrites;
@@ -97,17 +96,9 @@ public class BaseRewriteDataFilesSparkAction
   private RewriteJobOrder rewriteJobOrder;
   private RewriteStrategy strategy = null;
 
-  @Deprecated
   protected BaseRewriteDataFilesSparkAction(SparkSession spark, Table table) {
     super(spark);
     this.table = table;
-    this.fullIdentifier = null;
-  }
-
-  protected BaseRewriteDataFilesSparkAction(SparkSession spark, Table table, 
String fullIdentifier) {
-    super(spark);
-    this.table = table;
-    this.fullIdentifier = fullIdentifier;
   }
 
   @Override
@@ -439,7 +430,7 @@ public class BaseRewriteDataFilesSparkAction
   }
 
   private BinPackStrategy binPackStrategy() {
-    return new SparkBinPackStrategy(table, fullIdentifier, spark());
+    return new SparkBinPackStrategy(table, spark());
   }
 
   private SortStrategy sortStrategy() {
diff --git 
a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/SparkActions.java
 
b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/SparkActions.java
index 4f6bb3f00d..d77a13d128 100644
--- 
a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/SparkActions.java
+++ 
b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/SparkActions.java
@@ -72,16 +72,10 @@ public class SparkActions implements ActionsProvider {
   }
 
   @Override
-  @Deprecated
   public RewriteDataFiles rewriteDataFiles(Table table) {
     return new BaseRewriteDataFilesSparkAction(spark, table);
   }
 
-  @Override
-  public RewriteDataFiles rewriteDataFiles(Table table, String fullIdentifier) 
{
-    return new BaseRewriteDataFilesSparkAction(spark, table, fullIdentifier);
-  }
-
   @Override
   public DeleteOrphanFiles deleteOrphanFiles(Table table) {
     return new BaseDeleteOrphanFilesSparkAction(spark, table);
diff --git 
a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/SparkBinPackStrategy.java
 
b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/SparkBinPackStrategy.java
index fcfaeaa751..d8c1cc3610 100644
--- 
a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/SparkBinPackStrategy.java
+++ 
b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/SparkBinPackStrategy.java
@@ -30,6 +30,7 @@ import org.apache.iceberg.actions.BinPackStrategy;
 import org.apache.iceberg.spark.FileRewriteCoordinator;
 import org.apache.iceberg.spark.FileScanTaskSetManager;
 import org.apache.iceberg.spark.SparkReadOptions;
+import org.apache.iceberg.spark.SparkTableCache;
 import org.apache.iceberg.spark.SparkWriteOptions;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
@@ -38,24 +39,14 @@ import org.apache.spark.sql.internal.SQLConf;
 
 public class SparkBinPackStrategy extends BinPackStrategy {
   private final Table table;
-  private final String fullIdentifier;
   private final SparkSession spark;
+  private final SparkTableCache tableCache = SparkTableCache.get();
   private final FileScanTaskSetManager manager = FileScanTaskSetManager.get();
   private final FileRewriteCoordinator rewriteCoordinator = 
FileRewriteCoordinator.get();
 
-  public SparkBinPackStrategy(Table table, String fullIdentifier, SparkSession 
spark) {
-    this.table = table;
-    this.spark = spark;
-    // Fallback if a quoted identifier is not supplied
-    this.fullIdentifier = fullIdentifier == null ? table.name() : 
fullIdentifier;
-  }
-
-  @Deprecated
   public SparkBinPackStrategy(Table table, SparkSession spark) {
     this.table = table;
     this.spark = spark;
-    // Fallback if a quoted identifier is not supplied
-    this.fullIdentifier = table.name();
   }
 
   @Override
@@ -67,6 +58,7 @@ public class SparkBinPackStrategy extends BinPackStrategy {
   public Set<DataFile> rewriteFiles(List<FileScanTask> filesToRewrite) {
     String groupID = UUID.randomUUID().toString();
     try {
+      tableCache.add(groupID, table);
       manager.stageTasks(table, groupID, filesToRewrite);
 
       // Disable Adaptive Query Execution as this may change the output 
partitioning of our write
@@ -77,7 +69,7 @@ public class SparkBinPackStrategy extends BinPackStrategy {
           .option(SparkReadOptions.FILE_SCAN_TASK_SET_ID, groupID)
           .option(SparkReadOptions.SPLIT_SIZE, 
splitSize(inputFileSize(filesToRewrite)))
           .option(SparkReadOptions.FILE_OPEN_COST, "0")
-          .load(fullIdentifier);
+          .load(groupID);
 
       // All files within a file group are written with the same spec, so 
check the first
       boolean requiresRepartition = 
!filesToRewrite.get(0).spec().equals(table.spec());
@@ -93,10 +85,11 @@ public class SparkBinPackStrategy extends BinPackStrategy {
           .option(SparkWriteOptions.TARGET_FILE_SIZE_BYTES, writeMaxFileSize())
           .option(SparkWriteOptions.DISTRIBUTION_MODE, distributionMode)
           .mode("append")
-          .save(fullIdentifier);
+          .save(groupID);
 
       return rewriteCoordinator.fetchNewDataFiles(table, groupID);
     } finally {
+      tableCache.remove(groupID);
       manager.removeTasks(table, groupID);
       rewriteCoordinator.clearRewrite(table, groupID);
     }
diff --git 
a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/SparkSortStrategy.java
 
b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/SparkSortStrategy.java
index 6c8f8c027d..97f46d7938 100644
--- 
a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/SparkSortStrategy.java
+++ 
b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/SparkSortStrategy.java
@@ -34,6 +34,7 @@ import org.apache.iceberg.spark.FileRewriteCoordinator;
 import org.apache.iceberg.spark.FileScanTaskSetManager;
 import org.apache.iceberg.spark.SparkDistributionAndOrderingUtil;
 import org.apache.iceberg.spark.SparkReadOptions;
+import org.apache.iceberg.spark.SparkTableCache;
 import org.apache.iceberg.spark.SparkWriteOptions;
 import org.apache.iceberg.util.PropertyUtil;
 import org.apache.iceberg.util.SortOrderUtil;
@@ -63,6 +64,7 @@ public class SparkSortStrategy extends SortStrategy {
 
   private final Table table;
   private final SparkSession spark;
+  private final SparkTableCache tableCache = SparkTableCache.get();
   private final FileScanTaskSetManager manager = FileScanTaskSetManager.get();
   private final FileRewriteCoordinator rewriteCoordinator = 
FileRewriteCoordinator.get();
 
@@ -114,6 +116,7 @@ public class SparkSortStrategy extends SortStrategy {
     Distribution distribution = Distributions.ordered(ordering);
 
     try {
+      tableCache.add(groupID, table);
       manager.stageTasks(table, groupID, filesToRewrite);
 
       // Disable Adaptive Query Execution as this may change the output 
partitioning of our write
@@ -126,7 +129,7 @@ public class SparkSortStrategy extends SortStrategy {
 
       Dataset<Row> scanDF = cloneSession.read().format("iceberg")
           .option(SparkReadOptions.FILE_SCAN_TASK_SET_ID, groupID)
-          .load(table.name());
+          .load(groupID);
 
      // write the packed data into new files where each split becomes a new file
       SQLConf sqlConf = cloneSession.sessionState().conf();
@@ -139,10 +142,11 @@ public class SparkSortStrategy extends SortStrategy {
           .option(SparkWriteOptions.TARGET_FILE_SIZE_BYTES, writeMaxFileSize())
          .option(SparkWriteOptions.USE_TABLE_DISTRIBUTION_AND_ORDERING, "false")
          .mode("append") // This will only write files without modifying the table, see SparkWrite.RewriteFiles
-          .save(table.name());
+          .save(groupID);
 
       return rewriteCoordinator.fetchNewDataFiles(table, groupID);
     } finally {
+      tableCache.remove(groupID);
       manager.removeTasks(table, groupID);
       rewriteCoordinator.clearRewrite(table, groupID);
     }
@@ -160,6 +164,10 @@ public class SparkSortStrategy extends SortStrategy {
     return sizeEstimateMultiple;
   }
 
+  protected SparkTableCache tableCache() {
+    return tableCache;
+  }
+
   protected FileScanTaskSetManager manager() {
     return manager;
   }
diff --git 
a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/SparkZOrderStrategy.java
 
b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/SparkZOrderStrategy.java
index fef23acd9b..8044039a59 100644
--- 
a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/SparkZOrderStrategy.java
+++ 
b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/SparkZOrderStrategy.java
@@ -178,6 +178,7 @@ public class SparkZOrderStrategy extends SparkSortStrategy {
     Distribution distribution = Distributions.ordered(ordering);
 
     try {
+      tableCache().add(groupID, table());
       manager().stageTasks(table(), groupID, filesToRewrite);
 
      // Disable Adaptive Query Execution as this may change the output partitioning of our write
@@ -190,7 +191,7 @@ public class SparkZOrderStrategy extends SparkSortStrategy {
 
       Dataset<Row> scanDF = cloneSession.read().format("iceberg")
           .option(SparkReadOptions.FILE_SCAN_TASK_SET_ID, groupID)
-          .load(table().name());
+          .load(groupID);
 
       Column[] originalColumns = Arrays.stream(scanDF.schema().names())
           .map(n -> functions.col(n))
@@ -217,10 +218,11 @@ public class SparkZOrderStrategy extends 
SparkSortStrategy {
           .option(SparkWriteOptions.TARGET_FILE_SIZE_BYTES, writeMaxFileSize())
          .option(SparkWriteOptions.USE_TABLE_DISTRIBUTION_AND_ORDERING, "false")
           .mode("append")
-          .save(table().name());
+          .save(groupID);
 
       return rewriteCoordinator().fetchNewDataFiles(table(), groupID);
     } finally {
+      tableCache().remove(groupID);
       manager().removeTasks(table(), groupID);
       rewriteCoordinator().clearRewrite(table(), groupID);
     }
diff --git 
a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/RewriteDataFilesProcedure.java
 
b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/RewriteDataFilesProcedure.java
index 27ffc69086..5c71dbf555 100644
--- 
a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/RewriteDataFilesProcedure.java
+++ 
b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/RewriteDataFilesProcedure.java
@@ -48,7 +48,7 @@ import scala.runtime.BoxedUnit;
 /**
  * A procedure that rewrites datafiles in a table.
  *
- * @see org.apache.iceberg.spark.actions.SparkActions#rewriteDataFiles(Table, 
String)
+ * @see org.apache.iceberg.spark.actions.SparkActions#rewriteDataFiles(Table)
  */
 class RewriteDataFilesProcedure extends BaseProcedure {
 
@@ -95,7 +95,7 @@ class RewriteDataFilesProcedure extends BaseProcedure {
 
     return modifyIcebergTable(tableIdent, table -> {
       String quotedFullIdentifier = 
Spark3Util.quotedFullIdentifier(tableCatalog().name(), tableIdent);
-      RewriteDataFiles action = actions().rewriteDataFiles(table, 
quotedFullIdentifier);
+      RewriteDataFiles action = actions().rewriteDataFiles(table);
 
       String strategy = args.isNullAt(1) ? null : args.getString(1);
       String sortOrderString = args.isNullAt(2) ? null : args.getString(2);
diff --git 
a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java
 
b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java
index 7aa66b2223..d70d902c1f 100644
--- 
a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java
+++ 
b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java
@@ -25,8 +25,11 @@ import 
org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.spark.PathIdentifier;
 import org.apache.iceberg.spark.Spark3Util;
+import org.apache.iceberg.spark.SparkCachedTableCatalog;
+import org.apache.iceberg.spark.SparkCatalog;
 import org.apache.iceberg.spark.SparkReadOptions;
 import org.apache.iceberg.spark.SparkSessionCatalog;
+import org.apache.iceberg.spark.SparkTableCache;
 import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
 import org.apache.spark.sql.connector.catalog.CatalogManager;
@@ -57,9 +60,14 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap;
  */
 public class IcebergSource implements DataSourceRegister, 
SupportsCatalogOptions {
   private static final String DEFAULT_CATALOG_NAME = "default_iceberg";
+  private static final String DEFAULT_CACHE_CATALOG_NAME = "default_cache_iceberg";
+  private static final String DEFAULT_CACHE_CATALOG = "spark.sql.catalog." + DEFAULT_CACHE_CATALOG_NAME;
   private static final String DEFAULT_CATALOG = "spark.sql.catalog." + DEFAULT_CATALOG_NAME;
   private static final String AT_TIMESTAMP = "at_timestamp_";
   private static final String SNAPSHOT_ID = "snapshot_id_";
+  private static final String[] EMPTY_NAMESPACE = new String[0];
+
+  private static final SparkTableCache TABLE_CACHE = SparkTableCache.get();
 
   @Override
   public String shortName() {
@@ -103,7 +111,7 @@ public class IcebergSource implements DataSourceRegister, 
SupportsCatalogOptions
   private Spark3Util.CatalogAndIdentifier 
catalogAndIdentifier(CaseInsensitiveStringMap options) {
     Preconditions.checkArgument(options.containsKey("path"), "Cannot open 
table: path is not set");
     SparkSession spark = SparkSession.active();
-    setupDefaultSparkCatalog(spark);
+    setupDefaultSparkCatalogs(spark);
     String path = options.get("path");
 
     Long snapshotId = propertyAsLong(options, SparkReadOptions.SNAPSHOT_ID);
@@ -123,11 +131,14 @@ public class IcebergSource implements DataSourceRegister, 
SupportsCatalogOptions
 
     CatalogManager catalogManager = spark.sessionState().catalogManager();
 
-    if (path.contains("/")) {
+    if (TABLE_CACHE.contains(path)) {
+      return new Spark3Util.CatalogAndIdentifier(
+          catalogManager.catalog(DEFAULT_CACHE_CATALOG_NAME),
+          Identifier.of(EMPTY_NAMESPACE, pathWithSelector(path, selector)));
+    } else if (path.contains("/")) {
       // contains a path. Return iceberg default catalog and a PathIdentifier
-      String newPath = (selector == null) ? path : path + "#" + selector;
       return new 
Spark3Util.CatalogAndIdentifier(catalogManager.catalog(DEFAULT_CATALOG_NAME),
-          new PathIdentifier(newPath));
+          new PathIdentifier(pathWithSelector(path, selector)));
     }
 
     final Spark3Util.CatalogAndIdentifier catalogAndIdentifier = 
Spark3Util.catalogAndIdentifier(
@@ -143,6 +154,10 @@ public class IcebergSource implements DataSourceRegister, 
SupportsCatalogOptions
     }
   }
 
+  private String pathWithSelector(String path, String selector) {
+    return (selector == null) ? path : path + "#" + selector;
+  }
+
   private Identifier identifierWithSelector(Identifier ident, String selector) 
{
     if (selector == null) {
       return ident;
@@ -173,17 +188,19 @@ public class IcebergSource implements DataSourceRegister, 
SupportsCatalogOptions
     return null;
   }
 
-  private static void setupDefaultSparkCatalog(SparkSession spark) {
-    if (spark.conf().contains(DEFAULT_CATALOG)) {
-      return;
+  private static void setupDefaultSparkCatalogs(SparkSession spark) {
+    if (!spark.conf().contains(DEFAULT_CATALOG)) {
+      ImmutableMap<String, String> config = ImmutableMap.of(
+          "type", "hive",
+          "default-namespace", "default",
+          "cache-enabled", "false" // the source should not use a cache
+      );
+      spark.conf().set(DEFAULT_CATALOG, SparkCatalog.class.getName());
+      config.forEach((key, value) -> spark.conf().set(DEFAULT_CATALOG + "." + 
key, value));
+    }
+
+    if (!spark.conf().contains(DEFAULT_CACHE_CATALOG)) {
+      spark.conf().set(DEFAULT_CACHE_CATALOG, SparkCachedTableCatalog.class.getName());
     }
-    ImmutableMap<String, String> config = ImmutableMap.of(
-        "type", "hive",
-        "default-namespace", "default",
-        "cache-enabled", "false" // the source should not use a cache
-    );
-    String catalogName = "org.apache.iceberg.spark.SparkCatalog";
-    spark.conf().set(DEFAULT_CATALOG, catalogName);
-    config.forEach((key, value) -> spark.conf().set(DEFAULT_CATALOG + "." + 
key, value));
   }
 }
diff --git 
a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/TestSparkCachedTableCatalog.java
 
b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/TestSparkCachedTableCatalog.java
new file mode 100644
index 0000000000..817af03029
--- /dev/null
+++ 
b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/TestSparkCachedTableCatalog.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark;
+
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestSparkCachedTableCatalog extends SparkTestBaseWithCatalog {
+
+  private static final SparkTableCache TABLE_CACHE = SparkTableCache.get();
+
+  @BeforeClass
+  public static void setupCachedTableCatalog() {
+    spark.conf().set("spark.sql.catalog.testcache", SparkCachedTableCatalog.class.getName());
+  }
+
+  @AfterClass
+  public static void unsetCachedTableCatalog() {
+    spark.conf().unset("spark.sql.catalog.testcache");
+  }
+
+  public TestSparkCachedTableCatalog() {
+    super(SparkCatalogConfig.HIVE);
+  }
+
+  @Test
+  public void testTimeTravel() {
+    sql("CREATE TABLE %s (id INT, dep STRING) USING iceberg", tableName);
+
+    Table table = validationCatalog.loadTable(tableIdent);
+
+    sql("INSERT INTO TABLE %s VALUES (1, 'hr')", tableName);
+
+    table.refresh();
+    Snapshot firstSnapshot = table.currentSnapshot();
+    waitUntilAfter(firstSnapshot.timestampMillis());
+
+    sql("INSERT INTO TABLE %s VALUES (2, 'hr')", tableName);
+
+    table.refresh();
+    Snapshot secondSnapshot = table.currentSnapshot();
+    waitUntilAfter(secondSnapshot.timestampMillis());
+
+    sql("INSERT INTO TABLE %s VALUES (3, 'hr')", tableName);
+
+    table.refresh();
+
+    try {
+      TABLE_CACHE.add("key", table);
+
+      assertEquals("Should have expected rows in 3rd snapshot",
+          ImmutableList.of(row(1, "hr"), row(2, "hr"), row(3, "hr")),
+          sql("SELECT * FROM testcache.key ORDER BY id"));
+
+      assertEquals("Should have expected rows in 2nd snapshot",
+          ImmutableList.of(row(1, "hr"), row(2, "hr")),
+          sql("SELECT * FROM testcache.`key#at_timestamp_%s` ORDER BY id", 
secondSnapshot.timestampMillis()));
+
+      assertEquals("Should have expected rows in 1st snapshot",
+          ImmutableList.of(row(1, "hr")),
+          sql("SELECT * FROM testcache.`key#snapshot_id_%d` ORDER BY id", 
firstSnapshot.snapshotId()));
+
+    } finally {
+      TABLE_CACHE.remove("key");
+    }
+  }
+}
diff --git 
a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
 
b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
index ea2a7755f2..e0b1e53cfe 100644
--- 
a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
+++ 
b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
@@ -136,7 +136,7 @@ public class TestRewriteDataFilesAction extends 
SparkTestBase {
   private RewriteDataFiles basicRewrite(Table table) {
     // Always compact regardless of input files
     table.refresh();
-    return actions().rewriteDataFiles(table, 
tableLocation).option(BinPackStrategy.MIN_INPUT_FILES, "1");
+    return 
actions().rewriteDataFiles(table).option(BinPackStrategy.MIN_INPUT_FILES, "1");
   }
 
   @Test
@@ -259,7 +259,7 @@ public class TestRewriteDataFilesAction extends 
SparkTestBase {
     rowDelta.commit();
     table.refresh();
     List<Object[]> expectedRecords = currentData();
-    Result result = actions().rewriteDataFiles(table, tableLocation)
+    Result result = actions().rewriteDataFiles(table)
         // do not include any file based on bin pack file size configs
         .option(BinPackStrategy.MIN_FILE_SIZE_BYTES, "0")
         .option(RewriteDataFiles.TARGET_FILE_SIZE_BYTES, 
Long.toString(Long.MAX_VALUE - 1))
@@ -293,7 +293,7 @@ public class TestRewriteDataFilesAction extends 
SparkTestBase {
     rowDelta.commit();
     table.refresh();
     List<Object[]> expectedRecords = currentData();
-    Result result = actions().rewriteDataFiles(table, tableLocation)
+    Result result = actions().rewriteDataFiles(table)
         .option(BinPackStrategy.DELETE_FILE_THRESHOLD, "1")
         .execute();
     Assert.assertEquals("Action should rewrite 1 data files", 1, 
result.rewrittenDataFilesCount());
@@ -1123,17 +1123,17 @@ public class TestRewriteDataFilesAction extends 
SparkTestBase {
     Table table = createTable(1);
 
     AssertHelpers.assertThrows("Should be unable to set Strategy more than 
once", IllegalArgumentException.class,
-        "Cannot set strategy", () -> actions().rewriteDataFiles(table, 
tableLocation).binPack().sort());
+        "Cannot set strategy", () -> 
actions().rewriteDataFiles(table).binPack().sort());
 
     AssertHelpers.assertThrows("Should be unable to set Strategy more than 
once", IllegalArgumentException.class,
-        "Cannot set strategy", () -> actions().rewriteDataFiles(table, 
tableLocation).sort().binPack());
+        "Cannot set strategy", () -> 
actions().rewriteDataFiles(table).sort().binPack());
 
     AssertHelpers.assertThrows(
         "Should be unable to set Strategy more than once",
         IllegalArgumentException.class,
         "Cannot set strategy",
         () ->
-            actions().rewriteDataFiles(table, 
tableLocation).sort(SortOrder.unsorted()).binPack());
+            
actions().rewriteDataFiles(table).sort(SortOrder.unsorted()).binPack());
   }
 
   @Test
diff --git 
a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
 
b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
index 63a63ddccb..9fdb0b1ed5 100644
--- 
a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
+++ 
b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
@@ -29,6 +29,8 @@ import org.apache.iceberg.expressions.Zorder;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.spark.ExtendedParser;
+import org.apache.iceberg.spark.SparkCatalogConfig;
+import org.apache.iceberg.spark.SparkTableCache;
 import org.apache.iceberg.spark.source.ThreeColumnRecord;
 import org.apache.spark.sql.AnalysisException;
 import org.apache.spark.sql.Dataset;
@@ -36,11 +38,14 @@ import org.apache.spark.sql.Row;
 import org.apache.spark.sql.catalyst.analysis.NoSuchProcedureException;
 import org.junit.After;
 import org.junit.Assert;
+import org.junit.Assume;
 import org.junit.Test;
 
 
 public class TestRewriteDataFilesProcedure extends SparkExtensionsTestBase {
 
+  private static final String QUOTED_SPECIAL_CHARS_TABLE_NAME = 
"`table:with.special:chars`";
+
   public TestRewriteDataFilesProcedure(String catalogName, String 
implementation, Map<String, String> config) {
     super(catalogName, implementation, config);
   }
@@ -48,6 +53,7 @@ public class TestRewriteDataFilesProcedure extends 
SparkExtensionsTestBase {
   @After
   public void removeTable() {
     sql("DROP TABLE IF EXISTS %s", tableName);
+    sql("DROP TABLE IF EXISTS %s", tableName(QUOTED_SPECIAL_CHARS_TABLE_NAME));
   }
 
   @Test
@@ -376,6 +382,86 @@ public class TestRewriteDataFilesProcedure extends 
SparkExtensionsTestBase {
         () -> sql("CALL %s.system.rewrite_data_files('')", catalogName));
   }
 
+  @Test
+  public void testBinPackTableWithSpecialChars() {
+    
Assume.assumeTrue(catalogName.equals(SparkCatalogConfig.HADOOP.catalogName()));
+
+    sql("CREATE TABLE %s (c1 int, c2 string, c3 string) USING iceberg", 
tableName(QUOTED_SPECIAL_CHARS_TABLE_NAME));
+
+    insertData(tableName(QUOTED_SPECIAL_CHARS_TABLE_NAME), 10);
+
+    List<Object[]> expectedRecords = 
currentData(tableName(QUOTED_SPECIAL_CHARS_TABLE_NAME));
+
+    List<Object[]> output = sql(
+        "CALL %s.system.rewrite_data_files(table => '%s', where => 'c2 is not 
null')",
+        catalogName, tableName(QUOTED_SPECIAL_CHARS_TABLE_NAME));
+
+    assertEquals("Action should rewrite 10 data files and add 1 data file",
+        ImmutableList.of(row(10, 1)),
+        output);
+
+    List<Object[]> actualRecords = 
currentData(tableName(QUOTED_SPECIAL_CHARS_TABLE_NAME));
+    assertEquals("Data after compaction should not change", expectedRecords, 
actualRecords);
+
+    Assert.assertEquals("Table cache must be empty", 0, 
SparkTableCache.get().size());
+  }
+
+  @Test
+  public void testSortTableWithSpecialChars() {
+    
Assume.assumeTrue(catalogName.equals(SparkCatalogConfig.HADOOP.catalogName()));
+
+    sql("CREATE TABLE %s (c1 int, c2 string, c3 string) USING iceberg", 
tableName(QUOTED_SPECIAL_CHARS_TABLE_NAME));
+
+    insertData(tableName(QUOTED_SPECIAL_CHARS_TABLE_NAME), 10);
+
+    List<Object[]> expectedRecords = 
currentData(tableName(QUOTED_SPECIAL_CHARS_TABLE_NAME));
+
+    List<Object[]> output = sql(
+        "CALL %s.system.rewrite_data_files(" +
+            "  table => '%s'," +
+            "  strategy => 'sort'," +
+            "  sort_order => 'c1'," +
+            "  where => 'c2 is not null')",
+        catalogName, tableName(QUOTED_SPECIAL_CHARS_TABLE_NAME));
+
+    assertEquals("Action should rewrite 10 data files and add 1 data file",
+        ImmutableList.of(row(10, 1)),
+        output);
+
+    List<Object[]> actualRecords = 
currentData(tableName(QUOTED_SPECIAL_CHARS_TABLE_NAME));
+    assertEquals("Data after compaction should not change", expectedRecords, 
actualRecords);
+
+    Assert.assertEquals("Table cache must be empty", 0, 
SparkTableCache.get().size());
+  }
+
+  @Test
+  public void testZOrderTableWithSpecialChars() {
+    
Assume.assumeTrue(catalogName.equals(SparkCatalogConfig.HADOOP.catalogName()));
+
+    sql("CREATE TABLE %s (c1 int, c2 string, c3 string) USING iceberg", 
tableName(QUOTED_SPECIAL_CHARS_TABLE_NAME));
+
+    insertData(tableName(QUOTED_SPECIAL_CHARS_TABLE_NAME), 10);
+
+    List<Object[]> expectedRecords = 
currentData(tableName(QUOTED_SPECIAL_CHARS_TABLE_NAME));
+
+    List<Object[]> output = sql(
+        "CALL %s.system.rewrite_data_files(" +
+            "  table => '%s'," +
+            "  strategy => 'sort'," +
+            "  sort_order => 'zorder(c1, c2)'," +
+            "  where => 'c2 is not null')",
+        catalogName, tableName(QUOTED_SPECIAL_CHARS_TABLE_NAME));
+
+    assertEquals("Action should rewrite 10 data files and add 1 data file",
+        ImmutableList.of(row(10, 1)),
+        output);
+
+    List<Object[]> actualRecords = 
currentData(tableName(QUOTED_SPECIAL_CHARS_TABLE_NAME));
+    assertEquals("Data after compaction should not change", expectedRecords, 
actualRecords);
+
+    Assert.assertEquals("Table cache must be empty", 0, 
SparkTableCache.get().size());
+  }
+
   private void createTable() {
     sql("CREATE TABLE %s (c1 int, c2 string, c3 string) USING iceberg", 
tableName);
   }
@@ -385,6 +471,10 @@ public class TestRewriteDataFilesProcedure extends 
SparkExtensionsTestBase {
   }
 
   private void insertData(int filesCount) {
+    insertData(tableName, filesCount);
+  }
+
+  private void insertData(String table, int filesCount) {
     ThreeColumnRecord record1 = new ThreeColumnRecord(1, "foo", null);
     ThreeColumnRecord record2 = new ThreeColumnRecord(2, "bar", null);
 
@@ -396,13 +486,17 @@ public class TestRewriteDataFilesProcedure extends 
SparkExtensionsTestBase {
 
     Dataset<Row> df = spark.createDataFrame(records, 
ThreeColumnRecord.class).repartition(filesCount);
     try {
-      df.writeTo(tableName).append();
+      df.writeTo(table).append();
     } catch (org.apache.spark.sql.catalyst.analysis.NoSuchTableException e) {
       throw new RuntimeException(e);
     }
   }
 
   private List<Object[]> currentData() {
-    return rowsToJava(spark.sql("SELECT * FROM " + tableName + " order by c1, 
c2, c3").collectAsList());
+    return currentData(tableName);
+  }
+
+  private List<Object[]> currentData(String table) {
+    return rowsToJava(spark.sql("SELECT * FROM " + table + " order by c1, c2, 
c3").collectAsList());
   }
 }
diff --git 
a/spark/v3.3/spark/src/jmh/java/org/apache/iceberg/spark/action/IcebergSortCompactionBenchmark.java
 
b/spark/v3.3/spark/src/jmh/java/org/apache/iceberg/spark/action/IcebergSortCompactionBenchmark.java
index 17f9e23862..8c205037f5 100644
--- 
a/spark/v3.3/spark/src/jmh/java/org/apache/iceberg/spark/action/IcebergSortCompactionBenchmark.java
+++ 
b/spark/v3.3/spark/src/jmh/java/org/apache/iceberg/spark/action/IcebergSortCompactionBenchmark.java
@@ -74,7 +74,6 @@ public class IcebergSortCompactionBenchmark {
   private static final String[] NAMESPACE = new String[] {"default"};
   private static final String NAME = "sortbench";
   private static final Identifier IDENT = Identifier.of(NAMESPACE, NAME);
-  private static final String FULL_IDENT = 
Spark3Util.quotedFullIdentifier("spark_catalog", IDENT);
   private static final int NUM_FILES = 8;
   private static final long NUM_ROWS = 7500000L;
   private static final long UNIQUE_VALUES = NUM_ROWS / 4;
@@ -107,7 +106,7 @@ public class IcebergSortCompactionBenchmark {
   @Threads(1)
   public void sortInt() {
     SparkActions.get()
-        .rewriteDataFiles(table(), FULL_IDENT)
+        .rewriteDataFiles(table())
         .option(BinPackStrategy.REWRITE_ALL, "true")
         .sort(SortOrder
             .builderFor(table().schema())
@@ -120,7 +119,7 @@ public class IcebergSortCompactionBenchmark {
   @Threads(1)
   public void sortInt2() {
     SparkActions.get()
-        .rewriteDataFiles(table(), FULL_IDENT)
+        .rewriteDataFiles(table())
         .option(BinPackStrategy.REWRITE_ALL, "true")
         .sort(SortOrder
             .builderFor(table().schema())
@@ -134,7 +133,7 @@ public class IcebergSortCompactionBenchmark {
   @Threads(1)
   public void sortInt3() {
     SparkActions.get()
-        .rewriteDataFiles(table(), FULL_IDENT)
+        .rewriteDataFiles(table())
         .option(BinPackStrategy.REWRITE_ALL, "true")
         .sort(SortOrder
             .builderFor(table().schema())
@@ -150,7 +149,7 @@ public class IcebergSortCompactionBenchmark {
   @Threads(1)
   public void sortInt4() {
     SparkActions.get()
-        .rewriteDataFiles(table(), FULL_IDENT)
+        .rewriteDataFiles(table())
         .option(BinPackStrategy.REWRITE_ALL, "true")
         .sort(SortOrder
             .builderFor(table().schema())
@@ -166,7 +165,7 @@ public class IcebergSortCompactionBenchmark {
   @Threads(1)
   public void sortString() {
     SparkActions.get()
-        .rewriteDataFiles(table(), FULL_IDENT)
+        .rewriteDataFiles(table())
         .option(BinPackStrategy.REWRITE_ALL, "true")
         .sort(SortOrder
             .builderFor(table().schema())
@@ -179,7 +178,7 @@ public class IcebergSortCompactionBenchmark {
   @Threads(1)
   public void sortFourColumns() {
     SparkActions.get()
-        .rewriteDataFiles(table(), FULL_IDENT)
+        .rewriteDataFiles(table())
         .option(BinPackStrategy.REWRITE_ALL, "true")
         .sort(SortOrder
             .builderFor(table().schema())
@@ -195,7 +194,7 @@ public class IcebergSortCompactionBenchmark {
   @Threads(1)
   public void sortSixColumns() {
     SparkActions.get()
-        .rewriteDataFiles(table(), FULL_IDENT)
+        .rewriteDataFiles(table())
         .option(BinPackStrategy.REWRITE_ALL, "true")
         .sort(SortOrder
             .builderFor(table().schema())
@@ -213,7 +212,7 @@ public class IcebergSortCompactionBenchmark {
   @Threads(1)
   public void zSortInt() {
     SparkActions.get()
-        .rewriteDataFiles(table(), FULL_IDENT)
+        .rewriteDataFiles(table())
         .option(BinPackStrategy.REWRITE_ALL, "true")
         .zOrder("intCol")
         .execute();
@@ -223,7 +222,7 @@ public class IcebergSortCompactionBenchmark {
   @Threads(1)
   public void zSortInt2() {
     SparkActions.get()
-        .rewriteDataFiles(table(), FULL_IDENT)
+        .rewriteDataFiles(table())
         .option(BinPackStrategy.REWRITE_ALL, "true")
         .zOrder("intCol", "intCol2")
         .execute();
@@ -233,7 +232,7 @@ public class IcebergSortCompactionBenchmark {
   @Threads(1)
   public void zSortInt3() {
     SparkActions.get()
-        .rewriteDataFiles(table(), FULL_IDENT)
+        .rewriteDataFiles(table())
         .option(BinPackStrategy.REWRITE_ALL, "true")
         .zOrder("intCol", "intCol2", "intCol3")
         .execute();
@@ -243,7 +242,7 @@ public class IcebergSortCompactionBenchmark {
   @Threads(1)
   public void zSortInt4() {
     SparkActions.get()
-        .rewriteDataFiles(table(), FULL_IDENT)
+        .rewriteDataFiles(table())
         .option(BinPackStrategy.REWRITE_ALL, "true")
         .zOrder("intCol", "intCol2", "intCol3", "intCol4")
         .execute();
@@ -253,7 +252,7 @@ public class IcebergSortCompactionBenchmark {
   @Threads(1)
   public void zSortString() {
     SparkActions.get()
-        .rewriteDataFiles(table(), FULL_IDENT)
+        .rewriteDataFiles(table())
         .option(BinPackStrategy.REWRITE_ALL, "true")
         .zOrder("stringCol")
         .execute();
@@ -263,7 +262,7 @@ public class IcebergSortCompactionBenchmark {
   @Threads(1)
   public void zSortFourColumns() {
     SparkActions.get()
-        .rewriteDataFiles(table(), FULL_IDENT)
+        .rewriteDataFiles(table())
         .option(BinPackStrategy.REWRITE_ALL, "true")
         .zOrder("stringCol", "intCol", "dateCol", "doubleCol")
         .execute();
@@ -273,7 +272,7 @@ public class IcebergSortCompactionBenchmark {
   @Threads(1)
   public void zSortSixColumns() {
     SparkActions.get()
-        .rewriteDataFiles(table(), FULL_IDENT)
+        .rewriteDataFiles(table())
         .option(BinPackStrategy.REWRITE_ALL, "true")
         .zOrder("stringCol", "intCol", "dateCol", "timestampCol", "doubleCol", 
"longCol")
         .execute();
diff --git 
a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkCachedTableCatalog.java
 
b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkCachedTableCatalog.java
new file mode 100644
index 0000000000..dda86d0b76
--- /dev/null
+++ 
b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkCachedTableCatalog.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.base.Splitter;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.spark.source.SparkTable;
+import org.apache.iceberg.util.Pair;
+import org.apache.iceberg.util.SnapshotUtil;
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
+import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.connector.catalog.TableChange;
+import org.apache.spark.sql.connector.expressions.Transform;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+
+/**
+ * An internal table catalog that is capable of loading tables from a cache.
+ */
+public class SparkCachedTableCatalog implements TableCatalog {
+
+  private static final String CLASS_NAME = 
SparkCachedTableCatalog.class.getName();
+  private static final Splitter COMMA = Splitter.on(",");
+  private static final Pattern AT_TIMESTAMP = 
Pattern.compile("at_timestamp_(\\d+)");
+  private static final Pattern SNAPSHOT_ID = 
Pattern.compile("snapshot_id_(\\d+)");
+
+  private static final SparkTableCache TABLE_CACHE = SparkTableCache.get();
+
+  private String name = null;
+
+  @Override
+  public Identifier[] listTables(String[] namespace) {
+    throw new UnsupportedOperationException(CLASS_NAME + " does not support 
listing tables");
+  }
+
+  @Override
+  public SparkTable loadTable(Identifier ident) throws NoSuchTableException {
+    Pair<Table, Long> table = load(ident);
+    return new SparkTable(table.first(), table.second(), false /* refresh 
eagerly */);
+  }
+
+  @Override
+  public SparkTable loadTable(Identifier ident, String version) throws 
NoSuchTableException {
+    Pair<Table, Long> table = load(ident);
+    Preconditions.checkArgument(table.second() == null,
+        "Cannot time travel based on both table identifier and AS OF");
+    return new SparkTable(table.first(), Long.parseLong(version), false /* 
refresh eagerly */);
+  }
+
+  @Override
+  public SparkTable loadTable(Identifier ident, long timestampMicros) throws 
NoSuchTableException {
+    Pair<Table, Long> table = load(ident);
+    Preconditions.checkArgument(table.second() == null,
+        "Cannot time travel based on both table identifier and AS OF");
+    // Spark passes microseconds but Iceberg uses milliseconds for snapshots
+    long timestampMillis = TimeUnit.MICROSECONDS.toMillis(timestampMicros);
+    long snapshotId = SnapshotUtil.snapshotIdAsOfTime(table.first(), 
timestampMillis);
+    return new SparkTable(table.first(), snapshotId, false /* refresh eagerly 
*/);
+  }
+
+  @Override
+  public void invalidateTable(Identifier ident) {
+    throw new UnsupportedOperationException(CLASS_NAME + " does not support 
table invalidation");
+  }
+
+  @Override
+  public SparkTable createTable(Identifier ident, StructType schema, 
Transform[] partitions,
+                                Map<String, String> properties) throws 
TableAlreadyExistsException {
+    throw new UnsupportedOperationException(CLASS_NAME + " does not support 
creating tables");
+  }
+
+  @Override
+  public SparkTable alterTable(Identifier ident, TableChange... changes) {
+    throw new UnsupportedOperationException(CLASS_NAME + " does not support 
altering tables");
+  }
+
+  @Override
+  public boolean dropTable(Identifier ident) {
+    throw new UnsupportedOperationException(CLASS_NAME + " does not support 
dropping tables");
+  }
+
+  @Override
+  public boolean purgeTable(Identifier ident) throws 
UnsupportedOperationException {
+    throw new UnsupportedOperationException(CLASS_NAME + " does not support 
purging tables");
+  }
+
+  @Override
+  public void renameTable(Identifier oldIdent, Identifier newIdent) {
+    throw new UnsupportedOperationException(CLASS_NAME + " does not support 
renaming tables");
+  }
+
+  @Override
+  public void initialize(String catalogName, CaseInsensitiveStringMap options) 
{
+    this.name = catalogName;
+  }
+
+  @Override
+  public String name() {
+    return name;
+  }
+
+  private Pair<Table, Long> load(Identifier ident) throws NoSuchTableException 
{
+    Preconditions.checkArgument(ident.namespace().length == 0, CLASS_NAME + " 
does not support namespaces");
+
+    Pair<String, List<String>> parsedIdent = parseIdent(ident);
+    String key = parsedIdent.first();
+    List<String> metadata = parsedIdent.second();
+
+    Long asOfTimestamp = null;
+    Long snapshotId = null;
+    for (String meta : metadata) {
+      Matcher timeBasedMatcher = AT_TIMESTAMP.matcher(meta);
+      if (timeBasedMatcher.matches()) {
+        asOfTimestamp = Long.parseLong(timeBasedMatcher.group(1));
+        continue;
+      }
+
+      Matcher snapshotBasedMatcher = SNAPSHOT_ID.matcher(meta);
+      if (snapshotBasedMatcher.matches()) {
+        snapshotId = Long.parseLong(snapshotBasedMatcher.group(1));
+      }
+    }
+
+    Preconditions.checkArgument(asOfTimestamp == null || snapshotId == null,
+        "Cannot specify both snapshot and timestamp for time travel: %s", 
ident);
+
+    Table table = TABLE_CACHE.get(key);
+
+    if (table == null) {
+      throw new NoSuchTableException(ident);
+    }
+
+    if (snapshotId != null) {
+      return Pair.of(table, snapshotId);
+    } else if (asOfTimestamp != null) {
+      return Pair.of(table, SnapshotUtil.snapshotIdAsOfTime(table, 
asOfTimestamp));
+    } else {
+      return Pair.of(table, null);
+    }
+  }
+
+  private Pair<String, List<String>> parseIdent(Identifier ident) {
+    int hashIndex = ident.name().lastIndexOf('#');
+    if (hashIndex != -1 && !ident.name().endsWith("#")) {
+      String key = ident.name().substring(0, hashIndex);
+      List<String> metadata = 
COMMA.splitToList(ident.name().substring(hashIndex + 1));
+      return Pair.of(key, metadata);
+    } else {
+      return Pair.of(ident.name(), ImmutableList.of());
+    }
+  }
+}
diff --git 
a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkTableCache.java 
b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkTableCache.java
new file mode 100644
index 0000000000..ec587c529f
--- /dev/null
+++ 
b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkTableCache.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark;
+
+import java.util.Map;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+
+public class SparkTableCache {
+
+  private static final SparkTableCache INSTANCE = new SparkTableCache();
+
+  private final Map<String, Table> cache = Maps.newConcurrentMap();
+
+  public static SparkTableCache get() {
+    return INSTANCE;
+  }
+
+  public int size() {
+    return cache.size();
+  }
+
+  public void add(String key, Table table) {
+    cache.put(key, table);
+  }
+
+  public boolean contains(String key) {
+    return cache.containsKey(key);
+  }
+
+  public Table get(String key) {
+    return cache.get(key);
+  }
+
+  public Table remove(String key) {
+    return cache.remove(key);
+  }
+}
diff --git 
a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteDataFilesSparkAction.java
 
b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteDataFilesSparkAction.java
index 5ab2539c2c..d1937fcbcd 100644
--- 
a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteDataFilesSparkAction.java
+++ 
b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteDataFilesSparkAction.java
@@ -87,7 +87,6 @@ public class BaseRewriteDataFilesSparkAction
   );
 
   private final Table table;
-  private final String fullIdentifier;
 
   private Expression filter = Expressions.alwaysTrue();
   private int maxConcurrentFileGroupRewrites;
@@ -97,17 +96,9 @@ public class BaseRewriteDataFilesSparkAction
   private RewriteJobOrder rewriteJobOrder;
   private RewriteStrategy strategy = null;
 
-  @Deprecated
   protected BaseRewriteDataFilesSparkAction(SparkSession spark, Table table) {
     super(spark);
     this.table = table;
-    this.fullIdentifier = null;
-  }
-
-  protected BaseRewriteDataFilesSparkAction(SparkSession spark, Table table, 
String fullIdentifier) {
-    super(spark);
-    this.table = table;
-    this.fullIdentifier = fullIdentifier;
   }
 
   @Override
@@ -439,7 +430,7 @@ public class BaseRewriteDataFilesSparkAction
   }
 
   private BinPackStrategy binPackStrategy() {
-    return new SparkBinPackStrategy(table, fullIdentifier, spark());
+    return new SparkBinPackStrategy(table, spark());
   }
 
   private SortStrategy sortStrategy() {
diff --git 
a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkActions.java
 
b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkActions.java
index 4f6bb3f00d..d77a13d128 100644
--- 
a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkActions.java
+++ 
b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkActions.java
@@ -72,16 +72,10 @@ public class SparkActions implements ActionsProvider {
   }
 
   @Override
-  @Deprecated
   public RewriteDataFiles rewriteDataFiles(Table table) {
     return new BaseRewriteDataFilesSparkAction(spark, table);
   }
 
-  @Override
-  public RewriteDataFiles rewriteDataFiles(Table table, String fullIdentifier) 
{
-    return new BaseRewriteDataFilesSparkAction(spark, table, fullIdentifier);
-  }
-
   @Override
   public DeleteOrphanFiles deleteOrphanFiles(Table table) {
     return new BaseDeleteOrphanFilesSparkAction(spark, table);
diff --git 
a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkBinPackStrategy.java
 
b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkBinPackStrategy.java
index fcfaeaa751..d8c1cc3610 100644
--- 
a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkBinPackStrategy.java
+++ 
b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkBinPackStrategy.java
@@ -30,6 +30,7 @@ import org.apache.iceberg.actions.BinPackStrategy;
 import org.apache.iceberg.spark.FileRewriteCoordinator;
 import org.apache.iceberg.spark.FileScanTaskSetManager;
 import org.apache.iceberg.spark.SparkReadOptions;
+import org.apache.iceberg.spark.SparkTableCache;
 import org.apache.iceberg.spark.SparkWriteOptions;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
@@ -38,24 +39,14 @@ import org.apache.spark.sql.internal.SQLConf;
 
 public class SparkBinPackStrategy extends BinPackStrategy {
   private final Table table;
-  private final String fullIdentifier;
   private final SparkSession spark;
+  private final SparkTableCache tableCache = SparkTableCache.get();
   private final FileScanTaskSetManager manager = FileScanTaskSetManager.get();
   private final FileRewriteCoordinator rewriteCoordinator = 
FileRewriteCoordinator.get();
 
-  public SparkBinPackStrategy(Table table, String fullIdentifier, SparkSession 
spark) {
-    this.table = table;
-    this.spark = spark;
-    // Fallback if a quoted identifier is not supplied
-    this.fullIdentifier = fullIdentifier == null ? table.name() : 
fullIdentifier;
-  }
-
-  @Deprecated
   public SparkBinPackStrategy(Table table, SparkSession spark) {
     this.table = table;
     this.spark = spark;
-    // Fallback if a quoted identifier is not supplied
-    this.fullIdentifier = table.name();
   }
 
   @Override
@@ -67,6 +58,7 @@ public class SparkBinPackStrategy extends BinPackStrategy {
   public Set<DataFile> rewriteFiles(List<FileScanTask> filesToRewrite) {
     String groupID = UUID.randomUUID().toString();
     try {
+      tableCache.add(groupID, table);
       manager.stageTasks(table, groupID, filesToRewrite);
 
       // Disable Adaptive Query Execution as this may change the output 
partitioning of our write
@@ -77,7 +69,7 @@ public class SparkBinPackStrategy extends BinPackStrategy {
           .option(SparkReadOptions.FILE_SCAN_TASK_SET_ID, groupID)
           .option(SparkReadOptions.SPLIT_SIZE, 
splitSize(inputFileSize(filesToRewrite)))
           .option(SparkReadOptions.FILE_OPEN_COST, "0")
-          .load(fullIdentifier);
+          .load(groupID);
 
       // All files within a file group are written with the same spec, so 
check the first
       boolean requiresRepartition = 
!filesToRewrite.get(0).spec().equals(table.spec());
@@ -93,10 +85,11 @@ public class SparkBinPackStrategy extends BinPackStrategy {
           .option(SparkWriteOptions.TARGET_FILE_SIZE_BYTES, writeMaxFileSize())
           .option(SparkWriteOptions.DISTRIBUTION_MODE, distributionMode)
           .mode("append")
-          .save(fullIdentifier);
+          .save(groupID);
 
       return rewriteCoordinator.fetchNewDataFiles(table, groupID);
     } finally {
+      tableCache.remove(groupID);
       manager.removeTasks(table, groupID);
       rewriteCoordinator.clearRewrite(table, groupID);
     }
diff --git 
a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkSortStrategy.java
 
b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkSortStrategy.java
index 6c8f8c027d..97f46d7938 100644
--- 
a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkSortStrategy.java
+++ 
b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkSortStrategy.java
@@ -34,6 +34,7 @@ import org.apache.iceberg.spark.FileRewriteCoordinator;
 import org.apache.iceberg.spark.FileScanTaskSetManager;
 import org.apache.iceberg.spark.SparkDistributionAndOrderingUtil;
 import org.apache.iceberg.spark.SparkReadOptions;
+import org.apache.iceberg.spark.SparkTableCache;
 import org.apache.iceberg.spark.SparkWriteOptions;
 import org.apache.iceberg.util.PropertyUtil;
 import org.apache.iceberg.util.SortOrderUtil;
@@ -63,6 +64,7 @@ public class SparkSortStrategy extends SortStrategy {
 
   private final Table table;
   private final SparkSession spark;
+  private final SparkTableCache tableCache = SparkTableCache.get();
   private final FileScanTaskSetManager manager = FileScanTaskSetManager.get();
   private final FileRewriteCoordinator rewriteCoordinator = 
FileRewriteCoordinator.get();
 
@@ -114,6 +116,7 @@ public class SparkSortStrategy extends SortStrategy {
     Distribution distribution = Distributions.ordered(ordering);
 
     try {
+      tableCache.add(groupID, table);
       manager.stageTasks(table, groupID, filesToRewrite);
 
       // Disable Adaptive Query Execution as this may change the output 
partitioning of our write
@@ -126,7 +129,7 @@ public class SparkSortStrategy extends SortStrategy {
 
       Dataset<Row> scanDF = cloneSession.read().format("iceberg")
           .option(SparkReadOptions.FILE_SCAN_TASK_SET_ID, groupID)
-          .load(table.name());
+          .load(groupID);
 
       // write the packed data into new files where each split becomes a new 
file
       SQLConf sqlConf = cloneSession.sessionState().conf();
@@ -139,10 +142,11 @@ public class SparkSortStrategy extends SortStrategy {
           .option(SparkWriteOptions.TARGET_FILE_SIZE_BYTES, writeMaxFileSize())
           .option(SparkWriteOptions.USE_TABLE_DISTRIBUTION_AND_ORDERING, 
"false")
           .mode("append") // This will only write files without modifying the 
table, see SparkWrite.RewriteFiles
-          .save(table.name());
+          .save(groupID);
 
       return rewriteCoordinator.fetchNewDataFiles(table, groupID);
     } finally {
+      tableCache.remove(groupID);
       manager.removeTasks(table, groupID);
       rewriteCoordinator.clearRewrite(table, groupID);
     }
@@ -160,6 +164,10 @@ public class SparkSortStrategy extends SortStrategy {
     return sizeEstimateMultiple;
   }
 
+  protected SparkTableCache tableCache() {
+    return tableCache;
+  }
+
   protected FileScanTaskSetManager manager() {
     return manager;
   }
diff --git 
a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkZOrderStrategy.java
 
b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkZOrderStrategy.java
index fef23acd9b..8044039a59 100644
--- 
a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkZOrderStrategy.java
+++ 
b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkZOrderStrategy.java
@@ -178,6 +178,7 @@ public class SparkZOrderStrategy extends SparkSortStrategy {
     Distribution distribution = Distributions.ordered(ordering);
 
     try {
+      tableCache().add(groupID, table());
       manager().stageTasks(table(), groupID, filesToRewrite);
 
       // Disable Adaptive Query Execution as this may change the output 
partitioning of our write
@@ -190,7 +191,7 @@ public class SparkZOrderStrategy extends SparkSortStrategy {
 
       Dataset<Row> scanDF = cloneSession.read().format("iceberg")
           .option(SparkReadOptions.FILE_SCAN_TASK_SET_ID, groupID)
-          .load(table().name());
+          .load(groupID);
 
       Column[] originalColumns = Arrays.stream(scanDF.schema().names())
           .map(n -> functions.col(n))
@@ -217,10 +218,11 @@ public class SparkZOrderStrategy extends 
SparkSortStrategy {
           .option(SparkWriteOptions.TARGET_FILE_SIZE_BYTES, writeMaxFileSize())
           .option(SparkWriteOptions.USE_TABLE_DISTRIBUTION_AND_ORDERING, 
"false")
           .mode("append")
-          .save(table().name());
+          .save(groupID);
 
       return rewriteCoordinator().fetchNewDataFiles(table(), groupID);
     } finally {
+      tableCache().remove(groupID);
       manager().removeTasks(table(), groupID);
       rewriteCoordinator().clearRewrite(table(), groupID);
     }
diff --git 
a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/RewriteDataFilesProcedure.java
 
b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/RewriteDataFilesProcedure.java
index 27ffc69086..5c71dbf555 100644
--- 
a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/RewriteDataFilesProcedure.java
+++ 
b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/RewriteDataFilesProcedure.java
@@ -48,7 +48,7 @@ import scala.runtime.BoxedUnit;
 /**
  * A procedure that rewrites datafiles in a table.
  *
- * @see org.apache.iceberg.spark.actions.SparkActions#rewriteDataFiles(Table, 
String)
+ * @see org.apache.iceberg.spark.actions.SparkActions#rewriteDataFiles(Table)
  */
 class RewriteDataFilesProcedure extends BaseProcedure {
 
@@ -95,7 +95,7 @@ class RewriteDataFilesProcedure extends BaseProcedure {
 
     return modifyIcebergTable(tableIdent, table -> {
       String quotedFullIdentifier = 
Spark3Util.quotedFullIdentifier(tableCatalog().name(), tableIdent);
-      RewriteDataFiles action = actions().rewriteDataFiles(table, 
quotedFullIdentifier);
+      RewriteDataFiles action = actions().rewriteDataFiles(table);
 
       String strategy = args.isNullAt(1) ? null : args.getString(1);
       String sortOrderString = args.isNullAt(2) ? null : args.getString(2);
diff --git 
a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java
 
b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java
index 73429dde92..a3978b6967 100644
--- 
a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java
+++ 
b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java
@@ -26,8 +26,11 @@ import 
org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.spark.PathIdentifier;
 import org.apache.iceberg.spark.Spark3Util;
+import org.apache.iceberg.spark.SparkCachedTableCatalog;
+import org.apache.iceberg.spark.SparkCatalog;
 import org.apache.iceberg.spark.SparkReadOptions;
 import org.apache.iceberg.spark.SparkSessionCatalog;
+import org.apache.iceberg.spark.SparkTableCache;
 import org.apache.iceberg.util.PropertyUtil;
 import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
@@ -59,9 +62,14 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap;
  */
 public class IcebergSource implements DataSourceRegister, 
SupportsCatalogOptions {
   private static final String DEFAULT_CATALOG_NAME = "default_iceberg";
+  private static final String DEFAULT_CACHE_CATALOG_NAME = 
"default_cache_iceberg";
   private static final String DEFAULT_CATALOG = "spark.sql.catalog." + 
DEFAULT_CATALOG_NAME;
+  private static final String DEFAULT_CACHE_CATALOG = "spark.sql.catalog." + 
DEFAULT_CACHE_CATALOG_NAME;
   private static final String AT_TIMESTAMP = "at_timestamp_";
   private static final String SNAPSHOT_ID = "snapshot_id_";
+  private static final String[] EMPTY_NAMESPACE = new String[0];
+
+  private static final SparkTableCache TABLE_CACHE = SparkTableCache.get();
 
   @Override
   public String shortName() {
@@ -105,7 +113,7 @@ public class IcebergSource implements DataSourceRegister, 
SupportsCatalogOptions
   private Spark3Util.CatalogAndIdentifier 
catalogAndIdentifier(CaseInsensitiveStringMap options) {
     Preconditions.checkArgument(options.containsKey("path"), "Cannot open 
table: path is not set");
     SparkSession spark = SparkSession.active();
-    setupDefaultSparkCatalog(spark);
+    setupDefaultSparkCatalogs(spark);
     String path = options.get("path");
 
     Long snapshotId = propertyAsLong(options, SparkReadOptions.SNAPSHOT_ID);
@@ -125,11 +133,14 @@ public class IcebergSource implements DataSourceRegister, 
SupportsCatalogOptions
 
     CatalogManager catalogManager = spark.sessionState().catalogManager();
 
-    if (path.contains("/")) {
+    if (TABLE_CACHE.contains(path)) {
+      return new Spark3Util.CatalogAndIdentifier(
+          catalogManager.catalog(DEFAULT_CACHE_CATALOG_NAME),
+          Identifier.of(EMPTY_NAMESPACE, pathWithSelector(path, selector)));
+    } else if (path.contains("/")) {
       // contains a path. Return iceberg default catalog and a PathIdentifier
-      String newPath = (selector == null) ? path : path + "#" + selector;
       return new 
Spark3Util.CatalogAndIdentifier(catalogManager.catalog(DEFAULT_CATALOG_NAME),
-          new PathIdentifier(newPath));
+          new PathIdentifier(pathWithSelector(path, selector)));
     }
 
     final Spark3Util.CatalogAndIdentifier catalogAndIdentifier = 
Spark3Util.catalogAndIdentifier(
@@ -145,6 +156,10 @@ public class IcebergSource implements DataSourceRegister, 
SupportsCatalogOptions
     }
   }
 
+  private String pathWithSelector(String path, String selector) {
+    return (selector == null) ? path : path + "#" + selector;
+  }
+
   private Identifier identifierWithSelector(Identifier ident, String selector) 
{
     if (selector == null) {
       return ident;
@@ -185,17 +200,19 @@ public class IcebergSource implements DataSourceRegister, 
SupportsCatalogOptions
     return null;
   }
 
-  private static void setupDefaultSparkCatalog(SparkSession spark) {
-    if (spark.conf().contains(DEFAULT_CATALOG)) {
-      return;
+  private static void setupDefaultSparkCatalogs(SparkSession spark) {
+    if (!spark.conf().contains(DEFAULT_CATALOG)) {
+      ImmutableMap<String, String> config = ImmutableMap.of(
+          "type", "hive",
+          "default-namespace", "default",
+          "cache-enabled", "false" // the source should not use a cache
+      );
+      spark.conf().set(DEFAULT_CATALOG, SparkCatalog.class.getName());
+      config.forEach((key, value) -> spark.conf().set(DEFAULT_CATALOG + "." + 
key, value));
+    }
+
+    if (!spark.conf().contains(DEFAULT_CACHE_CATALOG)) {
+      spark.conf().set(DEFAULT_CACHE_CATALOG, 
SparkCachedTableCatalog.class.getName());
     }
-    ImmutableMap<String, String> config = ImmutableMap.of(
-        "type", "hive",
-        "default-namespace", "default",
-        "cache-enabled", "false" // the source should not use a cache
-    );
-    String catalogName = "org.apache.iceberg.spark.SparkCatalog";
-    spark.conf().set(DEFAULT_CATALOG, catalogName);
-    config.forEach((key, value) -> spark.conf().set(DEFAULT_CATALOG + "." + 
key, value));
   }
 }
diff --git 
a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/TestSparkCachedTableCatalog.java
 
b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/TestSparkCachedTableCatalog.java
new file mode 100644
index 0000000000..817af03029
--- /dev/null
+++ 
b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/TestSparkCachedTableCatalog.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark;
+
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestSparkCachedTableCatalog extends SparkTestBaseWithCatalog {
+
+  private static final SparkTableCache TABLE_CACHE = SparkTableCache.get();
+
+  @BeforeClass
+  public static void setupCachedTableCatalog() {
+    spark.conf().set("spark.sql.catalog.testcache", 
SparkCachedTableCatalog.class.getName());
+  }
+
+  @AfterClass
+  public static void unsetCachedTableCatalog() {
+    spark.conf().unset("spark.sql.catalog.testcache");
+  }
+
+  public TestSparkCachedTableCatalog() {
+    super(SparkCatalogConfig.HIVE);
+  }
+
+  @Test
+  public void testTimeTravel() {
+    sql("CREATE TABLE %s (id INT, dep STRING) USING iceberg", tableName);
+
+    Table table = validationCatalog.loadTable(tableIdent);
+
+    sql("INSERT INTO TABLE %s VALUES (1, 'hr')", tableName);
+
+    table.refresh();
+    Snapshot firstSnapshot = table.currentSnapshot();
+    waitUntilAfter(firstSnapshot.timestampMillis());
+
+    sql("INSERT INTO TABLE %s VALUES (2, 'hr')", tableName);
+
+    table.refresh();
+    Snapshot secondSnapshot = table.currentSnapshot();
+    waitUntilAfter(secondSnapshot.timestampMillis());
+
+    sql("INSERT INTO TABLE %s VALUES (3, 'hr')", tableName);
+
+    table.refresh();
+
+    try {
+      TABLE_CACHE.add("key", table);
+
+      assertEquals("Should have expected rows in 3rd snapshot",
+          ImmutableList.of(row(1, "hr"), row(2, "hr"), row(3, "hr")),
+          sql("SELECT * FROM testcache.key ORDER BY id"));
+
+      assertEquals("Should have expected rows in 2nd snapshot",
+          ImmutableList.of(row(1, "hr"), row(2, "hr")),
+          sql("SELECT * FROM testcache.`key#at_timestamp_%s` ORDER BY id", 
secondSnapshot.timestampMillis()));
+
+      assertEquals("Should have expected rows in 1st snapshot",
+          ImmutableList.of(row(1, "hr")),
+          sql("SELECT * FROM testcache.`key#snapshot_id_%d` ORDER BY id", 
firstSnapshot.snapshotId()));
+
+    } finally {
+      TABLE_CACHE.remove("key");
+    }
+  }
+}
diff --git 
a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
 
b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
index ea2a7755f2..e0b1e53cfe 100644
--- 
a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
+++ 
b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
@@ -136,7 +136,7 @@ public class TestRewriteDataFilesAction extends 
SparkTestBase {
   private RewriteDataFiles basicRewrite(Table table) {
     // Always compact regardless of input files
     table.refresh();
-    return actions().rewriteDataFiles(table, 
tableLocation).option(BinPackStrategy.MIN_INPUT_FILES, "1");
+    return 
actions().rewriteDataFiles(table).option(BinPackStrategy.MIN_INPUT_FILES, "1");
   }
 
   @Test
@@ -259,7 +259,7 @@ public class TestRewriteDataFilesAction extends 
SparkTestBase {
     rowDelta.commit();
     table.refresh();
     List<Object[]> expectedRecords = currentData();
-    Result result = actions().rewriteDataFiles(table, tableLocation)
+    Result result = actions().rewriteDataFiles(table)
         // do not include any file based on bin pack file size configs
         .option(BinPackStrategy.MIN_FILE_SIZE_BYTES, "0")
         .option(RewriteDataFiles.TARGET_FILE_SIZE_BYTES, 
Long.toString(Long.MAX_VALUE - 1))
@@ -293,7 +293,7 @@ public class TestRewriteDataFilesAction extends 
SparkTestBase {
     rowDelta.commit();
     table.refresh();
     List<Object[]> expectedRecords = currentData();
-    Result result = actions().rewriteDataFiles(table, tableLocation)
+    Result result = actions().rewriteDataFiles(table)
         .option(BinPackStrategy.DELETE_FILE_THRESHOLD, "1")
         .execute();
     Assert.assertEquals("Action should rewrite 1 data files", 1, 
result.rewrittenDataFilesCount());
@@ -1123,17 +1123,17 @@ public class TestRewriteDataFilesAction extends 
SparkTestBase {
     Table table = createTable(1);
 
     AssertHelpers.assertThrows("Should be unable to set Strategy more than 
once", IllegalArgumentException.class,
-        "Cannot set strategy", () -> actions().rewriteDataFiles(table, 
tableLocation).binPack().sort());
+        "Cannot set strategy", () -> 
actions().rewriteDataFiles(table).binPack().sort());
 
     AssertHelpers.assertThrows("Should be unable to set Strategy more than 
once", IllegalArgumentException.class,
-        "Cannot set strategy", () -> actions().rewriteDataFiles(table, 
tableLocation).sort().binPack());
+        "Cannot set strategy", () -> 
actions().rewriteDataFiles(table).sort().binPack());
 
     AssertHelpers.assertThrows(
         "Should be unable to set Strategy more than once",
         IllegalArgumentException.class,
         "Cannot set strategy",
         () ->
-            actions().rewriteDataFiles(table, 
tableLocation).sort(SortOrder.unsorted()).binPack());
+            
actions().rewriteDataFiles(table).sort(SortOrder.unsorted()).binPack());
   }
 
   @Test

Reply via email to