This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 6cc4a198c5 Spark: Add Spark catalog for loading tables from cache
(#5247)
6cc4a198c5 is described below
commit 6cc4a198c56c05ff103e6ecdf75fe50004af19da
Author: Anton Okolnychyi <[email protected]>
AuthorDate: Mon Jul 11 17:12:59 2022 -0700
Spark: Add Spark catalog for loading tables from cache (#5247)
---
.../apache/iceberg/actions/ActionsProvider.java | 11 --
.../extensions/TestRewriteDataFilesProcedure.java | 98 +++++++++++-
.../action/IcebergSortCompactionBenchmark.java | 29 ++--
.../iceberg/spark/SparkCachedTableCatalog.java | 158 ++++++++++++++++++
.../org/apache/iceberg/spark/SparkTableCache.java | 55 +++++++
.../actions/BaseRewriteDataFilesSparkAction.java | 11 +-
.../apache/iceberg/spark/actions/SparkActions.java | 6 -
.../spark/actions/SparkBinPackStrategy.java | 19 +--
.../iceberg/spark/actions/SparkSortStrategy.java | 12 +-
.../iceberg/spark/actions/SparkZOrderStrategy.java | 6 +-
.../procedures/RewriteDataFilesProcedure.java | 4 +-
.../apache/iceberg/spark/source/IcebergSource.java | 47 ++++--
.../iceberg/spark/TestSparkCachedTableCatalog.java | 88 ++++++++++
.../spark/actions/TestRewriteDataFilesAction.java | 12 +-
.../extensions/TestRewriteDataFilesProcedure.java | 98 +++++++++++-
.../action/IcebergSortCompactionBenchmark.java | 29 ++--
.../iceberg/spark/SparkCachedTableCatalog.java | 178 +++++++++++++++++++++
.../org/apache/iceberg/spark/SparkTableCache.java | 55 +++++++
.../actions/BaseRewriteDataFilesSparkAction.java | 11 +-
.../apache/iceberg/spark/actions/SparkActions.java | 6 -
.../spark/actions/SparkBinPackStrategy.java | 19 +--
.../iceberg/spark/actions/SparkSortStrategy.java | 12 +-
.../iceberg/spark/actions/SparkZOrderStrategy.java | 6 +-
.../procedures/RewriteDataFilesProcedure.java | 4 +-
.../apache/iceberg/spark/source/IcebergSource.java | 47 ++++--
.../iceberg/spark/TestSparkCachedTableCatalog.java | 88 ++++++++++
.../spark/actions/TestRewriteDataFilesAction.java | 12 +-
27 files changed, 964 insertions(+), 157 deletions(-)
diff --git a/api/src/main/java/org/apache/iceberg/actions/ActionsProvider.java
b/api/src/main/java/org/apache/iceberg/actions/ActionsProvider.java
index 2a73809d60..f2564ddb70 100644
--- a/api/src/main/java/org/apache/iceberg/actions/ActionsProvider.java
+++ b/api/src/main/java/org/apache/iceberg/actions/ActionsProvider.java
@@ -56,22 +56,11 @@ public interface ActionsProvider {
/**
* Instantiates an action to rewrite data files.
- * @deprecated please use {@link #rewriteDataFiles(Table, String)}
*/
- @Deprecated
default RewriteDataFiles rewriteDataFiles(Table table) {
throw new UnsupportedOperationException(this.getClass().getName() + " does
not implement rewriteDataFiles");
}
- /**
- * Instantiates an action to rewrite data files.
- */
- default RewriteDataFiles rewriteDataFiles(Table table, String
fullIdentifier) {
- throw new UnsupportedOperationException(
- this.getClass().getName() + " does not implement
rewriteDataFiles(Table, String)"
- );
- }
-
/**
* Instantiates an action to expire snapshots.
*/
diff --git
a/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
index 63a63ddccb..9fdb0b1ed5 100644
---
a/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
+++
b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
@@ -29,6 +29,8 @@ import org.apache.iceberg.expressions.Zorder;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.spark.ExtendedParser;
+import org.apache.iceberg.spark.SparkCatalogConfig;
+import org.apache.iceberg.spark.SparkTableCache;
import org.apache.iceberg.spark.source.ThreeColumnRecord;
import org.apache.spark.sql.AnalysisException;
import org.apache.spark.sql.Dataset;
@@ -36,11 +38,14 @@ import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.analysis.NoSuchProcedureException;
import org.junit.After;
import org.junit.Assert;
+import org.junit.Assume;
import org.junit.Test;
public class TestRewriteDataFilesProcedure extends SparkExtensionsTestBase {
+ private static final String QUOTED_SPECIAL_CHARS_TABLE_NAME =
"`table:with.special:chars`";
+
public TestRewriteDataFilesProcedure(String catalogName, String
implementation, Map<String, String> config) {
super(catalogName, implementation, config);
}
@@ -48,6 +53,7 @@ public class TestRewriteDataFilesProcedure extends
SparkExtensionsTestBase {
@After
public void removeTable() {
sql("DROP TABLE IF EXISTS %s", tableName);
+ sql("DROP TABLE IF EXISTS %s", tableName(QUOTED_SPECIAL_CHARS_TABLE_NAME));
}
@Test
@@ -376,6 +382,86 @@ public class TestRewriteDataFilesProcedure extends
SparkExtensionsTestBase {
() -> sql("CALL %s.system.rewrite_data_files('')", catalogName));
}
+ @Test
+ public void testBinPackTableWithSpecialChars() {
+
Assume.assumeTrue(catalogName.equals(SparkCatalogConfig.HADOOP.catalogName()));
+
+ sql("CREATE TABLE %s (c1 int, c2 string, c3 string) USING iceberg",
tableName(QUOTED_SPECIAL_CHARS_TABLE_NAME));
+
+ insertData(tableName(QUOTED_SPECIAL_CHARS_TABLE_NAME), 10);
+
+ List<Object[]> expectedRecords =
currentData(tableName(QUOTED_SPECIAL_CHARS_TABLE_NAME));
+
+ List<Object[]> output = sql(
+ "CALL %s.system.rewrite_data_files(table => '%s', where => 'c2 is not
null')",
+ catalogName, tableName(QUOTED_SPECIAL_CHARS_TABLE_NAME));
+
+ assertEquals("Action should rewrite 10 data files and add 1 data file",
+ ImmutableList.of(row(10, 1)),
+ output);
+
+ List<Object[]> actualRecords =
currentData(tableName(QUOTED_SPECIAL_CHARS_TABLE_NAME));
+ assertEquals("Data after compaction should not change", expectedRecords,
actualRecords);
+
+ Assert.assertEquals("Table cache must be empty", 0,
SparkTableCache.get().size());
+ }
+
+ @Test
+ public void testSortTableWithSpecialChars() {
+
Assume.assumeTrue(catalogName.equals(SparkCatalogConfig.HADOOP.catalogName()));
+
+ sql("CREATE TABLE %s (c1 int, c2 string, c3 string) USING iceberg",
tableName(QUOTED_SPECIAL_CHARS_TABLE_NAME));
+
+ insertData(tableName(QUOTED_SPECIAL_CHARS_TABLE_NAME), 10);
+
+ List<Object[]> expectedRecords =
currentData(tableName(QUOTED_SPECIAL_CHARS_TABLE_NAME));
+
+ List<Object[]> output = sql(
+ "CALL %s.system.rewrite_data_files(" +
+ " table => '%s'," +
+ " strategy => 'sort'," +
+ " sort_order => 'c1'," +
+ " where => 'c2 is not null')",
+ catalogName, tableName(QUOTED_SPECIAL_CHARS_TABLE_NAME));
+
+ assertEquals("Action should rewrite 10 data files and add 1 data file",
+ ImmutableList.of(row(10, 1)),
+ output);
+
+ List<Object[]> actualRecords =
currentData(tableName(QUOTED_SPECIAL_CHARS_TABLE_NAME));
+ assertEquals("Data after compaction should not change", expectedRecords,
actualRecords);
+
+ Assert.assertEquals("Table cache must be empty", 0,
SparkTableCache.get().size());
+ }
+
+ @Test
+ public void testZOrderTableWithSpecialChars() {
+
Assume.assumeTrue(catalogName.equals(SparkCatalogConfig.HADOOP.catalogName()));
+
+ sql("CREATE TABLE %s (c1 int, c2 string, c3 string) USING iceberg",
tableName(QUOTED_SPECIAL_CHARS_TABLE_NAME));
+
+ insertData(tableName(QUOTED_SPECIAL_CHARS_TABLE_NAME), 10);
+
+ List<Object[]> expectedRecords =
currentData(tableName(QUOTED_SPECIAL_CHARS_TABLE_NAME));
+
+ List<Object[]> output = sql(
+ "CALL %s.system.rewrite_data_files(" +
+ " table => '%s'," +
+ " strategy => 'sort'," +
+ " sort_order => 'zorder(c1, c2)'," +
+ " where => 'c2 is not null')",
+ catalogName, tableName(QUOTED_SPECIAL_CHARS_TABLE_NAME));
+
+ assertEquals("Action should rewrite 10 data files and add 1 data file",
+ ImmutableList.of(row(10, 1)),
+ output);
+
+ List<Object[]> actualRecords =
currentData(tableName(QUOTED_SPECIAL_CHARS_TABLE_NAME));
+ assertEquals("Data after compaction should not change", expectedRecords,
actualRecords);
+
+ Assert.assertEquals("Table cache must be empty", 0,
SparkTableCache.get().size());
+ }
+
private void createTable() {
sql("CREATE TABLE %s (c1 int, c2 string, c3 string) USING iceberg",
tableName);
}
@@ -385,6 +471,10 @@ public class TestRewriteDataFilesProcedure extends
SparkExtensionsTestBase {
}
private void insertData(int filesCount) {
+ insertData(tableName, filesCount);
+ }
+
+ private void insertData(String table, int filesCount) {
ThreeColumnRecord record1 = new ThreeColumnRecord(1, "foo", null);
ThreeColumnRecord record2 = new ThreeColumnRecord(2, "bar", null);
@@ -396,13 +486,17 @@ public class TestRewriteDataFilesProcedure extends
SparkExtensionsTestBase {
Dataset<Row> df = spark.createDataFrame(records,
ThreeColumnRecord.class).repartition(filesCount);
try {
- df.writeTo(tableName).append();
+ df.writeTo(table).append();
} catch (org.apache.spark.sql.catalyst.analysis.NoSuchTableException e) {
throw new RuntimeException(e);
}
}
private List<Object[]> currentData() {
- return rowsToJava(spark.sql("SELECT * FROM " + tableName + " order by c1,
c2, c3").collectAsList());
+ return currentData(tableName);
+ }
+
+ private List<Object[]> currentData(String table) {
+ return rowsToJava(spark.sql("SELECT * FROM " + table + " order by c1, c2,
c3").collectAsList());
}
}
diff --git
a/spark/v3.2/spark/src/jmh/java/org/apache/iceberg/spark/action/IcebergSortCompactionBenchmark.java
b/spark/v3.2/spark/src/jmh/java/org/apache/iceberg/spark/action/IcebergSortCompactionBenchmark.java
index 17f9e23862..8c205037f5 100644
---
a/spark/v3.2/spark/src/jmh/java/org/apache/iceberg/spark/action/IcebergSortCompactionBenchmark.java
+++
b/spark/v3.2/spark/src/jmh/java/org/apache/iceberg/spark/action/IcebergSortCompactionBenchmark.java
@@ -74,7 +74,6 @@ public class IcebergSortCompactionBenchmark {
private static final String[] NAMESPACE = new String[] {"default"};
private static final String NAME = "sortbench";
private static final Identifier IDENT = Identifier.of(NAMESPACE, NAME);
- private static final String FULL_IDENT =
Spark3Util.quotedFullIdentifier("spark_catalog", IDENT);
private static final int NUM_FILES = 8;
private static final long NUM_ROWS = 7500000L;
private static final long UNIQUE_VALUES = NUM_ROWS / 4;
@@ -107,7 +106,7 @@ public class IcebergSortCompactionBenchmark {
@Threads(1)
public void sortInt() {
SparkActions.get()
- .rewriteDataFiles(table(), FULL_IDENT)
+ .rewriteDataFiles(table())
.option(BinPackStrategy.REWRITE_ALL, "true")
.sort(SortOrder
.builderFor(table().schema())
@@ -120,7 +119,7 @@ public class IcebergSortCompactionBenchmark {
@Threads(1)
public void sortInt2() {
SparkActions.get()
- .rewriteDataFiles(table(), FULL_IDENT)
+ .rewriteDataFiles(table())
.option(BinPackStrategy.REWRITE_ALL, "true")
.sort(SortOrder
.builderFor(table().schema())
@@ -134,7 +133,7 @@ public class IcebergSortCompactionBenchmark {
@Threads(1)
public void sortInt3() {
SparkActions.get()
- .rewriteDataFiles(table(), FULL_IDENT)
+ .rewriteDataFiles(table())
.option(BinPackStrategy.REWRITE_ALL, "true")
.sort(SortOrder
.builderFor(table().schema())
@@ -150,7 +149,7 @@ public class IcebergSortCompactionBenchmark {
@Threads(1)
public void sortInt4() {
SparkActions.get()
- .rewriteDataFiles(table(), FULL_IDENT)
+ .rewriteDataFiles(table())
.option(BinPackStrategy.REWRITE_ALL, "true")
.sort(SortOrder
.builderFor(table().schema())
@@ -166,7 +165,7 @@ public class IcebergSortCompactionBenchmark {
@Threads(1)
public void sortString() {
SparkActions.get()
- .rewriteDataFiles(table(), FULL_IDENT)
+ .rewriteDataFiles(table())
.option(BinPackStrategy.REWRITE_ALL, "true")
.sort(SortOrder
.builderFor(table().schema())
@@ -179,7 +178,7 @@ public class IcebergSortCompactionBenchmark {
@Threads(1)
public void sortFourColumns() {
SparkActions.get()
- .rewriteDataFiles(table(), FULL_IDENT)
+ .rewriteDataFiles(table())
.option(BinPackStrategy.REWRITE_ALL, "true")
.sort(SortOrder
.builderFor(table().schema())
@@ -195,7 +194,7 @@ public class IcebergSortCompactionBenchmark {
@Threads(1)
public void sortSixColumns() {
SparkActions.get()
- .rewriteDataFiles(table(), FULL_IDENT)
+ .rewriteDataFiles(table())
.option(BinPackStrategy.REWRITE_ALL, "true")
.sort(SortOrder
.builderFor(table().schema())
@@ -213,7 +212,7 @@ public class IcebergSortCompactionBenchmark {
@Threads(1)
public void zSortInt() {
SparkActions.get()
- .rewriteDataFiles(table(), FULL_IDENT)
+ .rewriteDataFiles(table())
.option(BinPackStrategy.REWRITE_ALL, "true")
.zOrder("intCol")
.execute();
@@ -223,7 +222,7 @@ public class IcebergSortCompactionBenchmark {
@Threads(1)
public void zSortInt2() {
SparkActions.get()
- .rewriteDataFiles(table(), FULL_IDENT)
+ .rewriteDataFiles(table())
.option(BinPackStrategy.REWRITE_ALL, "true")
.zOrder("intCol", "intCol2")
.execute();
@@ -233,7 +232,7 @@ public class IcebergSortCompactionBenchmark {
@Threads(1)
public void zSortInt3() {
SparkActions.get()
- .rewriteDataFiles(table(), FULL_IDENT)
+ .rewriteDataFiles(table())
.option(BinPackStrategy.REWRITE_ALL, "true")
.zOrder("intCol", "intCol2", "intCol3")
.execute();
@@ -243,7 +242,7 @@ public class IcebergSortCompactionBenchmark {
@Threads(1)
public void zSortInt4() {
SparkActions.get()
- .rewriteDataFiles(table(), FULL_IDENT)
+ .rewriteDataFiles(table())
.option(BinPackStrategy.REWRITE_ALL, "true")
.zOrder("intCol", "intCol2", "intCol3", "intCol4")
.execute();
@@ -253,7 +252,7 @@ public class IcebergSortCompactionBenchmark {
@Threads(1)
public void zSortString() {
SparkActions.get()
- .rewriteDataFiles(table(), FULL_IDENT)
+ .rewriteDataFiles(table())
.option(BinPackStrategy.REWRITE_ALL, "true")
.zOrder("stringCol")
.execute();
@@ -263,7 +262,7 @@ public class IcebergSortCompactionBenchmark {
@Threads(1)
public void zSortFourColumns() {
SparkActions.get()
- .rewriteDataFiles(table(), FULL_IDENT)
+ .rewriteDataFiles(table())
.option(BinPackStrategy.REWRITE_ALL, "true")
.zOrder("stringCol", "intCol", "dateCol", "doubleCol")
.execute();
@@ -273,7 +272,7 @@ public class IcebergSortCompactionBenchmark {
@Threads(1)
public void zSortSixColumns() {
SparkActions.get()
- .rewriteDataFiles(table(), FULL_IDENT)
+ .rewriteDataFiles(table())
.option(BinPackStrategy.REWRITE_ALL, "true")
.zOrder("stringCol", "intCol", "dateCol", "timestampCol", "doubleCol",
"longCol")
.execute();
diff --git
a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkCachedTableCatalog.java
b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkCachedTableCatalog.java
new file mode 100644
index 0000000000..ff4081764b
--- /dev/null
+++
b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkCachedTableCatalog.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark;
+
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.base.Splitter;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.spark.source.SparkTable;
+import org.apache.iceberg.util.Pair;
+import org.apache.iceberg.util.SnapshotUtil;
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
+import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.connector.catalog.TableChange;
+import org.apache.spark.sql.connector.expressions.Transform;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+
+/**
+ * An internal table catalog that is capable of loading tables from a cache.
+ */
+public class SparkCachedTableCatalog implements TableCatalog {
+
+ private static final String CLASS_NAME =
SparkCachedTableCatalog.class.getName();
+ private static final Splitter COMMA = Splitter.on(",");
+ private static final Pattern AT_TIMESTAMP =
Pattern.compile("at_timestamp_(\\d+)");
+ private static final Pattern SNAPSHOT_ID =
Pattern.compile("snapshot_id_(\\d+)");
+
+ private static final SparkTableCache TABLE_CACHE = SparkTableCache.get();
+
+ private String name = null;
+
+ @Override
+ public Identifier[] listTables(String[] namespace) {
+ throw new UnsupportedOperationException(CLASS_NAME + " does not support
listing tables");
+ }
+
+ @Override
+ public SparkTable loadTable(Identifier ident) throws NoSuchTableException {
+ Pair<Table, Long> table = load(ident);
+ return new SparkTable(table.first(), table.second(), false /* refresh
eagerly */);
+ }
+
+ @Override
+ public void invalidateTable(Identifier ident) {
+ throw new UnsupportedOperationException(CLASS_NAME + " does not support
table invalidation");
+ }
+
+ @Override
+ public SparkTable createTable(Identifier ident, StructType schema,
Transform[] partitions,
+ Map<String, String> properties) throws
TableAlreadyExistsException {
+ throw new UnsupportedOperationException(CLASS_NAME + " does not support
creating tables");
+ }
+
+ @Override
+ public SparkTable alterTable(Identifier ident, TableChange... changes) {
+ throw new UnsupportedOperationException(CLASS_NAME + " does not support
altering tables");
+ }
+
+ @Override
+ public boolean dropTable(Identifier ident) {
+ throw new UnsupportedOperationException(CLASS_NAME + " does not support
dropping tables");
+ }
+
+ @Override
+ public boolean purgeTable(Identifier ident) throws
UnsupportedOperationException {
+ throw new UnsupportedOperationException(CLASS_NAME + " does not support
purging tables");
+ }
+
+ @Override
+ public void renameTable(Identifier oldIdent, Identifier newIdent) {
+ throw new UnsupportedOperationException(CLASS_NAME + " does not support
renaming tables");
+ }
+
+ @Override
+ public void initialize(String catalogName, CaseInsensitiveStringMap options)
{
+ this.name = catalogName;
+ }
+
+ @Override
+ public String name() {
+ return name;
+ }
+
+ private Pair<Table, Long> load(Identifier ident) throws NoSuchTableException
{
+ Preconditions.checkArgument(ident.namespace().length == 0, CLASS_NAME + "
does not support namespaces");
+
+ Pair<String, List<String>> parsedIdent = parseIdent(ident);
+ String key = parsedIdent.first();
+ List<String> metadata = parsedIdent.second();
+
+ Long asOfTimestamp = null;
+ Long snapshotId = null;
+ for (String meta : metadata) {
+ Matcher timeBasedMatcher = AT_TIMESTAMP.matcher(meta);
+ if (timeBasedMatcher.matches()) {
+ asOfTimestamp = Long.parseLong(timeBasedMatcher.group(1));
+ continue;
+ }
+
+ Matcher snapshotBasedMatcher = SNAPSHOT_ID.matcher(meta);
+ if (snapshotBasedMatcher.matches()) {
+ snapshotId = Long.parseLong(snapshotBasedMatcher.group(1));
+ }
+ }
+
+ Preconditions.checkArgument(asOfTimestamp == null || snapshotId == null,
+ "Cannot specify both snapshot and timestamp for time travel: %s",
ident);
+
+ Table table = TABLE_CACHE.get(key);
+
+ if (table == null) {
+ throw new NoSuchTableException(ident);
+ }
+
+ if (snapshotId != null) {
+ return Pair.of(table, snapshotId);
+ } else if (asOfTimestamp != null) {
+ return Pair.of(table, SnapshotUtil.snapshotIdAsOfTime(table,
asOfTimestamp));
+ } else {
+ return Pair.of(table, null);
+ }
+ }
+
+ private Pair<String, List<String>> parseIdent(Identifier ident) {
+ int hashIndex = ident.name().lastIndexOf('#');
+ if (hashIndex != -1 && !ident.name().endsWith("#")) {
+ String key = ident.name().substring(0, hashIndex);
+ List<String> metadata =
COMMA.splitToList(ident.name().substring(hashIndex + 1));
+ return Pair.of(key, metadata);
+ } else {
+ return Pair.of(ident.name(), ImmutableList.of());
+ }
+ }
+}
diff --git
a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkTableCache.java
b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkTableCache.java
new file mode 100644
index 0000000000..ec587c529f
--- /dev/null
+++
b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkTableCache.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark;
+
+import java.util.Map;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+
+public class SparkTableCache {
+
+ private static final SparkTableCache INSTANCE = new SparkTableCache();
+
+ private final Map<String, Table> cache = Maps.newConcurrentMap();
+
+ public static SparkTableCache get() {
+ return INSTANCE;
+ }
+
+ public int size() {
+ return cache.size();
+ }
+
+ public void add(String key, Table table) {
+ cache.put(key, table);
+ }
+
+ public boolean contains(String key) {
+ return cache.containsKey(key);
+ }
+
+ public Table get(String key) {
+ return cache.get(key);
+ }
+
+ public Table remove(String key) {
+ return cache.remove(key);
+ }
+}
diff --git
a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteDataFilesSparkAction.java
b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteDataFilesSparkAction.java
index 5ab2539c2c..d1937fcbcd 100644
---
a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteDataFilesSparkAction.java
+++
b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteDataFilesSparkAction.java
@@ -87,7 +87,6 @@ public class BaseRewriteDataFilesSparkAction
);
private final Table table;
- private final String fullIdentifier;
private Expression filter = Expressions.alwaysTrue();
private int maxConcurrentFileGroupRewrites;
@@ -97,17 +96,9 @@ public class BaseRewriteDataFilesSparkAction
private RewriteJobOrder rewriteJobOrder;
private RewriteStrategy strategy = null;
- @Deprecated
protected BaseRewriteDataFilesSparkAction(SparkSession spark, Table table) {
super(spark);
this.table = table;
- this.fullIdentifier = null;
- }
-
- protected BaseRewriteDataFilesSparkAction(SparkSession spark, Table table,
String fullIdentifier) {
- super(spark);
- this.table = table;
- this.fullIdentifier = fullIdentifier;
}
@Override
@@ -439,7 +430,7 @@ public class BaseRewriteDataFilesSparkAction
}
private BinPackStrategy binPackStrategy() {
- return new SparkBinPackStrategy(table, fullIdentifier, spark());
+ return new SparkBinPackStrategy(table, spark());
}
private SortStrategy sortStrategy() {
diff --git
a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/SparkActions.java
b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/SparkActions.java
index 4f6bb3f00d..d77a13d128 100644
---
a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/SparkActions.java
+++
b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/SparkActions.java
@@ -72,16 +72,10 @@ public class SparkActions implements ActionsProvider {
}
@Override
- @Deprecated
public RewriteDataFiles rewriteDataFiles(Table table) {
return new BaseRewriteDataFilesSparkAction(spark, table);
}
- @Override
- public RewriteDataFiles rewriteDataFiles(Table table, String fullIdentifier)
{
- return new BaseRewriteDataFilesSparkAction(spark, table, fullIdentifier);
- }
-
@Override
public DeleteOrphanFiles deleteOrphanFiles(Table table) {
return new BaseDeleteOrphanFilesSparkAction(spark, table);
diff --git
a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/SparkBinPackStrategy.java
b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/SparkBinPackStrategy.java
index fcfaeaa751..d8c1cc3610 100644
---
a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/SparkBinPackStrategy.java
+++
b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/SparkBinPackStrategy.java
@@ -30,6 +30,7 @@ import org.apache.iceberg.actions.BinPackStrategy;
import org.apache.iceberg.spark.FileRewriteCoordinator;
import org.apache.iceberg.spark.FileScanTaskSetManager;
import org.apache.iceberg.spark.SparkReadOptions;
+import org.apache.iceberg.spark.SparkTableCache;
import org.apache.iceberg.spark.SparkWriteOptions;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
@@ -38,24 +39,14 @@ import org.apache.spark.sql.internal.SQLConf;
public class SparkBinPackStrategy extends BinPackStrategy {
private final Table table;
- private final String fullIdentifier;
private final SparkSession spark;
+ private final SparkTableCache tableCache = SparkTableCache.get();
private final FileScanTaskSetManager manager = FileScanTaskSetManager.get();
private final FileRewriteCoordinator rewriteCoordinator =
FileRewriteCoordinator.get();
- public SparkBinPackStrategy(Table table, String fullIdentifier, SparkSession
spark) {
- this.table = table;
- this.spark = spark;
- // Fallback if a quoted identifier is not supplied
- this.fullIdentifier = fullIdentifier == null ? table.name() :
fullIdentifier;
- }
-
- @Deprecated
public SparkBinPackStrategy(Table table, SparkSession spark) {
this.table = table;
this.spark = spark;
- // Fallback if a quoted identifier is not supplied
- this.fullIdentifier = table.name();
}
@Override
@@ -67,6 +58,7 @@ public class SparkBinPackStrategy extends BinPackStrategy {
public Set<DataFile> rewriteFiles(List<FileScanTask> filesToRewrite) {
String groupID = UUID.randomUUID().toString();
try {
+ tableCache.add(groupID, table);
manager.stageTasks(table, groupID, filesToRewrite);
// Disable Adaptive Query Execution as this may change the output
partitioning of our write
@@ -77,7 +69,7 @@ public class SparkBinPackStrategy extends BinPackStrategy {
.option(SparkReadOptions.FILE_SCAN_TASK_SET_ID, groupID)
.option(SparkReadOptions.SPLIT_SIZE,
splitSize(inputFileSize(filesToRewrite)))
.option(SparkReadOptions.FILE_OPEN_COST, "0")
- .load(fullIdentifier);
+ .load(groupID);
// All files within a file group are written with the same spec, so
check the first
boolean requiresRepartition =
!filesToRewrite.get(0).spec().equals(table.spec());
@@ -93,10 +85,11 @@ public class SparkBinPackStrategy extends BinPackStrategy {
.option(SparkWriteOptions.TARGET_FILE_SIZE_BYTES, writeMaxFileSize())
.option(SparkWriteOptions.DISTRIBUTION_MODE, distributionMode)
.mode("append")
- .save(fullIdentifier);
+ .save(groupID);
return rewriteCoordinator.fetchNewDataFiles(table, groupID);
} finally {
+ tableCache.remove(groupID);
manager.removeTasks(table, groupID);
rewriteCoordinator.clearRewrite(table, groupID);
}
diff --git
a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/SparkSortStrategy.java
b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/SparkSortStrategy.java
index 6c8f8c027d..97f46d7938 100644
---
a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/SparkSortStrategy.java
+++
b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/SparkSortStrategy.java
@@ -34,6 +34,7 @@ import org.apache.iceberg.spark.FileRewriteCoordinator;
import org.apache.iceberg.spark.FileScanTaskSetManager;
import org.apache.iceberg.spark.SparkDistributionAndOrderingUtil;
import org.apache.iceberg.spark.SparkReadOptions;
+import org.apache.iceberg.spark.SparkTableCache;
import org.apache.iceberg.spark.SparkWriteOptions;
import org.apache.iceberg.util.PropertyUtil;
import org.apache.iceberg.util.SortOrderUtil;
@@ -63,6 +64,7 @@ public class SparkSortStrategy extends SortStrategy {
private final Table table;
private final SparkSession spark;
+ private final SparkTableCache tableCache = SparkTableCache.get();
private final FileScanTaskSetManager manager = FileScanTaskSetManager.get();
private final FileRewriteCoordinator rewriteCoordinator =
FileRewriteCoordinator.get();
@@ -114,6 +116,7 @@ public class SparkSortStrategy extends SortStrategy {
Distribution distribution = Distributions.ordered(ordering);
try {
+ tableCache.add(groupID, table);
manager.stageTasks(table, groupID, filesToRewrite);
// Disable Adaptive Query Execution as this may change the output
partitioning of our write
@@ -126,7 +129,7 @@ public class SparkSortStrategy extends SortStrategy {
Dataset<Row> scanDF = cloneSession.read().format("iceberg")
.option(SparkReadOptions.FILE_SCAN_TASK_SET_ID, groupID)
- .load(table.name());
+ .load(groupID);
// write the packed data into new files where each split becomes a new
file
SQLConf sqlConf = cloneSession.sessionState().conf();
@@ -139,10 +142,11 @@ public class SparkSortStrategy extends SortStrategy {
.option(SparkWriteOptions.TARGET_FILE_SIZE_BYTES, writeMaxFileSize())
.option(SparkWriteOptions.USE_TABLE_DISTRIBUTION_AND_ORDERING,
"false")
.mode("append") // This will only write files without modifying the
table, see SparkWrite.RewriteFiles
- .save(table.name());
+ .save(groupID);
return rewriteCoordinator.fetchNewDataFiles(table, groupID);
} finally {
+ tableCache.remove(groupID);
manager.removeTasks(table, groupID);
rewriteCoordinator.clearRewrite(table, groupID);
}
@@ -160,6 +164,10 @@ public class SparkSortStrategy extends SortStrategy {
return sizeEstimateMultiple;
}
+ protected SparkTableCache tableCache() {
+ return tableCache;
+ }
+
protected FileScanTaskSetManager manager() {
return manager;
}
diff --git
a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/SparkZOrderStrategy.java
b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/SparkZOrderStrategy.java
index fef23acd9b..8044039a59 100644
---
a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/SparkZOrderStrategy.java
+++
b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/SparkZOrderStrategy.java
@@ -178,6 +178,7 @@ public class SparkZOrderStrategy extends SparkSortStrategy {
Distribution distribution = Distributions.ordered(ordering);
try {
+ tableCache().add(groupID, table());
manager().stageTasks(table(), groupID, filesToRewrite);
// Disable Adaptive Query Execution as this may change the output
partitioning of our write
@@ -190,7 +191,7 @@ public class SparkZOrderStrategy extends SparkSortStrategy {
Dataset<Row> scanDF = cloneSession.read().format("iceberg")
.option(SparkReadOptions.FILE_SCAN_TASK_SET_ID, groupID)
- .load(table().name());
+ .load(groupID);
Column[] originalColumns = Arrays.stream(scanDF.schema().names())
.map(n -> functions.col(n))
@@ -217,10 +218,11 @@ public class SparkZOrderStrategy extends
SparkSortStrategy {
.option(SparkWriteOptions.TARGET_FILE_SIZE_BYTES, writeMaxFileSize())
.option(SparkWriteOptions.USE_TABLE_DISTRIBUTION_AND_ORDERING,
"false")
.mode("append")
- .save(table().name());
+ .save(groupID);
return rewriteCoordinator().fetchNewDataFiles(table(), groupID);
} finally {
+ tableCache().remove(groupID);
manager().removeTasks(table(), groupID);
rewriteCoordinator().clearRewrite(table(), groupID);
}
diff --git
a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/RewriteDataFilesProcedure.java
b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/RewriteDataFilesProcedure.java
index 27ffc69086..5c71dbf555 100644
---
a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/RewriteDataFilesProcedure.java
+++
b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/RewriteDataFilesProcedure.java
@@ -48,7 +48,7 @@ import scala.runtime.BoxedUnit;
/**
* A procedure that rewrites datafiles in a table.
*
- * @see org.apache.iceberg.spark.actions.SparkActions#rewriteDataFiles(Table,
String)
+ * @see org.apache.iceberg.spark.actions.SparkActions#rewriteDataFiles(Table)
*/
class RewriteDataFilesProcedure extends BaseProcedure {
@@ -95,7 +95,7 @@ class RewriteDataFilesProcedure extends BaseProcedure {
return modifyIcebergTable(tableIdent, table -> {
String quotedFullIdentifier =
Spark3Util.quotedFullIdentifier(tableCatalog().name(), tableIdent);
- RewriteDataFiles action = actions().rewriteDataFiles(table,
quotedFullIdentifier);
+ RewriteDataFiles action = actions().rewriteDataFiles(table);
String strategy = args.isNullAt(1) ? null : args.getString(1);
String sortOrderString = args.isNullAt(2) ? null : args.getString(2);
diff --git
a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java
b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java
index 7aa66b2223..d70d902c1f 100644
---
a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java
+++
b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java
@@ -25,8 +25,11 @@ import
org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.spark.PathIdentifier;
import org.apache.iceberg.spark.Spark3Util;
+import org.apache.iceberg.spark.SparkCachedTableCatalog;
+import org.apache.iceberg.spark.SparkCatalog;
import org.apache.iceberg.spark.SparkReadOptions;
import org.apache.iceberg.spark.SparkSessionCatalog;
+import org.apache.iceberg.spark.SparkTableCache;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
import org.apache.spark.sql.connector.catalog.CatalogManager;
@@ -57,9 +60,14 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap;
*/
public class IcebergSource implements DataSourceRegister,
SupportsCatalogOptions {
private static final String DEFAULT_CATALOG_NAME = "default_iceberg";
+ private static final String DEFAULT_CACHE_CATALOG_NAME =
"default_cache_iceberg";
+ private static final String DEFAULT_CACHE_CATALOG = "spark.sql.catalog." +
DEFAULT_CACHE_CATALOG_NAME;
private static final String DEFAULT_CATALOG = "spark.sql.catalog." +
DEFAULT_CATALOG_NAME;
private static final String AT_TIMESTAMP = "at_timestamp_";
private static final String SNAPSHOT_ID = "snapshot_id_";
+ private static final String[] EMPTY_NAMESPACE = new String[0];
+
+ private static final SparkTableCache TABLE_CACHE = SparkTableCache.get();
@Override
public String shortName() {
@@ -103,7 +111,7 @@ public class IcebergSource implements DataSourceRegister,
SupportsCatalogOptions
private Spark3Util.CatalogAndIdentifier
catalogAndIdentifier(CaseInsensitiveStringMap options) {
Preconditions.checkArgument(options.containsKey("path"), "Cannot open
table: path is not set");
SparkSession spark = SparkSession.active();
- setupDefaultSparkCatalog(spark);
+ setupDefaultSparkCatalogs(spark);
String path = options.get("path");
Long snapshotId = propertyAsLong(options, SparkReadOptions.SNAPSHOT_ID);
@@ -123,11 +131,14 @@ public class IcebergSource implements DataSourceRegister,
SupportsCatalogOptions
CatalogManager catalogManager = spark.sessionState().catalogManager();
- if (path.contains("/")) {
+ if (TABLE_CACHE.contains(path)) {
+ return new Spark3Util.CatalogAndIdentifier(
+ catalogManager.catalog(DEFAULT_CACHE_CATALOG_NAME),
+ Identifier.of(EMPTY_NAMESPACE, pathWithSelector(path, selector)));
+ } else if (path.contains("/")) {
// contains a path. Return iceberg default catalog and a PathIdentifier
- String newPath = (selector == null) ? path : path + "#" + selector;
return new
Spark3Util.CatalogAndIdentifier(catalogManager.catalog(DEFAULT_CATALOG_NAME),
- new PathIdentifier(newPath));
+ new PathIdentifier(pathWithSelector(path, selector)));
}
final Spark3Util.CatalogAndIdentifier catalogAndIdentifier =
Spark3Util.catalogAndIdentifier(
@@ -143,6 +154,10 @@ public class IcebergSource implements DataSourceRegister,
SupportsCatalogOptions
}
}
+ private String pathWithSelector(String path, String selector) {
+ return (selector == null) ? path : path + "#" + selector;
+ }
+
private Identifier identifierWithSelector(Identifier ident, String selector)
{
if (selector == null) {
return ident;
@@ -173,17 +188,19 @@ public class IcebergSource implements DataSourceRegister,
SupportsCatalogOptions
return null;
}
- private static void setupDefaultSparkCatalog(SparkSession spark) {
- if (spark.conf().contains(DEFAULT_CATALOG)) {
- return;
+ private static void setupDefaultSparkCatalogs(SparkSession spark) {
+ if (!spark.conf().contains(DEFAULT_CATALOG)) {
+ ImmutableMap<String, String> config = ImmutableMap.of(
+ "type", "hive",
+ "default-namespace", "default",
+ "cache-enabled", "false" // the source should not use a cache
+ );
+ spark.conf().set(DEFAULT_CATALOG, SparkCatalog.class.getName());
+ config.forEach((key, value) -> spark.conf().set(DEFAULT_CATALOG + "." +
key, value));
+ }
+
+ if (!spark.conf().contains(DEFAULT_CACHE_CATALOG)) {
+ spark.conf().set(DEFAULT_CACHE_CATALOG,
SparkCachedTableCatalog.class.getName());
}
- ImmutableMap<String, String> config = ImmutableMap.of(
- "type", "hive",
- "default-namespace", "default",
- "cache-enabled", "false" // the source should not use a cache
- );
- String catalogName = "org.apache.iceberg.spark.SparkCatalog";
- spark.conf().set(DEFAULT_CATALOG, catalogName);
- config.forEach((key, value) -> spark.conf().set(DEFAULT_CATALOG + "." +
key, value));
}
}
diff --git
a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/TestSparkCachedTableCatalog.java
b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/TestSparkCachedTableCatalog.java
new file mode 100644
index 0000000000..817af03029
--- /dev/null
+++
b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/TestSparkCachedTableCatalog.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark;
+
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
// Verifies that SparkCachedTableCatalog serves tables registered in SparkTableCache,
// including time travel through `#snapshot_id_...` and `#at_timestamp_...` selectors
// embedded in the identifier name.
public class TestSparkCachedTableCatalog extends SparkTestBaseWithCatalog {

  // process-wide singleton shared with the catalog under test
  private static final SparkTableCache TABLE_CACHE = SparkTableCache.get();

  @BeforeClass
  public static void setupCachedTableCatalog() {
    // register the cache-backed catalog under the name referenced in the queries below
    spark.conf().set("spark.sql.catalog.testcache", SparkCachedTableCatalog.class.getName());
  }

  @AfterClass
  public static void unsetCachedTableCatalog() {
    spark.conf().unset("spark.sql.catalog.testcache");
  }

  public TestSparkCachedTableCatalog() {
    super(SparkCatalogConfig.HIVE);
  }

  @Test
  public void testTimeTravel() {
    sql("CREATE TABLE %s (id INT, dep STRING) USING iceberg", tableName);

    Table table = validationCatalog.loadTable(tableIdent);

    sql("INSERT INTO TABLE %s VALUES (1, 'hr')", tableName);

    table.refresh();
    Snapshot firstSnapshot = table.currentSnapshot();
    // ensure distinct snapshot timestamps so timestamp-based travel is unambiguous
    waitUntilAfter(firstSnapshot.timestampMillis());

    sql("INSERT INTO TABLE %s VALUES (2, 'hr')", tableName);

    table.refresh();
    Snapshot secondSnapshot = table.currentSnapshot();
    waitUntilAfter(secondSnapshot.timestampMillis());

    sql("INSERT INTO TABLE %s VALUES (3, 'hr')", tableName);

    table.refresh();

    try {
      // expose the table to the cached catalog under an arbitrary key
      TABLE_CACHE.add("key", table);

      assertEquals("Should have expected rows in 3rd snapshot",
          ImmutableList.of(row(1, "hr"), row(2, "hr"), row(3, "hr")),
          sql("SELECT * FROM testcache.key ORDER BY id"));

      // timestamp selector resolves to the snapshot current as of that time
      assertEquals("Should have expected rows in 2nd snapshot",
          ImmutableList.of(row(1, "hr"), row(2, "hr")),
          sql("SELECT * FROM testcache.`key#at_timestamp_%s` ORDER BY id", secondSnapshot.timestampMillis()));

      // snapshot-id selector pins an exact snapshot
      assertEquals("Should have expected rows in 1st snapshot",
          ImmutableList.of(row(1, "hr")),
          sql("SELECT * FROM testcache.`key#snapshot_id_%d` ORDER BY id", firstSnapshot.snapshotId()));

    } finally {
      // always unregister so other tests see an empty cache
      TABLE_CACHE.remove("key");
    }
  }
}
diff --git
a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
index ea2a7755f2..e0b1e53cfe 100644
---
a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
+++
b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
@@ -136,7 +136,7 @@ public class TestRewriteDataFilesAction extends
SparkTestBase {
private RewriteDataFiles basicRewrite(Table table) {
// Always compact regardless of input files
table.refresh();
- return actions().rewriteDataFiles(table,
tableLocation).option(BinPackStrategy.MIN_INPUT_FILES, "1");
+ return
actions().rewriteDataFiles(table).option(BinPackStrategy.MIN_INPUT_FILES, "1");
}
@Test
@@ -259,7 +259,7 @@ public class TestRewriteDataFilesAction extends
SparkTestBase {
rowDelta.commit();
table.refresh();
List<Object[]> expectedRecords = currentData();
- Result result = actions().rewriteDataFiles(table, tableLocation)
+ Result result = actions().rewriteDataFiles(table)
// do not include any file based on bin pack file size configs
.option(BinPackStrategy.MIN_FILE_SIZE_BYTES, "0")
.option(RewriteDataFiles.TARGET_FILE_SIZE_BYTES,
Long.toString(Long.MAX_VALUE - 1))
@@ -293,7 +293,7 @@ public class TestRewriteDataFilesAction extends
SparkTestBase {
rowDelta.commit();
table.refresh();
List<Object[]> expectedRecords = currentData();
- Result result = actions().rewriteDataFiles(table, tableLocation)
+ Result result = actions().rewriteDataFiles(table)
.option(BinPackStrategy.DELETE_FILE_THRESHOLD, "1")
.execute();
Assert.assertEquals("Action should rewrite 1 data files", 1,
result.rewrittenDataFilesCount());
@@ -1123,17 +1123,17 @@ public class TestRewriteDataFilesAction extends
SparkTestBase {
Table table = createTable(1);
AssertHelpers.assertThrows("Should be unable to set Strategy more than
once", IllegalArgumentException.class,
- "Cannot set strategy", () -> actions().rewriteDataFiles(table,
tableLocation).binPack().sort());
+ "Cannot set strategy", () ->
actions().rewriteDataFiles(table).binPack().sort());
AssertHelpers.assertThrows("Should be unable to set Strategy more than
once", IllegalArgumentException.class,
- "Cannot set strategy", () -> actions().rewriteDataFiles(table,
tableLocation).sort().binPack());
+ "Cannot set strategy", () ->
actions().rewriteDataFiles(table).sort().binPack());
AssertHelpers.assertThrows(
"Should be unable to set Strategy more than once",
IllegalArgumentException.class,
"Cannot set strategy",
() ->
- actions().rewriteDataFiles(table,
tableLocation).sort(SortOrder.unsorted()).binPack());
+
actions().rewriteDataFiles(table).sort(SortOrder.unsorted()).binPack());
}
@Test
diff --git
a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
index 63a63ddccb..9fdb0b1ed5 100644
---
a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
+++
b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
@@ -29,6 +29,8 @@ import org.apache.iceberg.expressions.Zorder;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.spark.ExtendedParser;
+import org.apache.iceberg.spark.SparkCatalogConfig;
+import org.apache.iceberg.spark.SparkTableCache;
import org.apache.iceberg.spark.source.ThreeColumnRecord;
import org.apache.spark.sql.AnalysisException;
import org.apache.spark.sql.Dataset;
@@ -36,11 +38,14 @@ import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.analysis.NoSuchProcedureException;
import org.junit.After;
import org.junit.Assert;
+import org.junit.Assume;
import org.junit.Test;
public class TestRewriteDataFilesProcedure extends SparkExtensionsTestBase {
+ private static final String QUOTED_SPECIAL_CHARS_TABLE_NAME =
"`table:with.special:chars`";
+
public TestRewriteDataFilesProcedure(String catalogName, String
implementation, Map<String, String> config) {
super(catalogName, implementation, config);
}
@@ -48,6 +53,7 @@ public class TestRewriteDataFilesProcedure extends
SparkExtensionsTestBase {
@After
public void removeTable() {
sql("DROP TABLE IF EXISTS %s", tableName);
+ sql("DROP TABLE IF EXISTS %s", tableName(QUOTED_SPECIAL_CHARS_TABLE_NAME));
}
@Test
@@ -376,6 +382,86 @@ public class TestRewriteDataFilesProcedure extends
SparkExtensionsTestBase {
() -> sql("CALL %s.system.rewrite_data_files('')", catalogName));
}
+ @Test
+ public void testBinPackTableWithSpecialChars() {
+
Assume.assumeTrue(catalogName.equals(SparkCatalogConfig.HADOOP.catalogName()));
+
+ sql("CREATE TABLE %s (c1 int, c2 string, c3 string) USING iceberg",
tableName(QUOTED_SPECIAL_CHARS_TABLE_NAME));
+
+ insertData(tableName(QUOTED_SPECIAL_CHARS_TABLE_NAME), 10);
+
+ List<Object[]> expectedRecords =
currentData(tableName(QUOTED_SPECIAL_CHARS_TABLE_NAME));
+
+ List<Object[]> output = sql(
+ "CALL %s.system.rewrite_data_files(table => '%s', where => 'c2 is not
null')",
+ catalogName, tableName(QUOTED_SPECIAL_CHARS_TABLE_NAME));
+
+ assertEquals("Action should rewrite 10 data files and add 1 data file",
+ ImmutableList.of(row(10, 1)),
+ output);
+
+ List<Object[]> actualRecords =
currentData(tableName(QUOTED_SPECIAL_CHARS_TABLE_NAME));
+ assertEquals("Data after compaction should not change", expectedRecords,
actualRecords);
+
+ Assert.assertEquals("Table cache must be empty", 0,
SparkTableCache.get().size());
+ }
+
+ @Test
+ public void testSortTableWithSpecialChars() {
+
Assume.assumeTrue(catalogName.equals(SparkCatalogConfig.HADOOP.catalogName()));
+
+ sql("CREATE TABLE %s (c1 int, c2 string, c3 string) USING iceberg",
tableName(QUOTED_SPECIAL_CHARS_TABLE_NAME));
+
+ insertData(tableName(QUOTED_SPECIAL_CHARS_TABLE_NAME), 10);
+
+ List<Object[]> expectedRecords =
currentData(tableName(QUOTED_SPECIAL_CHARS_TABLE_NAME));
+
+ List<Object[]> output = sql(
+ "CALL %s.system.rewrite_data_files(" +
+ " table => '%s'," +
+ " strategy => 'sort'," +
+ " sort_order => 'c1'," +
+ " where => 'c2 is not null')",
+ catalogName, tableName(QUOTED_SPECIAL_CHARS_TABLE_NAME));
+
+ assertEquals("Action should rewrite 10 data files and add 1 data file",
+ ImmutableList.of(row(10, 1)),
+ output);
+
+ List<Object[]> actualRecords =
currentData(tableName(QUOTED_SPECIAL_CHARS_TABLE_NAME));
+ assertEquals("Data after compaction should not change", expectedRecords,
actualRecords);
+
+ Assert.assertEquals("Table cache must be empty", 0,
SparkTableCache.get().size());
+ }
+
+ @Test
+ public void testZOrderTableWithSpecialChars() {
+
Assume.assumeTrue(catalogName.equals(SparkCatalogConfig.HADOOP.catalogName()));
+
+ sql("CREATE TABLE %s (c1 int, c2 string, c3 string) USING iceberg",
tableName(QUOTED_SPECIAL_CHARS_TABLE_NAME));
+
+ insertData(tableName(QUOTED_SPECIAL_CHARS_TABLE_NAME), 10);
+
+ List<Object[]> expectedRecords =
currentData(tableName(QUOTED_SPECIAL_CHARS_TABLE_NAME));
+
+ List<Object[]> output = sql(
+ "CALL %s.system.rewrite_data_files(" +
+ " table => '%s'," +
+ " strategy => 'sort'," +
+ " sort_order => 'zorder(c1, c2)'," +
+ " where => 'c2 is not null')",
+ catalogName, tableName(QUOTED_SPECIAL_CHARS_TABLE_NAME));
+
+ assertEquals("Action should rewrite 10 data files and add 1 data file",
+ ImmutableList.of(row(10, 1)),
+ output);
+
+ List<Object[]> actualRecords =
currentData(tableName(QUOTED_SPECIAL_CHARS_TABLE_NAME));
+ assertEquals("Data after compaction should not change", expectedRecords,
actualRecords);
+
+ Assert.assertEquals("Table cache must be empty", 0,
SparkTableCache.get().size());
+ }
+
private void createTable() {
sql("CREATE TABLE %s (c1 int, c2 string, c3 string) USING iceberg",
tableName);
}
@@ -385,6 +471,10 @@ public class TestRewriteDataFilesProcedure extends
SparkExtensionsTestBase {
}
private void insertData(int filesCount) {
+ insertData(tableName, filesCount);
+ }
+
+ private void insertData(String table, int filesCount) {
ThreeColumnRecord record1 = new ThreeColumnRecord(1, "foo", null);
ThreeColumnRecord record2 = new ThreeColumnRecord(2, "bar", null);
@@ -396,13 +486,17 @@ public class TestRewriteDataFilesProcedure extends
SparkExtensionsTestBase {
Dataset<Row> df = spark.createDataFrame(records,
ThreeColumnRecord.class).repartition(filesCount);
try {
- df.writeTo(tableName).append();
+ df.writeTo(table).append();
} catch (org.apache.spark.sql.catalyst.analysis.NoSuchTableException e) {
throw new RuntimeException(e);
}
}
private List<Object[]> currentData() {
- return rowsToJava(spark.sql("SELECT * FROM " + tableName + " order by c1,
c2, c3").collectAsList());
+ return currentData(tableName);
+ }
+
+ private List<Object[]> currentData(String table) {
+ return rowsToJava(spark.sql("SELECT * FROM " + table + " order by c1, c2,
c3").collectAsList());
}
}
diff --git
a/spark/v3.3/spark/src/jmh/java/org/apache/iceberg/spark/action/IcebergSortCompactionBenchmark.java
b/spark/v3.3/spark/src/jmh/java/org/apache/iceberg/spark/action/IcebergSortCompactionBenchmark.java
index 17f9e23862..8c205037f5 100644
---
a/spark/v3.3/spark/src/jmh/java/org/apache/iceberg/spark/action/IcebergSortCompactionBenchmark.java
+++
b/spark/v3.3/spark/src/jmh/java/org/apache/iceberg/spark/action/IcebergSortCompactionBenchmark.java
@@ -74,7 +74,6 @@ public class IcebergSortCompactionBenchmark {
private static final String[] NAMESPACE = new String[] {"default"};
private static final String NAME = "sortbench";
private static final Identifier IDENT = Identifier.of(NAMESPACE, NAME);
- private static final String FULL_IDENT =
Spark3Util.quotedFullIdentifier("spark_catalog", IDENT);
private static final int NUM_FILES = 8;
private static final long NUM_ROWS = 7500000L;
private static final long UNIQUE_VALUES = NUM_ROWS / 4;
@@ -107,7 +106,7 @@ public class IcebergSortCompactionBenchmark {
@Threads(1)
public void sortInt() {
SparkActions.get()
- .rewriteDataFiles(table(), FULL_IDENT)
+ .rewriteDataFiles(table())
.option(BinPackStrategy.REWRITE_ALL, "true")
.sort(SortOrder
.builderFor(table().schema())
@@ -120,7 +119,7 @@ public class IcebergSortCompactionBenchmark {
@Threads(1)
public void sortInt2() {
SparkActions.get()
- .rewriteDataFiles(table(), FULL_IDENT)
+ .rewriteDataFiles(table())
.option(BinPackStrategy.REWRITE_ALL, "true")
.sort(SortOrder
.builderFor(table().schema())
@@ -134,7 +133,7 @@ public class IcebergSortCompactionBenchmark {
@Threads(1)
public void sortInt3() {
SparkActions.get()
- .rewriteDataFiles(table(), FULL_IDENT)
+ .rewriteDataFiles(table())
.option(BinPackStrategy.REWRITE_ALL, "true")
.sort(SortOrder
.builderFor(table().schema())
@@ -150,7 +149,7 @@ public class IcebergSortCompactionBenchmark {
@Threads(1)
public void sortInt4() {
SparkActions.get()
- .rewriteDataFiles(table(), FULL_IDENT)
+ .rewriteDataFiles(table())
.option(BinPackStrategy.REWRITE_ALL, "true")
.sort(SortOrder
.builderFor(table().schema())
@@ -166,7 +165,7 @@ public class IcebergSortCompactionBenchmark {
@Threads(1)
public void sortString() {
SparkActions.get()
- .rewriteDataFiles(table(), FULL_IDENT)
+ .rewriteDataFiles(table())
.option(BinPackStrategy.REWRITE_ALL, "true")
.sort(SortOrder
.builderFor(table().schema())
@@ -179,7 +178,7 @@ public class IcebergSortCompactionBenchmark {
@Threads(1)
public void sortFourColumns() {
SparkActions.get()
- .rewriteDataFiles(table(), FULL_IDENT)
+ .rewriteDataFiles(table())
.option(BinPackStrategy.REWRITE_ALL, "true")
.sort(SortOrder
.builderFor(table().schema())
@@ -195,7 +194,7 @@ public class IcebergSortCompactionBenchmark {
@Threads(1)
public void sortSixColumns() {
SparkActions.get()
- .rewriteDataFiles(table(), FULL_IDENT)
+ .rewriteDataFiles(table())
.option(BinPackStrategy.REWRITE_ALL, "true")
.sort(SortOrder
.builderFor(table().schema())
@@ -213,7 +212,7 @@ public class IcebergSortCompactionBenchmark {
@Threads(1)
public void zSortInt() {
SparkActions.get()
- .rewriteDataFiles(table(), FULL_IDENT)
+ .rewriteDataFiles(table())
.option(BinPackStrategy.REWRITE_ALL, "true")
.zOrder("intCol")
.execute();
@@ -223,7 +222,7 @@ public class IcebergSortCompactionBenchmark {
@Threads(1)
public void zSortInt2() {
SparkActions.get()
- .rewriteDataFiles(table(), FULL_IDENT)
+ .rewriteDataFiles(table())
.option(BinPackStrategy.REWRITE_ALL, "true")
.zOrder("intCol", "intCol2")
.execute();
@@ -233,7 +232,7 @@ public class IcebergSortCompactionBenchmark {
@Threads(1)
public void zSortInt3() {
SparkActions.get()
- .rewriteDataFiles(table(), FULL_IDENT)
+ .rewriteDataFiles(table())
.option(BinPackStrategy.REWRITE_ALL, "true")
.zOrder("intCol", "intCol2", "intCol3")
.execute();
@@ -243,7 +242,7 @@ public class IcebergSortCompactionBenchmark {
@Threads(1)
public void zSortInt4() {
SparkActions.get()
- .rewriteDataFiles(table(), FULL_IDENT)
+ .rewriteDataFiles(table())
.option(BinPackStrategy.REWRITE_ALL, "true")
.zOrder("intCol", "intCol2", "intCol3", "intCol4")
.execute();
@@ -253,7 +252,7 @@ public class IcebergSortCompactionBenchmark {
@Threads(1)
public void zSortString() {
SparkActions.get()
- .rewriteDataFiles(table(), FULL_IDENT)
+ .rewriteDataFiles(table())
.option(BinPackStrategy.REWRITE_ALL, "true")
.zOrder("stringCol")
.execute();
@@ -263,7 +262,7 @@ public class IcebergSortCompactionBenchmark {
@Threads(1)
public void zSortFourColumns() {
SparkActions.get()
- .rewriteDataFiles(table(), FULL_IDENT)
+ .rewriteDataFiles(table())
.option(BinPackStrategy.REWRITE_ALL, "true")
.zOrder("stringCol", "intCol", "dateCol", "doubleCol")
.execute();
@@ -273,7 +272,7 @@ public class IcebergSortCompactionBenchmark {
@Threads(1)
public void zSortSixColumns() {
SparkActions.get()
- .rewriteDataFiles(table(), FULL_IDENT)
+ .rewriteDataFiles(table())
.option(BinPackStrategy.REWRITE_ALL, "true")
.zOrder("stringCol", "intCol", "dateCol", "timestampCol", "doubleCol",
"longCol")
.execute();
diff --git
a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkCachedTableCatalog.java
b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkCachedTableCatalog.java
new file mode 100644
index 0000000000..dda86d0b76
--- /dev/null
+++
b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkCachedTableCatalog.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.base.Splitter;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.spark.source.SparkTable;
+import org.apache.iceberg.util.Pair;
+import org.apache.iceberg.util.SnapshotUtil;
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
+import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.connector.catalog.TableChange;
+import org.apache.spark.sql.connector.expressions.Transform;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+
+/**
+ * An internal table catalog that is capable of loading tables from a cache.
+ */
+public class SparkCachedTableCatalog implements TableCatalog {
+
+ private static final String CLASS_NAME =
SparkCachedTableCatalog.class.getName();
+ private static final Splitter COMMA = Splitter.on(",");
+ private static final Pattern AT_TIMESTAMP =
Pattern.compile("at_timestamp_(\\d+)");
+ private static final Pattern SNAPSHOT_ID =
Pattern.compile("snapshot_id_(\\d+)");
+
+ private static final SparkTableCache TABLE_CACHE = SparkTableCache.get();
+
+ private String name = null;
+
+ @Override
+ public Identifier[] listTables(String[] namespace) {
+ throw new UnsupportedOperationException(CLASS_NAME + " does not support
listing tables");
+ }
+
+ @Override
+ public SparkTable loadTable(Identifier ident) throws NoSuchTableException {
+ Pair<Table, Long> table = load(ident);
+ return new SparkTable(table.first(), table.second(), false /* refresh
eagerly */);
+ }
+
+ @Override
+ public SparkTable loadTable(Identifier ident, String version) throws
NoSuchTableException {
+ Pair<Table, Long> table = load(ident);
+ Preconditions.checkArgument(table.second() == null,
+ "Cannot time travel based on both table identifier and AS OF");
+ return new SparkTable(table.first(), Long.parseLong(version), false /*
refresh eagerly */);
+ }
+
+ @Override
+ public SparkTable loadTable(Identifier ident, long timestampMicros) throws
NoSuchTableException {
+ Pair<Table, Long> table = load(ident);
+ Preconditions.checkArgument(table.second() == null,
+ "Cannot time travel based on both table identifier and AS OF");
+ // Spark passes microseconds but Iceberg uses milliseconds for snapshots
+ long timestampMillis = TimeUnit.MICROSECONDS.toMillis(timestampMicros);
+ long snapshotId = SnapshotUtil.snapshotIdAsOfTime(table.first(),
timestampMillis);
+ return new SparkTable(table.first(), snapshotId, false /* refresh eagerly
*/);
+ }
+
+ @Override
+ public void invalidateTable(Identifier ident) {
+ throw new UnsupportedOperationException(CLASS_NAME + " does not support
table invalidation");
+ }
+
+ @Override
+ public SparkTable createTable(Identifier ident, StructType schema,
Transform[] partitions,
+ Map<String, String> properties) throws
TableAlreadyExistsException {
+ throw new UnsupportedOperationException(CLASS_NAME + " does not support
creating tables");
+ }
+
+ @Override
+ public SparkTable alterTable(Identifier ident, TableChange... changes) {
+ throw new UnsupportedOperationException(CLASS_NAME + " does not support
altering tables");
+ }
+
+ @Override
+ public boolean dropTable(Identifier ident) {
+ throw new UnsupportedOperationException(CLASS_NAME + " does not support
dropping tables");
+ }
+
+ @Override
+ public boolean purgeTable(Identifier ident) throws
UnsupportedOperationException {
+ throw new UnsupportedOperationException(CLASS_NAME + " does not support
purging tables");
+ }
+
+ @Override
+ public void renameTable(Identifier oldIdent, Identifier newIdent) {
+ throw new UnsupportedOperationException(CLASS_NAME + " does not support
renaming tables");
+ }
+
+ @Override
+ public void initialize(String catalogName, CaseInsensitiveStringMap options)
{
+ this.name = catalogName;
+ }
+
+ @Override
+ public String name() {
+ return name;
+ }
+
+ private Pair<Table, Long> load(Identifier ident) throws NoSuchTableException
{
+ Preconditions.checkArgument(ident.namespace().length == 0, CLASS_NAME + "
does not support namespaces");
+
+ Pair<String, List<String>> parsedIdent = parseIdent(ident);
+ String key = parsedIdent.first();
+ List<String> metadata = parsedIdent.second();
+
+ Long asOfTimestamp = null;
+ Long snapshotId = null;
+ for (String meta : metadata) {
+ Matcher timeBasedMatcher = AT_TIMESTAMP.matcher(meta);
+ if (timeBasedMatcher.matches()) {
+ asOfTimestamp = Long.parseLong(timeBasedMatcher.group(1));
+ continue;
+ }
+
+ Matcher snapshotBasedMatcher = SNAPSHOT_ID.matcher(meta);
+ if (snapshotBasedMatcher.matches()) {
+ snapshotId = Long.parseLong(snapshotBasedMatcher.group(1));
+ }
+ }
+
+ Preconditions.checkArgument(asOfTimestamp == null || snapshotId == null,
+ "Cannot specify both snapshot and timestamp for time travel: %s",
ident);
+
+ Table table = TABLE_CACHE.get(key);
+
+ if (table == null) {
+ throw new NoSuchTableException(ident);
+ }
+
+ if (snapshotId != null) {
+ return Pair.of(table, snapshotId);
+ } else if (asOfTimestamp != null) {
+ return Pair.of(table, SnapshotUtil.snapshotIdAsOfTime(table,
asOfTimestamp));
+ } else {
+ return Pair.of(table, null);
+ }
+ }
+
+ private Pair<String, List<String>> parseIdent(Identifier ident) {
+ int hashIndex = ident.name().lastIndexOf('#');
+ if (hashIndex != -1 && !ident.name().endsWith("#")) {
+ String key = ident.name().substring(0, hashIndex);
+ List<String> metadata =
COMMA.splitToList(ident.name().substring(hashIndex + 1));
+ return Pair.of(key, metadata);
+ } else {
+ return Pair.of(ident.name(), ImmutableList.of());
+ }
+ }
+}
diff --git
a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkTableCache.java
b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkTableCache.java
new file mode 100644
index 0000000000..ec587c529f
--- /dev/null
+++
b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkTableCache.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark;
+
+import java.util.Map;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+
+public class SparkTableCache {
+
+ private static final SparkTableCache INSTANCE = new SparkTableCache();
+
+ private final Map<String, Table> cache = Maps.newConcurrentMap();
+
+ public static SparkTableCache get() {
+ return INSTANCE;
+ }
+
+ public int size() {
+ return cache.size();
+ }
+
+ public void add(String key, Table table) {
+ cache.put(key, table);
+ }
+
+ public boolean contains(String key) {
+ return cache.containsKey(key);
+ }
+
+ public Table get(String key) {
+ return cache.get(key);
+ }
+
+ public Table remove(String key) {
+ return cache.remove(key);
+ }
+}
diff --git
a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteDataFilesSparkAction.java
b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteDataFilesSparkAction.java
index 5ab2539c2c..d1937fcbcd 100644
---
a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteDataFilesSparkAction.java
+++
b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteDataFilesSparkAction.java
@@ -87,7 +87,6 @@ public class BaseRewriteDataFilesSparkAction
);
private final Table table;
- private final String fullIdentifier;
private Expression filter = Expressions.alwaysTrue();
private int maxConcurrentFileGroupRewrites;
@@ -97,17 +96,9 @@ public class BaseRewriteDataFilesSparkAction
private RewriteJobOrder rewriteJobOrder;
private RewriteStrategy strategy = null;
- @Deprecated
protected BaseRewriteDataFilesSparkAction(SparkSession spark, Table table) {
super(spark);
this.table = table;
- this.fullIdentifier = null;
- }
-
- protected BaseRewriteDataFilesSparkAction(SparkSession spark, Table table,
String fullIdentifier) {
- super(spark);
- this.table = table;
- this.fullIdentifier = fullIdentifier;
}
@Override
@@ -439,7 +430,7 @@ public class BaseRewriteDataFilesSparkAction
}
private BinPackStrategy binPackStrategy() {
- return new SparkBinPackStrategy(table, fullIdentifier, spark());
+ return new SparkBinPackStrategy(table, spark());
}
private SortStrategy sortStrategy() {
diff --git
a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkActions.java
b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkActions.java
index 4f6bb3f00d..d77a13d128 100644
---
a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkActions.java
+++
b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkActions.java
@@ -72,16 +72,10 @@ public class SparkActions implements ActionsProvider {
}
@Override
- @Deprecated
public RewriteDataFiles rewriteDataFiles(Table table) {
return new BaseRewriteDataFilesSparkAction(spark, table);
}
- @Override
- public RewriteDataFiles rewriteDataFiles(Table table, String fullIdentifier)
{
- return new BaseRewriteDataFilesSparkAction(spark, table, fullIdentifier);
- }
-
@Override
public DeleteOrphanFiles deleteOrphanFiles(Table table) {
return new BaseDeleteOrphanFilesSparkAction(spark, table);
diff --git
a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkBinPackStrategy.java
b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkBinPackStrategy.java
index fcfaeaa751..d8c1cc3610 100644
---
a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkBinPackStrategy.java
+++
b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkBinPackStrategy.java
@@ -30,6 +30,7 @@ import org.apache.iceberg.actions.BinPackStrategy;
import org.apache.iceberg.spark.FileRewriteCoordinator;
import org.apache.iceberg.spark.FileScanTaskSetManager;
import org.apache.iceberg.spark.SparkReadOptions;
+import org.apache.iceberg.spark.SparkTableCache;
import org.apache.iceberg.spark.SparkWriteOptions;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
@@ -38,24 +39,14 @@ import org.apache.spark.sql.internal.SQLConf;
public class SparkBinPackStrategy extends BinPackStrategy {
private final Table table;
- private final String fullIdentifier;
private final SparkSession spark;
+ private final SparkTableCache tableCache = SparkTableCache.get();
private final FileScanTaskSetManager manager = FileScanTaskSetManager.get();
private final FileRewriteCoordinator rewriteCoordinator =
FileRewriteCoordinator.get();
- public SparkBinPackStrategy(Table table, String fullIdentifier, SparkSession
spark) {
- this.table = table;
- this.spark = spark;
- // Fallback if a quoted identifier is not supplied
- this.fullIdentifier = fullIdentifier == null ? table.name() :
fullIdentifier;
- }
-
- @Deprecated
public SparkBinPackStrategy(Table table, SparkSession spark) {
this.table = table;
this.spark = spark;
- // Fallback if a quoted identifier is not supplied
- this.fullIdentifier = table.name();
}
@Override
@@ -67,6 +58,7 @@ public class SparkBinPackStrategy extends BinPackStrategy {
public Set<DataFile> rewriteFiles(List<FileScanTask> filesToRewrite) {
String groupID = UUID.randomUUID().toString();
try {
+ tableCache.add(groupID, table);
manager.stageTasks(table, groupID, filesToRewrite);
// Disable Adaptive Query Execution as this may change the output
partitioning of our write
@@ -77,7 +69,7 @@ public class SparkBinPackStrategy extends BinPackStrategy {
.option(SparkReadOptions.FILE_SCAN_TASK_SET_ID, groupID)
.option(SparkReadOptions.SPLIT_SIZE,
splitSize(inputFileSize(filesToRewrite)))
.option(SparkReadOptions.FILE_OPEN_COST, "0")
- .load(fullIdentifier);
+ .load(groupID);
// All files within a file group are written with the same spec, so
check the first
boolean requiresRepartition =
!filesToRewrite.get(0).spec().equals(table.spec());
@@ -93,10 +85,11 @@ public class SparkBinPackStrategy extends BinPackStrategy {
.option(SparkWriteOptions.TARGET_FILE_SIZE_BYTES, writeMaxFileSize())
.option(SparkWriteOptions.DISTRIBUTION_MODE, distributionMode)
.mode("append")
- .save(fullIdentifier);
+ .save(groupID);
return rewriteCoordinator.fetchNewDataFiles(table, groupID);
} finally {
+ tableCache.remove(groupID);
manager.removeTasks(table, groupID);
rewriteCoordinator.clearRewrite(table, groupID);
}
diff --git
a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkSortStrategy.java
b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkSortStrategy.java
index 6c8f8c027d..97f46d7938 100644
---
a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkSortStrategy.java
+++
b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkSortStrategy.java
@@ -34,6 +34,7 @@ import org.apache.iceberg.spark.FileRewriteCoordinator;
import org.apache.iceberg.spark.FileScanTaskSetManager;
import org.apache.iceberg.spark.SparkDistributionAndOrderingUtil;
import org.apache.iceberg.spark.SparkReadOptions;
+import org.apache.iceberg.spark.SparkTableCache;
import org.apache.iceberg.spark.SparkWriteOptions;
import org.apache.iceberg.util.PropertyUtil;
import org.apache.iceberg.util.SortOrderUtil;
@@ -63,6 +64,7 @@ public class SparkSortStrategy extends SortStrategy {
private final Table table;
private final SparkSession spark;
+ private final SparkTableCache tableCache = SparkTableCache.get();
private final FileScanTaskSetManager manager = FileScanTaskSetManager.get();
private final FileRewriteCoordinator rewriteCoordinator =
FileRewriteCoordinator.get();
@@ -114,6 +116,7 @@ public class SparkSortStrategy extends SortStrategy {
Distribution distribution = Distributions.ordered(ordering);
try {
+ tableCache.add(groupID, table);
manager.stageTasks(table, groupID, filesToRewrite);
// Disable Adaptive Query Execution as this may change the output
partitioning of our write
@@ -126,7 +129,7 @@ public class SparkSortStrategy extends SortStrategy {
Dataset<Row> scanDF = cloneSession.read().format("iceberg")
.option(SparkReadOptions.FILE_SCAN_TASK_SET_ID, groupID)
- .load(table.name());
+ .load(groupID);
// write the packed data into new files where each split becomes a new
file
SQLConf sqlConf = cloneSession.sessionState().conf();
@@ -139,10 +142,11 @@ public class SparkSortStrategy extends SortStrategy {
.option(SparkWriteOptions.TARGET_FILE_SIZE_BYTES, writeMaxFileSize())
.option(SparkWriteOptions.USE_TABLE_DISTRIBUTION_AND_ORDERING,
"false")
.mode("append") // This will only write files without modifying the
table, see SparkWrite.RewriteFiles
- .save(table.name());
+ .save(groupID);
return rewriteCoordinator.fetchNewDataFiles(table, groupID);
} finally {
+ tableCache.remove(groupID);
manager.removeTasks(table, groupID);
rewriteCoordinator.clearRewrite(table, groupID);
}
@@ -160,6 +164,10 @@ public class SparkSortStrategy extends SortStrategy {
return sizeEstimateMultiple;
}
+ protected SparkTableCache tableCache() {
+ return tableCache;
+ }
+
protected FileScanTaskSetManager manager() {
return manager;
}
diff --git
a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkZOrderStrategy.java
b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkZOrderStrategy.java
index fef23acd9b..8044039a59 100644
---
a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkZOrderStrategy.java
+++
b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkZOrderStrategy.java
@@ -178,6 +178,7 @@ public class SparkZOrderStrategy extends SparkSortStrategy {
Distribution distribution = Distributions.ordered(ordering);
try {
+ tableCache().add(groupID, table());
manager().stageTasks(table(), groupID, filesToRewrite);
// Disable Adaptive Query Execution as this may change the output
partitioning of our write
@@ -190,7 +191,7 @@ public class SparkZOrderStrategy extends SparkSortStrategy {
Dataset<Row> scanDF = cloneSession.read().format("iceberg")
.option(SparkReadOptions.FILE_SCAN_TASK_SET_ID, groupID)
- .load(table().name());
+ .load(groupID);
Column[] originalColumns = Arrays.stream(scanDF.schema().names())
.map(n -> functions.col(n))
@@ -217,10 +218,11 @@ public class SparkZOrderStrategy extends
SparkSortStrategy {
.option(SparkWriteOptions.TARGET_FILE_SIZE_BYTES, writeMaxFileSize())
.option(SparkWriteOptions.USE_TABLE_DISTRIBUTION_AND_ORDERING,
"false")
.mode("append")
- .save(table().name());
+ .save(groupID);
return rewriteCoordinator().fetchNewDataFiles(table(), groupID);
} finally {
+ tableCache().remove(groupID);
manager().removeTasks(table(), groupID);
rewriteCoordinator().clearRewrite(table(), groupID);
}
diff --git
a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/RewriteDataFilesProcedure.java
b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/RewriteDataFilesProcedure.java
index 27ffc69086..5c71dbf555 100644
---
a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/RewriteDataFilesProcedure.java
+++
b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/RewriteDataFilesProcedure.java
@@ -48,7 +48,7 @@ import scala.runtime.BoxedUnit;
/**
* A procedure that rewrites datafiles in a table.
*
- * @see org.apache.iceberg.spark.actions.SparkActions#rewriteDataFiles(Table,
String)
+ * @see org.apache.iceberg.spark.actions.SparkActions#rewriteDataFiles(Table)
*/
class RewriteDataFilesProcedure extends BaseProcedure {
@@ -95,7 +95,7 @@ class RewriteDataFilesProcedure extends BaseProcedure {
return modifyIcebergTable(tableIdent, table -> {
String quotedFullIdentifier =
Spark3Util.quotedFullIdentifier(tableCatalog().name(), tableIdent);
- RewriteDataFiles action = actions().rewriteDataFiles(table,
quotedFullIdentifier);
+ RewriteDataFiles action = actions().rewriteDataFiles(table);
String strategy = args.isNullAt(1) ? null : args.getString(1);
String sortOrderString = args.isNullAt(2) ? null : args.getString(2);
diff --git
a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java
b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java
index 73429dde92..a3978b6967 100644
---
a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java
+++
b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java
@@ -26,8 +26,11 @@ import
org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.spark.PathIdentifier;
import org.apache.iceberg.spark.Spark3Util;
+import org.apache.iceberg.spark.SparkCachedTableCatalog;
+import org.apache.iceberg.spark.SparkCatalog;
import org.apache.iceberg.spark.SparkReadOptions;
import org.apache.iceberg.spark.SparkSessionCatalog;
+import org.apache.iceberg.spark.SparkTableCache;
import org.apache.iceberg.util.PropertyUtil;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
@@ -59,9 +62,14 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap;
*/
public class IcebergSource implements DataSourceRegister,
SupportsCatalogOptions {
private static final String DEFAULT_CATALOG_NAME = "default_iceberg";
+ private static final String DEFAULT_CACHE_CATALOG_NAME =
"default_cache_iceberg";
private static final String DEFAULT_CATALOG = "spark.sql.catalog." +
DEFAULT_CATALOG_NAME;
+ private static final String DEFAULT_CACHE_CATALOG = "spark.sql.catalog." +
DEFAULT_CACHE_CATALOG_NAME;
private static final String AT_TIMESTAMP = "at_timestamp_";
private static final String SNAPSHOT_ID = "snapshot_id_";
+ private static final String[] EMPTY_NAMESPACE = new String[0];
+
+ private static final SparkTableCache TABLE_CACHE = SparkTableCache.get();
@Override
public String shortName() {
@@ -105,7 +113,7 @@ public class IcebergSource implements DataSourceRegister,
SupportsCatalogOptions
private Spark3Util.CatalogAndIdentifier
catalogAndIdentifier(CaseInsensitiveStringMap options) {
Preconditions.checkArgument(options.containsKey("path"), "Cannot open
table: path is not set");
SparkSession spark = SparkSession.active();
- setupDefaultSparkCatalog(spark);
+ setupDefaultSparkCatalogs(spark);
String path = options.get("path");
Long snapshotId = propertyAsLong(options, SparkReadOptions.SNAPSHOT_ID);
@@ -125,11 +133,14 @@ public class IcebergSource implements DataSourceRegister,
SupportsCatalogOptions
CatalogManager catalogManager = spark.sessionState().catalogManager();
- if (path.contains("/")) {
+ if (TABLE_CACHE.contains(path)) {
+ return new Spark3Util.CatalogAndIdentifier(
+ catalogManager.catalog(DEFAULT_CACHE_CATALOG_NAME),
+ Identifier.of(EMPTY_NAMESPACE, pathWithSelector(path, selector)));
+ } else if (path.contains("/")) {
// contains a path. Return iceberg default catalog and a PathIdentifier
- String newPath = (selector == null) ? path : path + "#" + selector;
return new
Spark3Util.CatalogAndIdentifier(catalogManager.catalog(DEFAULT_CATALOG_NAME),
- new PathIdentifier(newPath));
+ new PathIdentifier(pathWithSelector(path, selector)));
}
final Spark3Util.CatalogAndIdentifier catalogAndIdentifier =
Spark3Util.catalogAndIdentifier(
@@ -145,6 +156,10 @@ public class IcebergSource implements DataSourceRegister,
SupportsCatalogOptions
}
}
+ private String pathWithSelector(String path, String selector) {
+ return (selector == null) ? path : path + "#" + selector;
+ }
+
private Identifier identifierWithSelector(Identifier ident, String selector)
{
if (selector == null) {
return ident;
@@ -185,17 +200,19 @@ public class IcebergSource implements DataSourceRegister,
SupportsCatalogOptions
return null;
}
- private static void setupDefaultSparkCatalog(SparkSession spark) {
- if (spark.conf().contains(DEFAULT_CATALOG)) {
- return;
+ private static void setupDefaultSparkCatalogs(SparkSession spark) {
+ if (!spark.conf().contains(DEFAULT_CATALOG)) {
+ ImmutableMap<String, String> config = ImmutableMap.of(
+ "type", "hive",
+ "default-namespace", "default",
+ "cache-enabled", "false" // the source should not use a cache
+ );
+ spark.conf().set(DEFAULT_CATALOG, SparkCatalog.class.getName());
+ config.forEach((key, value) -> spark.conf().set(DEFAULT_CATALOG + "." +
key, value));
+ }
+
+ if (!spark.conf().contains(DEFAULT_CACHE_CATALOG)) {
+ spark.conf().set(DEFAULT_CACHE_CATALOG,
SparkCachedTableCatalog.class.getName());
}
- ImmutableMap<String, String> config = ImmutableMap.of(
- "type", "hive",
- "default-namespace", "default",
- "cache-enabled", "false" // the source should not use a cache
- );
- String catalogName = "org.apache.iceberg.spark.SparkCatalog";
- spark.conf().set(DEFAULT_CATALOG, catalogName);
- config.forEach((key, value) -> spark.conf().set(DEFAULT_CATALOG + "." +
key, value));
}
}
diff --git
a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/TestSparkCachedTableCatalog.java
b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/TestSparkCachedTableCatalog.java
new file mode 100644
index 0000000000..817af03029
--- /dev/null
+++
b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/TestSparkCachedTableCatalog.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark;
+
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestSparkCachedTableCatalog extends SparkTestBaseWithCatalog {
+
+ private static final SparkTableCache TABLE_CACHE = SparkTableCache.get();
+
+ @BeforeClass
+ public static void setupCachedTableCatalog() {
+ spark.conf().set("spark.sql.catalog.testcache",
SparkCachedTableCatalog.class.getName());
+ }
+
+ @AfterClass
+ public static void unsetCachedTableCatalog() {
+ spark.conf().unset("spark.sql.catalog.testcache");
+ }
+
+ public TestSparkCachedTableCatalog() {
+ super(SparkCatalogConfig.HIVE);
+ }
+
+ @Test
+ public void testTimeTravel() {
+ sql("CREATE TABLE %s (id INT, dep STRING) USING iceberg", tableName);
+
+ Table table = validationCatalog.loadTable(tableIdent);
+
+ sql("INSERT INTO TABLE %s VALUES (1, 'hr')", tableName);
+
+ table.refresh();
+ Snapshot firstSnapshot = table.currentSnapshot();
+ waitUntilAfter(firstSnapshot.timestampMillis());
+
+ sql("INSERT INTO TABLE %s VALUES (2, 'hr')", tableName);
+
+ table.refresh();
+ Snapshot secondSnapshot = table.currentSnapshot();
+ waitUntilAfter(secondSnapshot.timestampMillis());
+
+ sql("INSERT INTO TABLE %s VALUES (3, 'hr')", tableName);
+
+ table.refresh();
+
+ try {
+ TABLE_CACHE.add("key", table);
+
+ assertEquals("Should have expected rows in 3rd snapshot",
+ ImmutableList.of(row(1, "hr"), row(2, "hr"), row(3, "hr")),
+ sql("SELECT * FROM testcache.key ORDER BY id"));
+
+ assertEquals("Should have expected rows in 2nd snapshot",
+ ImmutableList.of(row(1, "hr"), row(2, "hr")),
+ sql("SELECT * FROM testcache.`key#at_timestamp_%s` ORDER BY id",
secondSnapshot.timestampMillis()));
+
+ assertEquals("Should have expected rows in 1st snapshot",
+ ImmutableList.of(row(1, "hr")),
+ sql("SELECT * FROM testcache.`key#snapshot_id_%d` ORDER BY id",
firstSnapshot.snapshotId()));
+
+ } finally {
+ TABLE_CACHE.remove("key");
+ }
+ }
+}
diff --git
a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
index ea2a7755f2..e0b1e53cfe 100644
---
a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
+++
b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
@@ -136,7 +136,7 @@ public class TestRewriteDataFilesAction extends
SparkTestBase {
private RewriteDataFiles basicRewrite(Table table) {
// Always compact regardless of input files
table.refresh();
- return actions().rewriteDataFiles(table,
tableLocation).option(BinPackStrategy.MIN_INPUT_FILES, "1");
+ return
actions().rewriteDataFiles(table).option(BinPackStrategy.MIN_INPUT_FILES, "1");
}
@Test
@@ -259,7 +259,7 @@ public class TestRewriteDataFilesAction extends
SparkTestBase {
rowDelta.commit();
table.refresh();
List<Object[]> expectedRecords = currentData();
- Result result = actions().rewriteDataFiles(table, tableLocation)
+ Result result = actions().rewriteDataFiles(table)
// do not include any file based on bin pack file size configs
.option(BinPackStrategy.MIN_FILE_SIZE_BYTES, "0")
.option(RewriteDataFiles.TARGET_FILE_SIZE_BYTES,
Long.toString(Long.MAX_VALUE - 1))
@@ -293,7 +293,7 @@ public class TestRewriteDataFilesAction extends
SparkTestBase {
rowDelta.commit();
table.refresh();
List<Object[]> expectedRecords = currentData();
- Result result = actions().rewriteDataFiles(table, tableLocation)
+ Result result = actions().rewriteDataFiles(table)
.option(BinPackStrategy.DELETE_FILE_THRESHOLD, "1")
.execute();
Assert.assertEquals("Action should rewrite 1 data files", 1,
result.rewrittenDataFilesCount());
@@ -1123,17 +1123,17 @@ public class TestRewriteDataFilesAction extends
SparkTestBase {
Table table = createTable(1);
AssertHelpers.assertThrows("Should be unable to set Strategy more than
once", IllegalArgumentException.class,
- "Cannot set strategy", () -> actions().rewriteDataFiles(table,
tableLocation).binPack().sort());
+ "Cannot set strategy", () ->
actions().rewriteDataFiles(table).binPack().sort());
AssertHelpers.assertThrows("Should be unable to set Strategy more than
once", IllegalArgumentException.class,
- "Cannot set strategy", () -> actions().rewriteDataFiles(table,
tableLocation).sort().binPack());
+ "Cannot set strategy", () ->
actions().rewriteDataFiles(table).sort().binPack());
AssertHelpers.assertThrows(
"Should be unable to set Strategy more than once",
IllegalArgumentException.class,
"Cannot set strategy",
() ->
- actions().rewriteDataFiles(table,
tableLocation).sort(SortOrder.unsorted()).binPack());
+
actions().rewriteDataFiles(table).sort(SortOrder.unsorted()).binPack());
}
@Test