This is an automated email from the ASF dual-hosted git repository.
aokolnychyi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new 9ce0e6e118 Spark 4.1: Separate compaction and main operations (#15301)
9ce0e6e118 is described below
commit 9ce0e6e11893d412b4d7e836df7b71247f10ae2f
Author: Anton Okolnychyi <[email protected]>
AuthorDate: Mon Feb 16 17:21:46 2026 -0800
Spark 4.1: Separate compaction and main operations (#15301)
---
.../TestMetaColumnProjectionWithStageScan.java | 10 +-
.../extensions/TestRewriteDataFilesProcedure.java | 15 +-
.../iceberg/spark/SparkCachedTableCatalog.java | 257 ---------------------
.../org/apache/iceberg/spark/SparkReadConf.java | 4 -
.../org/apache/iceberg/spark/SparkReadOptions.java | 3 -
.../iceberg/spark/SparkRewriteTableCatalog.java | 120 ++++++++++
.../org/apache/iceberg/spark/SparkTableCache.java | 5 +
.../org/apache/iceberg/spark/SparkWriteConf.java | 7 -
.../apache/iceberg/spark/SparkWriteOptions.java | 3 -
.../actions/SparkBinPackFileRewriteRunner.java | 2 -
.../actions/SparkRewritePositionDeleteRunner.java | 2 -
.../actions/SparkShufflingFileRewriteRunner.java | 9 +-
.../iceberg/spark/source/BaseSparkTable.java | 165 +++++++++++++
.../apache/iceberg/spark/source/IcebergSource.java | 32 +--
.../spark/source/SparkPositionDeletesRewrite.java | 6 +-
.../source/SparkPositionDeletesRewriteBuilder.java | 21 +-
.../iceberg/spark/source/SparkRewriteTable.java | 74 ++++++
.../spark/source/SparkRewriteWriteBuilder.java | 91 ++++++++
.../iceberg/spark/source/SparkStagedScan.java | 21 +-
.../spark/source/SparkStagedScanBuilder.java | 7 +-
.../apache/iceberg/spark/source/SparkTable.java | 12 +-
.../iceberg/spark/source/SparkWriteBuilder.java | 17 +-
.../iceberg/spark/TestFileRewriteCoordinator.java | 71 +++---
.../iceberg/spark/TestSparkCachedTableCatalog.java | 105 ---------
.../spark/source/TestPositionDeletesTable.java | 100 +++-----
.../iceberg/spark/source/TestSparkStagedScan.java | 26 +--
26 files changed, 595 insertions(+), 590 deletions(-)
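Illustrative only (not part of the commit below): a minimal sketch of how a rewrite runner stages and rewrites a file group after this change, keyed solely by a group ID that SparkRewriteTableCatalog resolves through SparkTableCache and ScanTaskSetManager. The class name RewriteGroupSketch, the method rewriteGroup, and its parameters are hypothetical; the caller is assumed to supply the SparkSession, the Iceberg table, the group ID, the staged tasks, and a target file size.

import java.util.List;

import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.Table;
import org.apache.iceberg.spark.ScanTaskSetManager;
import org.apache.iceberg.spark.SparkReadOptions;
import org.apache.iceberg.spark.SparkTableCache;
import org.apache.iceberg.spark.SparkWriteOptions;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

class RewriteGroupSketch {
  // Stages the table and its tasks under a group ID, then reads and rewrites that group.
  static void rewriteGroup(
      SparkSession spark,
      Table table,
      String groupId,
      List<FileScanTask> tasks,
      long targetFileSize) {
    // Register the table and the scan tasks under the group ID; the group ID is the only
    // handle passed to load()/save() below.
    SparkTableCache.get().add(groupId, table);
    ScanTaskSetManager.get().stageTasks(table, groupId, tasks);

    Dataset<Row> scanDF =
        spark
            .read()
            .format("iceberg")
            .option(SparkReadOptions.FILE_OPEN_COST, "0")
            .load(groupId); // no scan-task-set-id read option anymore

    scanDF
        .write()
        .format("iceberg")
        .option(SparkWriteOptions.TARGET_FILE_SIZE_BYTES, targetFileSize)
        .mode("append")
        .save(groupId); // no rewritten-file-scan-task-set-id write option anymore
  }
}

After the group is rewritten, the runner would typically remove the group ID from SparkTableCache and ScanTaskSetManager again, which is what the updated procedure tests below assert.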
diff --git a/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMetaColumnProjectionWithStageScan.java b/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMetaColumnProjectionWithStageScan.java
index b783a006ef..56191b38ef 100644
--- a/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMetaColumnProjectionWithStageScan.java
+++ b/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMetaColumnProjectionWithStageScan.java
@@ -33,6 +33,7 @@ import org.apache.iceberg.spark.ScanTaskSetManager;
import org.apache.iceberg.spark.Spark3Util;
import org.apache.iceberg.spark.SparkCatalogConfig;
import org.apache.iceberg.spark.SparkReadOptions;
+import org.apache.iceberg.spark.SparkTableCache;
import org.apache.iceberg.spark.source.SimpleRecord;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
@@ -88,32 +89,31 @@ public class TestMetaColumnProjectionWithStageScan extends ExtensionsTestBase {
Table table = Spark3Util.loadIcebergTable(spark, tableName);
table.refresh();
- String tableLocation = table.location();
try (CloseableIterable<ScanTask> tasks = table.newBatchScan().planFiles()) {
String fileSetID = UUID.randomUUID().toString();
+ SparkTableCache.get().add(fileSetID, table);
stageTask(table, fileSetID, tasks);
Dataset<Row> scanDF2 =
spark
.read()
.format("iceberg")
.option(SparkReadOptions.FILE_OPEN_COST, "0")
- .option(SparkReadOptions.SCAN_TASK_SET_ID, fileSetID)
- .load(tableLocation);
+ .load(fileSetID);
assertThat(scanDF2.columns()).hasSize(2);
}
try (CloseableIterable<ScanTask> tasks = table.newBatchScan().planFiles()) {
String fileSetID = UUID.randomUUID().toString();
+ SparkTableCache.get().add(fileSetID, table);
stageTask(table, fileSetID, tasks);
Dataset<Row> scanDF =
spark
.read()
.format("iceberg")
.option(SparkReadOptions.FILE_OPEN_COST, "0")
- .option(SparkReadOptions.SCAN_TASK_SET_ID, fileSetID)
- .load(tableLocation)
+ .load(fileSetID)
.select("*", "_pos");
List<Row> rows = scanDF.collectAsList();
diff --git a/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java b/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
index c652b011ba..b37422beac 100644
--- a/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
+++ b/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
@@ -830,7 +830,10 @@ public class TestRewriteDataFilesProcedure extends ExtensionsTestBase {
List<Object[]> actualRecords = currentData(tableName(QUOTED_SPECIAL_CHARS_TABLE_NAME));
assertEquals("Data after compaction should not change", expectedRecords, actualRecords);
- assertThat(SparkTableCache.get().size()).as("Table cache must be empty").isZero();
+ Table table = validationCatalog.loadTable(identifier);
+ assertThat(SparkTableCache.get().tables())
+ .as("Table cache must not contain the test table")
+ .noneMatch(cachedTable -> cachedTable.uuid().equals(table.uuid()));
}
@TestTemplate
@@ -870,7 +873,10 @@ public class TestRewriteDataFilesProcedure extends ExtensionsTestBase {
List<Object[]> actualRecords = currentData(tableName(QUOTED_SPECIAL_CHARS_TABLE_NAME));
assertEquals("Data after compaction should not change", expectedRecords, actualRecords);
- assertThat(SparkTableCache.get().size()).as("Table cache must be empty").isZero();
+ Table table = validationCatalog.loadTable(identifier);
+ assertThat(SparkTableCache.get().tables())
+ .as("Table cache must not contain the test table")
+ .noneMatch(cachedTable -> cachedTable.uuid().equals(table.uuid()));
}
@TestTemplate
@@ -910,7 +916,10 @@ public class TestRewriteDataFilesProcedure extends ExtensionsTestBase {
List<Object[]> actualRecords = currentData(tableName(QUOTED_SPECIAL_CHARS_TABLE_NAME));
assertEquals("Data after compaction should not change", expectedRecords, actualRecords);
- assertThat(SparkTableCache.get().size()).as("Table cache must be empty").isZero();
+ Table table = validationCatalog.loadTable(identifier);
+ assertThat(SparkTableCache.get().tables())
+ .as("Table cache must not contain the test table")
+ .noneMatch(cachedTable -> cachedTable.uuid().equals(table.uuid()));
}
@TestTemplate
diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkCachedTableCatalog.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkCachedTableCatalog.java
deleted file mode 100644
index 28427f597b..0000000000
--- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkCachedTableCatalog.java
+++ /dev/null
@@ -1,257 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iceberg.spark;
-
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.concurrent.TimeUnit;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-import java.util.stream.Stream;
-import org.apache.iceberg.Snapshot;
-import org.apache.iceberg.Table;
-import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
-import org.apache.iceberg.relocated.com.google.common.base.Splitter;
-import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
-import org.apache.iceberg.spark.source.SparkTable;
-import org.apache.iceberg.util.Pair;
-import org.apache.iceberg.util.SnapshotUtil;
-import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
-import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException;
-import org.apache.spark.sql.connector.catalog.Identifier;
-import org.apache.spark.sql.connector.catalog.TableCatalog;
-import org.apache.spark.sql.connector.catalog.TableChange;
-import org.apache.spark.sql.connector.expressions.Transform;
-import org.apache.spark.sql.types.StructType;
-import org.apache.spark.sql.util.CaseInsensitiveStringMap;
-
-/** An internal table catalog that is capable of loading tables from a cache. */
-public class SparkCachedTableCatalog implements TableCatalog, SupportsFunctions {
-
- private static final String CLASS_NAME = SparkCachedTableCatalog.class.getName();
- private static final Splitter COMMA = Splitter.on(",");
- private static final Pattern AT_TIMESTAMP = Pattern.compile("at_timestamp_(\\d+)");
- private static final Pattern SNAPSHOT_ID = Pattern.compile("snapshot_id_(\\d+)");
- private static final Pattern BRANCH = Pattern.compile("branch_(.*)");
- private static final Pattern TAG = Pattern.compile("tag_(.*)");
- private static final String REWRITE = "rewrite";
-
- private static final SparkTableCache TABLE_CACHE = SparkTableCache.get();
-
- private String name = null;
-
- @Override
- public Identifier[] listTables(String[] namespace) {
- throw new UnsupportedOperationException(CLASS_NAME + " does not support listing tables");
- }
-
- @Override
- public SparkTable loadTable(Identifier ident) throws NoSuchTableException {
- return load(ident);
- }
-
- @Override
- public SparkTable loadTable(Identifier ident, String version) throws NoSuchTableException {
- SparkTable table = load(ident);
- Preconditions.checkArgument(
- table.snapshotId() == null, "Cannot time travel based on both table identifier and AS OF");
- return table.copyWithSnapshotId(Long.parseLong(version));
- }
-
- @Override
- public SparkTable loadTable(Identifier ident, long timestampMicros) throws NoSuchTableException {
- SparkTable table = load(ident);
- Preconditions.checkArgument(
- table.snapshotId() == null, "Cannot time travel based on both table identifier and AS OF");
- // Spark passes microseconds but Iceberg uses milliseconds for snapshots
- long timestampMillis = TimeUnit.MICROSECONDS.toMillis(timestampMicros);
- long snapshotId = SnapshotUtil.snapshotIdAsOfTime(table.table(), timestampMillis);
- return table.copyWithSnapshotId(snapshotId);
- }
-
- @Override
- public void invalidateTable(Identifier ident) {
- throw new UnsupportedOperationException(CLASS_NAME + " does not support table invalidation");
- }
-
- @Override
- public SparkTable createTable(
- Identifier ident, StructType schema, Transform[] partitions, Map<String, String> properties)
- throws TableAlreadyExistsException {
- throw new UnsupportedOperationException(CLASS_NAME + " does not support creating tables");
- }
-
- @Override
- public SparkTable alterTable(Identifier ident, TableChange... changes) {
- throw new UnsupportedOperationException(CLASS_NAME + " does not support altering tables");
- }
-
- @Override
- public boolean dropTable(Identifier ident) {
- throw new UnsupportedOperationException(CLASS_NAME + " does not support dropping tables");
- }
-
- @Override
- public boolean purgeTable(Identifier ident) throws UnsupportedOperationException {
- throw new UnsupportedOperationException(CLASS_NAME + " does not support purging tables");
- }
-
- @Override
- public void renameTable(Identifier oldIdent, Identifier newIdent) {
- throw new UnsupportedOperationException(CLASS_NAME + " does not support renaming tables");
- }
-
- @Override
- public void initialize(String catalogName, CaseInsensitiveStringMap options) {
- this.name = catalogName;
- }
-
- @Override
- public String name() {
- return name;
- }
-
- private SparkTable load(Identifier ident) throws NoSuchTableException {
- Preconditions.checkArgument(
- ident.namespace().length == 0, CLASS_NAME + " does not support namespaces");
-
- Pair<String, List<String>> parsedIdent = parseIdent(ident);
- String key = parsedIdent.first();
- TableLoadOptions options = parseLoadOptions(parsedIdent.second());
-
- Table table = TABLE_CACHE.get(key);
-
- if (table == null) {
- throw new NoSuchTableException(ident);
- }
-
- if (options.isTableRewrite()) {
- return new SparkTable(table, null, false, true);
- }
-
- if (options.snapshotId() != null) {
- return new SparkTable(table, options.snapshotId(), false);
- } else if (options.asOfTimestamp() != null) {
- return new SparkTable(
- table, SnapshotUtil.snapshotIdAsOfTime(table, options.asOfTimestamp()), false);
- } else if (options.branch() != null) {
- Snapshot branchSnapshot = table.snapshot(options.branch());
- Preconditions.checkArgument(
- branchSnapshot != null,
- "Cannot find snapshot associated with branch name: %s",
- options.branch());
- return new SparkTable(table, branchSnapshot.snapshotId(), false);
- } else if (options.tag() != null) {
- Snapshot tagSnapshot = table.snapshot(options.tag());
- Preconditions.checkArgument(
- tagSnapshot != null, "Cannot find snapshot associated with tag name: %s", options.tag());
- return new SparkTable(table, tagSnapshot.snapshotId(), false);
- } else {
- return new SparkTable(table, false);
- }
- }
-
- private static class TableLoadOptions {
- private Long asOfTimestamp;
- private Long snapshotId;
- private String branch;
- private String tag;
- private Boolean isTableRewrite;
-
- Long asOfTimestamp() {
- return asOfTimestamp;
- }
-
- Long snapshotId() {
- return snapshotId;
- }
-
- String branch() {
- return branch;
- }
-
- String tag() {
- return tag;
- }
-
- boolean isTableRewrite() {
- return Boolean.TRUE.equals(isTableRewrite);
- }
- }
-
- /** Extracts table load options from metadata. */
- private TableLoadOptions parseLoadOptions(List<String> metadata) {
- TableLoadOptions opts = new TableLoadOptions();
- for (String meta : metadata) {
- Matcher timeBasedMatcher = AT_TIMESTAMP.matcher(meta);
- if (timeBasedMatcher.matches()) {
- opts.asOfTimestamp = Long.parseLong(timeBasedMatcher.group(1));
- continue;
- }
-
- Matcher snapshotBasedMatcher = SNAPSHOT_ID.matcher(meta);
- if (snapshotBasedMatcher.matches()) {
- opts.snapshotId = Long.parseLong(snapshotBasedMatcher.group(1));
- continue;
- }
-
- Matcher branchBasedMatcher = BRANCH.matcher(meta);
- if (branchBasedMatcher.matches()) {
- opts.branch = branchBasedMatcher.group(1);
- continue;
- }
-
- Matcher tagBasedMatcher = TAG.matcher(meta);
- if (tagBasedMatcher.matches()) {
- opts.tag = tagBasedMatcher.group(1);
- }
-
- if (meta.equalsIgnoreCase(REWRITE)) {
- opts.isTableRewrite = true;
- }
- }
-
- long numberOptions =
- Stream.of(opts.snapshotId, opts.asOfTimestamp, opts.branch, opts.tag, opts.isTableRewrite)
- .filter(Objects::nonNull)
- .count();
- Preconditions.checkArgument(
- numberOptions <= 1,
- "Can specify only one of snapshot-id (%s), as-of-timestamp (%s),
branch (%s), tag (%s), is-table-rewrite (%s)",
- opts.snapshotId,
- opts.asOfTimestamp,
- opts.branch,
- opts.tag,
- opts.isTableRewrite);
-
- return opts;
- }
-
- private Pair<String, List<String>> parseIdent(Identifier ident) {
- int hashIndex = ident.name().lastIndexOf('#');
- if (hashIndex != -1 && !ident.name().endsWith("#")) {
- String key = ident.name().substring(0, hashIndex);
- List<String> metadata = COMMA.splitToList(ident.name().substring(hashIndex + 1));
- return Pair.of(key, metadata);
- } else {
- return Pair.of(ident.name(), ImmutableList.of());
- }
- }
-}
diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java
index 420c3517ff..238919ace7 100644
--- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java
@@ -132,10 +132,6 @@ public class SparkReadConf {
return confParser.stringConf().option(SparkReadOptions.TAG).parseOptional();
}
- public String scanTaskSetId() {
- return confParser.stringConf().option(SparkReadOptions.SCAN_TASK_SET_ID).parseOptional();
- }
-
public boolean streamingSkipDeleteSnapshots() {
return confParser
.booleanConf()
diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkReadOptions.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkReadOptions.java
index 17f2bfee69..8071b1db5b 100644
--- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkReadOptions.java
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkReadOptions.java
@@ -62,9 +62,6 @@ public class SparkReadOptions {
// Overrides the table's read.parquet.vectorization.batch-size
public static final String VECTORIZATION_BATCH_SIZE = "batch-size";
- // Set ID that is used to fetch scan tasks
- public static final String SCAN_TASK_SET_ID = "scan-task-set-id";
-
// skip snapshots of type delete while reading stream out of iceberg table
public static final String STREAMING_SKIP_DELETE_SNAPSHOTS = "streaming-skip-delete-snapshots";
public static final boolean STREAMING_SKIP_DELETE_SNAPSHOTS_DEFAULT = false;
diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkRewriteTableCatalog.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkRewriteTableCatalog.java
new file mode 100644
index 0000000000..a1016beb18
--- /dev/null
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkRewriteTableCatalog.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark;
+
+import java.util.Map;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.spark.source.SparkRewriteTable;
+import org.apache.iceberg.spark.source.SparkTable;
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
+import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.connector.catalog.TableChange;
+import org.apache.spark.sql.connector.expressions.Transform;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+
+public class SparkRewriteTableCatalog implements TableCatalog, SupportsFunctions {
+
+ private static final String CLASS_NAME = SparkRewriteTableCatalog.class.getName();
+ private static final SparkTableCache TABLE_CACHE = SparkTableCache.get();
+
+ private String name = null;
+
+ @Override
+ public Identifier[] listTables(String[] namespace) {
+ throw new UnsupportedOperationException(CLASS_NAME + " does not support listing tables");
+ }
+
+ @Override
+ public SparkRewriteTable loadTable(Identifier ident) throws NoSuchTableException {
+ validateNoNamespace(ident);
+
+ String groupId = ident.name();
+ Table table = TABLE_CACHE.get(groupId);
+
+ if (table == null) {
+ throw new NoSuchTableException(ident);
+ }
+
+ return new SparkRewriteTable(table, groupId);
+ }
+
+ @Override
+ public SparkTable loadTable(Identifier ident, String version) throws NoSuchTableException {
+ throw new UnsupportedOperationException(CLASS_NAME + " does not support time travel");
+ }
+
+ @Override
+ public SparkTable loadTable(Identifier ident, long timestampMicros) throws NoSuchTableException {
+ throw new UnsupportedOperationException(CLASS_NAME + " does not support time travel");
+ }
+
+ @Override
+ public void invalidateTable(Identifier ident) {
+ throw new UnsupportedOperationException(CLASS_NAME + " does not support table invalidation");
+ }
+
+ @Override
+ public SparkTable createTable(
+ Identifier ident, StructType schema, Transform[] partitions, Map<String, String> properties)
+ throws TableAlreadyExistsException {
+ throw new UnsupportedOperationException(CLASS_NAME + " does not support creating tables");
+ }
+
+ @Override
+ public SparkTable alterTable(Identifier ident, TableChange... changes) {
+ throw new UnsupportedOperationException(CLASS_NAME + " does not support altering tables");
+ }
+
+ @Override
+ public boolean dropTable(Identifier ident) {
+ throw new UnsupportedOperationException(CLASS_NAME + " does not support dropping tables");
+ }
+
+ @Override
+ public boolean purgeTable(Identifier ident) throws UnsupportedOperationException {
+ throw new UnsupportedOperationException(CLASS_NAME + " does not support purging tables");
+ }
+
+ @Override
+ public void renameTable(Identifier oldIdent, Identifier newIdent) {
+ throw new UnsupportedOperationException(CLASS_NAME + " does not support renaming tables");
+ }
+
+ @Override
+ public void initialize(String catalogName, CaseInsensitiveStringMap options) {
+ this.name = catalogName;
+ }
+
+ @Override
+ public String name() {
+ return name;
+ }
+
+ private void validateNoNamespace(Identifier ident) {
+ Preconditions.checkArgument(
+ ident.namespace().length == 0,
+ "%s does not support namespaces, but got: %s",
+ CLASS_NAME,
+ String.join(".", ident.namespace()));
+ }
+}
diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkTableCache.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkTableCache.java
index 6218423db4..83c6303d0f 100644
--- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkTableCache.java
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkTableCache.java
@@ -18,6 +18,7 @@
*/
package org.apache.iceberg.spark;
+import java.util.Collection;
import java.util.Map;
import org.apache.iceberg.Table;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
@@ -51,4 +52,8 @@ public class SparkTableCache {
public Table remove(String key) {
return cache.remove(key);
}
+
+ public Collection<Table> tables() {
+ return cache.values();
+ }
}
diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java
index 96131e0e56..6648d7ea38 100644
--- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java
@@ -267,13 +267,6 @@ public class SparkWriteConf {
return extraSnapshotMetadata;
}
- public String rewrittenFileSetId() {
- return confParser
- .stringConf()
- .option(SparkWriteOptions.REWRITTEN_FILE_SCAN_TASK_SET_ID)
- .parseOptional();
- }
-
public SparkWriteRequirements writeRequirements() {
if (ignoreTableDistributionAndOrdering()) {
LOG.info("Skipping distribution/ordering: disabled per job
configuration");
diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkWriteOptions.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkWriteOptions.java
index 33db70bae5..40816eef2f 100644
--- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkWriteOptions.java
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkWriteOptions.java
@@ -50,9 +50,6 @@ public class SparkWriteOptions {
// Checks if input schema and table schema are same(default: true)
public static final String CHECK_ORDERING = "check-ordering";
- // File scan task set ID that indicates which files must be replaced
- public static final String REWRITTEN_FILE_SCAN_TASK_SET_ID = "rewritten-file-scan-task-set-id";
-
public static final String OUTPUT_SPEC_ID = "output-spec-id";
public static final String OVERWRITE_MODE = "overwrite-mode";
diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/actions/SparkBinPackFileRewriteRunner.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/actions/SparkBinPackFileRewriteRunner.java
index 6d2ef585b1..084e21b1bd 100644
--- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/actions/SparkBinPackFileRewriteRunner.java
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/actions/SparkBinPackFileRewriteRunner.java
@@ -45,7 +45,6 @@ class SparkBinPackFileRewriteRunner extends SparkDataFileRewriteRunner {
spark()
.read()
.format("iceberg")
- .option(SparkReadOptions.SCAN_TASK_SET_ID, groupId)
.option(SparkReadOptions.SPLIT_SIZE, group.inputSplitSize())
.option(SparkReadOptions.FILE_OPEN_COST, "0")
.load(groupId);
@@ -54,7 +53,6 @@ class SparkBinPackFileRewriteRunner extends SparkDataFileRewriteRunner {
scanDF
.write()
.format("iceberg")
- .option(SparkWriteOptions.REWRITTEN_FILE_SCAN_TASK_SET_ID, groupId)
.option(SparkWriteOptions.TARGET_FILE_SIZE_BYTES, group.maxOutputFileSize())
.option(SparkWriteOptions.DISTRIBUTION_MODE, distributionMode(group).modeName())
.option(SparkWriteOptions.OUTPUT_SPEC_ID, group.outputSpecId())
diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/actions/SparkRewritePositionDeleteRunner.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/actions/SparkRewritePositionDeleteRunner.java
index 4bbd228056..4bcf208110 100644
--- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/actions/SparkRewritePositionDeleteRunner.java
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/actions/SparkRewritePositionDeleteRunner.java
@@ -105,7 +105,6 @@ class SparkRewritePositionDeleteRunner
spark()
.read()
.format("iceberg")
- .option(SparkReadOptions.SCAN_TASK_SET_ID, groupId)
.option(SparkReadOptions.SPLIT_SIZE, group.inputSplitSize())
.option(SparkReadOptions.FILE_OPEN_COST, "0")
.load(groupId);
@@ -120,7 +119,6 @@ class SparkRewritePositionDeleteRunner
.sortWithinPartitions("file_path", "pos")
.write()
.format("iceberg")
- .option(SparkWriteOptions.REWRITTEN_FILE_SCAN_TASK_SET_ID, groupId)
.option(SparkWriteOptions.TARGET_DELETE_FILE_SIZE_BYTES, group.maxOutputFileSize())
.mode("append")
.save(groupId);
diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/actions/SparkShufflingFileRewriteRunner.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/actions/SparkShufflingFileRewriteRunner.java
index 569eb252cb..bc1665dc32 100644
--- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/actions/SparkShufflingFileRewriteRunner.java
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/actions/SparkShufflingFileRewriteRunner.java
@@ -31,7 +31,6 @@ import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.spark.Spark3Util;
import org.apache.iceberg.spark.SparkFunctionCatalog;
-import org.apache.iceberg.spark.SparkReadOptions;
import org.apache.iceberg.spark.SparkWriteOptions;
import org.apache.iceberg.util.PropertyUtil;
import org.apache.iceberg.util.SortOrderUtil;
@@ -104,12 +103,7 @@ abstract class SparkShufflingFileRewriteRunner extends SparkDataFileRewriteRunne
@Override
public void doRewrite(String groupId, RewriteFileGroup fileGroup) {
- Dataset<Row> scanDF =
- spark()
- .read()
- .format("iceberg")
- .option(SparkReadOptions.SCAN_TASK_SET_ID, groupId)
- .load(groupId);
+ Dataset<Row> scanDF = spark().read().format("iceberg").load(groupId);
Dataset<Row> sortedDF =
sortedDF(
@@ -122,7 +116,6 @@ abstract class SparkShufflingFileRewriteRunner extends SparkDataFileRewriteRunne
sortedDF
.write()
.format("iceberg")
- .option(SparkWriteOptions.REWRITTEN_FILE_SCAN_TASK_SET_ID, groupId)
.option(SparkWriteOptions.TARGET_FILE_SIZE_BYTES, fileGroup.maxOutputFileSize())
.option(SparkWriteOptions.USE_TABLE_DISTRIBUTION_AND_ORDERING, "false")
.option(SparkWriteOptions.OUTPUT_SPEC_ID, fileGroup.outputSpecId())
diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/BaseSparkTable.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/BaseSparkTable.java
new file mode 100644
index 0000000000..a5d9293a9f
--- /dev/null
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/BaseSparkTable.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source;
+
+import static org.apache.iceberg.TableProperties.CURRENT_SNAPSHOT_ID;
+import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
+import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT;
+import static org.apache.iceberg.TableProperties.FORMAT_VERSION;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.iceberg.BaseTable;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.TableUtil;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.spark.Spark3Util;
+import org.apache.iceberg.spark.SparkSchemaUtil;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.connector.catalog.MetadataColumn;
+import org.apache.spark.sql.connector.catalog.SupportsMetadataColumns;
+import org.apache.spark.sql.connector.expressions.Transform;
+import org.apache.spark.sql.types.StructType;
+
+abstract class BaseSparkTable
+ implements org.apache.spark.sql.connector.catalog.Table, SupportsMetadataColumns {
+
+ private static final String PROVIDER = "provider";
+ private static final String FORMAT = "format";
+ private static final String LOCATION = "location";
+ private static final String SORT_ORDER = "sort-order";
+ private static final String IDENTIFIER_FIELDS = "identifier-fields";
+ private static final Set<String> RESERVED_PROPERTIES =
+ ImmutableSet.of(
+ PROVIDER,
+ FORMAT,
+ CURRENT_SNAPSHOT_ID,
+ LOCATION,
+ FORMAT_VERSION,
+ SORT_ORDER,
+ IDENTIFIER_FIELDS);
+
+ private final Table table;
+ private final Schema schema;
+
+ private SparkSession lazySpark = null;
+ private StructType lazySparkSchema = null;
+
+ protected BaseSparkTable(Table table, Schema schema) {
+ this.table = table;
+ this.schema = schema;
+ }
+
+ protected SparkSession spark() {
+ if (lazySpark == null) {
+ this.lazySpark = SparkSession.active();
+ }
+ return lazySpark;
+ }
+
+ public Table table() {
+ return table;
+ }
+
+ @Override
+ public String name() {
+ return table.toString();
+ }
+
+ @Override
+ public StructType schema() {
+ if (lazySparkSchema == null) {
+ this.lazySparkSchema = SparkSchemaUtil.convert(schema);
+ }
+ return lazySparkSchema;
+ }
+
+ @Override
+ public Transform[] partitioning() {
+ return Spark3Util.toTransforms(table.spec());
+ }
+
+ @Override
+ public Map<String, String> properties() {
+ ImmutableMap.Builder<String, String> propsBuilder = ImmutableMap.builder();
+
+ propsBuilder.put(FORMAT, "iceberg/" + fileFormat());
+ propsBuilder.put(PROVIDER, "iceberg");
+ propsBuilder.put(LOCATION, table.location());
+ propsBuilder.put(CURRENT_SNAPSHOT_ID, currentSnapshotId());
+
+ if (table instanceof BaseTable) {
+ TableOperations ops = ((BaseTable) table).operations();
+ propsBuilder.put(FORMAT_VERSION, String.valueOf(ops.current().formatVersion()));
+ }
+
+ if (table.sortOrder().isSorted()) {
+ propsBuilder.put(SORT_ORDER, Spark3Util.describe(table.sortOrder()));
+ }
+
+ Set<String> identifierFields = table.schema().identifierFieldNames();
+ if (!identifierFields.isEmpty()) {
+ propsBuilder.put(IDENTIFIER_FIELDS, "[" + String.join(",", identifierFields) + "]");
+ }
+
+ table.properties().entrySet().stream()
+ .filter(entry -> !RESERVED_PROPERTIES.contains(entry.getKey()))
+ .forEach(propsBuilder::put);
+
+ return propsBuilder.build();
+ }
+
+ @Override
+ public MetadataColumn[] metadataColumns() {
+ List<SparkMetadataColumn> cols = Lists.newArrayList();
+
+ cols.add(SparkMetadataColumns.SPEC_ID);
+ cols.add(SparkMetadataColumns.partition(table));
+ cols.add(SparkMetadataColumns.FILE_PATH);
+ cols.add(SparkMetadataColumns.ROW_POSITION);
+ cols.add(SparkMetadataColumns.IS_DELETED);
+
+ if (TableUtil.supportsRowLineage(table)) {
+ cols.add(SparkMetadataColumns.ROW_ID);
+ cols.add(SparkMetadataColumns.LAST_UPDATED_SEQUENCE_NUMBER);
+ }
+
+ return cols.toArray(SparkMetadataColumn[]::new);
+ }
+
+ private String fileFormat() {
+ return table.properties().getOrDefault(DEFAULT_FILE_FORMAT, DEFAULT_FILE_FORMAT_DEFAULT);
+ }
+
+ private String currentSnapshotId() {
+ Snapshot currentSnapshot = table.currentSnapshot();
+ return currentSnapshot != null ? String.valueOf(currentSnapshot.snapshotId()) : "none";
+ }
+
+ @Override
+ public String toString() {
+ return table.toString();
+ }
+}
diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java
index a9df994615..a0462e8f89 100644
--- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java
@@ -27,12 +27,11 @@ import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.spark.PathIdentifier;
import org.apache.iceberg.spark.Spark3Util;
-import org.apache.iceberg.spark.SparkCachedTableCatalog;
import org.apache.iceberg.spark.SparkCatalog;
import org.apache.iceberg.spark.SparkReadOptions;
+import org.apache.iceberg.spark.SparkRewriteTableCatalog;
import org.apache.iceberg.spark.SparkSessionCatalog;
import org.apache.iceberg.spark.SparkTableCache;
-import org.apache.iceberg.spark.SparkWriteOptions;
import org.apache.iceberg.util.PropertyUtil;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
@@ -66,15 +65,14 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap;
public class IcebergSource
implements DataSourceRegister, SupportsCatalogOptions, SessionConfigSupport {
private static final String DEFAULT_CATALOG_NAME = "default_iceberg";
- private static final String DEFAULT_CACHE_CATALOG_NAME = "default_cache_iceberg";
- private static final String DEFAULT_CATALOG = "spark.sql.catalog." + DEFAULT_CATALOG_NAME;
- private static final String DEFAULT_CACHE_CATALOG =
- "spark.sql.catalog." + DEFAULT_CACHE_CATALOG_NAME;
+ private static final String REWRITE_CATALOG_NAME = "default_rewrite_catalog";
+ private static final String CATALOG_PREFIX = "spark.sql.catalog.";
+ private static final String DEFAULT_CATALOG = CATALOG_PREFIX + DEFAULT_CATALOG_NAME;
+ private static final String REWRITE_CATALOG = CATALOG_PREFIX + REWRITE_CATALOG_NAME;
private static final String AT_TIMESTAMP = "at_timestamp_";
private static final String SNAPSHOT_ID = "snapshot_id_";
private static final String BRANCH_PREFIX = "branch_";
private static final String TAG_PREFIX = "tag_";
- private static final String REWRITE_SELECTOR = "rewrite";
private static final String[] EMPTY_NAMESPACE = new String[0];
private static final SparkTableCache TABLE_CACHE = SparkTableCache.get();
@@ -165,21 +163,15 @@ public class IcebergSource
selector = TAG_PREFIX + tag;
}
- String groupId =
- options.getOrDefault(
- SparkReadOptions.SCAN_TASK_SET_ID,
- options.get(SparkWriteOptions.REWRITTEN_FILE_SCAN_TASK_SET_ID));
- if (groupId != null) {
- selector = REWRITE_SELECTOR;
- }
-
CatalogManager catalogManager = spark.sessionState().catalogManager();
+ // return rewrite catalog with path as group ID if table is staged for rewrite
if (TABLE_CACHE.contains(path)) {
return new Spark3Util.CatalogAndIdentifier(
- catalogManager.catalog(DEFAULT_CACHE_CATALOG_NAME),
- Identifier.of(EMPTY_NAMESPACE, pathWithSelector(path, selector)));
- } else if (path.contains("/")) {
+ catalogManager.catalog(REWRITE_CATALOG_NAME), Identifier.of(EMPTY_NAMESPACE, path));
+ }
+
+ if (path.contains("/")) {
// contains a path. Return iceberg default catalog and a PathIdentifier
return new Spark3Util.CatalogAndIdentifier(
catalogManager.catalog(DEFAULT_CATALOG_NAME),
@@ -258,8 +250,8 @@ public class IcebergSource
config.forEach((key, value) -> spark.conf().set(DEFAULT_CATALOG + "." + key, value));
}
- if (spark.conf().getOption(DEFAULT_CACHE_CATALOG).isEmpty()) {
- spark.conf().set(DEFAULT_CACHE_CATALOG, SparkCachedTableCatalog.class.getName());
+ if (spark.conf().getOption(REWRITE_CATALOG).isEmpty()) {
+ spark.conf().set(REWRITE_CATALOG, SparkRewriteTableCatalog.class.getName());
}
}
}
diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewrite.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewrite.java
index 0ec7084bfd..7fc535ffa5 100644
--- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewrite.java
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewrite.java
@@ -70,13 +70,13 @@ public class SparkPositionDeletesRewrite implements Write {
private final JavaSparkContext sparkContext;
private final Table table;
+ private final String fileSetId;
private final String queryId;
private final FileFormat format;
private final long targetFileSize;
private final DeleteGranularity deleteGranularity;
private final Schema writeSchema;
private final StructType dsSchema;
- private final String fileSetId;
private final int specId;
private final StructLike partition;
private final Map<String, String> writeProperties;
@@ -86,6 +86,7 @@ public class SparkPositionDeletesRewrite implements Write {
*
* @param spark Spark session
* @param table instance of {@link PositionDeletesTable}
+ * @param fileSetId file set ID
* @param writeConf Spark write config
* @param writeInfo Spark write info
* @param writeSchema Iceberg output schema
@@ -96,6 +97,7 @@ public class SparkPositionDeletesRewrite implements Write {
SparkPositionDeletesRewrite(
SparkSession spark,
Table table,
+ String fileSetId,
SparkWriteConf writeConf,
LogicalWriteInfo writeInfo,
Schema writeSchema,
@@ -104,13 +106,13 @@ public class SparkPositionDeletesRewrite implements Write {
StructLike partition) {
this.sparkContext = JavaSparkContext.fromSparkContext(spark.sparkContext());
this.table = table;
+ this.fileSetId = fileSetId;
this.queryId = writeInfo.queryId();
this.format = writeConf.deleteFileFormat();
this.targetFileSize = writeConf.targetDeleteFileSize();
this.deleteGranularity = writeConf.deleteGranularity();
this.writeSchema = writeSchema;
this.dsSchema = dsSchema;
- this.fileSetId = writeConf.rewrittenFileSetId();
this.specId = specId;
this.partition = partition;
this.writeProperties = writeConf.writeProperties();
diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewriteBuilder.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewriteBuilder.java
index 9fccc05ea2..5e5d268ab9 100644
--- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewriteBuilder.java
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewriteBuilder.java
@@ -50,28 +50,25 @@ public class SparkPositionDeletesRewriteBuilder implements WriteBuilder {
private final SparkSession spark;
private final Table table;
+ private final String fileSetId;
private final SparkWriteConf writeConf;
- private final LogicalWriteInfo writeInfo;
+ private final LogicalWriteInfo info;
private final StructType dsSchema;
private final Schema writeSchema;
SparkPositionDeletesRewriteBuilder(
- SparkSession spark, Table table, String branch, LogicalWriteInfo info) {
+ SparkSession spark, Table table, String fileSetId, LogicalWriteInfo info) {
this.spark = spark;
this.table = table;
- this.writeConf = new SparkWriteConf(spark, table, branch, info.options());
- this.writeInfo = info;
+ this.fileSetId = fileSetId;
+ this.writeConf = new SparkWriteConf(spark, table, info.options());
+ this.info = info;
this.dsSchema = info.schema();
this.writeSchema = SparkSchemaUtil.convert(table.schema(), dsSchema, writeConf.caseSensitive());
}
@Override
public Write build() {
- String fileSetId = writeConf.rewrittenFileSetId();
-
- Preconditions.checkArgument(
- fileSetId != null, "Can only write to %s via actions", table.name());
-
// all files of rewrite group have same partition and spec id
ScanTaskSetManager taskSetManager = ScanTaskSetManager.get();
List<PositionDeletesScanTask> tasks = taskSetManager.fetchTasks(table, fileSetId);
@@ -82,10 +79,10 @@ public class SparkPositionDeletesRewriteBuilder implements WriteBuilder {
StructLike partition = partition(fileSetId, tasks);
return new SparkPositionDeletesRewrite(
- spark, table, writeConf, writeInfo, writeSchema, dsSchema, specId, partition);
+ spark, table, fileSetId, writeConf, info, writeSchema, dsSchema, specId, partition);
}
- private int specId(String fileSetId, List<PositionDeletesScanTask> tasks) {
+ private static int specId(String fileSetId, List<PositionDeletesScanTask> tasks) {
Set<Integer> specIds = tasks.stream().map(t -> t.spec().specId()).collect(Collectors.toSet());
Preconditions.checkArgument(
specIds.size() == 1,
@@ -95,7 +92,7 @@ public class SparkPositionDeletesRewriteBuilder implements WriteBuilder {
return tasks.get(0).spec().specId();
}
- private StructLike partition(String fileSetId, List<PositionDeletesScanTask> tasks) {
+ private static StructLike partition(String fileSetId, List<PositionDeletesScanTask> tasks) {
StructLikeSet partitions = StructLikeSet.create(tasks.get(0).spec().partitionType());
tasks.stream().map(ContentScanTask::partition).forEach(partitions::add);
Preconditions.checkArgument(
diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkRewriteTable.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkRewriteTable.java
new file mode 100644
index 0000000000..73d5b34f1c
--- /dev/null
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkRewriteTable.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source;
+
+import java.util.Set;
+import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.PositionDeletesTable;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableUtil;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.spark.sql.connector.catalog.SupportsRead;
+import org.apache.spark.sql.connector.catalog.SupportsWrite;
+import org.apache.spark.sql.connector.catalog.TableCapability;
+import org.apache.spark.sql.connector.read.ScanBuilder;
+import org.apache.spark.sql.connector.write.LogicalWriteInfo;
+import org.apache.spark.sql.connector.write.WriteBuilder;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+
+public class SparkRewriteTable extends BaseSparkTable implements SupportsRead, SupportsWrite {
+
+ private static final Set<TableCapability> CAPABILITIES =
+ ImmutableSet.of(TableCapability.BATCH_READ, TableCapability.BATCH_WRITE);
+
+ private final String groupId;
+
+ public SparkRewriteTable(Table table, String groupId) {
+ super(table, rewriteSchema(table));
+ this.groupId = groupId;
+ }
+
+ @Override
+ public Set<TableCapability> capabilities() {
+ return CAPABILITIES;
+ }
+
+ @Override
+ public ScanBuilder newScanBuilder(CaseInsensitiveStringMap options) {
+ return new SparkStagedScanBuilder(spark(), table(), groupId, options);
+ }
+
+ @Override
+ public WriteBuilder newWriteBuilder(LogicalWriteInfo info) {
+ if (table() instanceof PositionDeletesTable) {
+ return new SparkPositionDeletesRewriteBuilder(spark(), table(), groupId, info);
+ } else {
+ return new SparkRewriteWriteBuilder(spark(), table(), rewriteSchema(table()), groupId, info);
+ }
+ }
+
+ private static Schema rewriteSchema(Table table) {
+ if (TableUtil.supportsRowLineage(table)) {
+ return MetadataColumns.schemaWithRowLineage(table.schema());
+ } else {
+ return table.schema();
+ }
+ }
+}
diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkRewriteWriteBuilder.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkRewriteWriteBuilder.java
new file mode 100644
index 0000000000..714ac8e485
--- /dev/null
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkRewriteWriteBuilder.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source;
+
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.spark.SparkSchemaUtil;
+import org.apache.iceberg.spark.SparkUtil;
+import org.apache.iceberg.spark.SparkWriteConf;
+import org.apache.iceberg.types.TypeUtil;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.connector.write.BatchWrite;
+import org.apache.spark.sql.connector.write.LogicalWriteInfo;
+import org.apache.spark.sql.connector.write.Write;
+import org.apache.spark.sql.connector.write.WriteBuilder;
+import org.apache.spark.sql.connector.write.streaming.StreamingWrite;
+
+class SparkRewriteWriteBuilder implements WriteBuilder {
+
+ private final SparkSession spark;
+ private final Table table;
+ private final Schema schema;
+ private final String groupId;
+ private final SparkWriteConf writeConf;
+ private final LogicalWriteInfo info;
+ private final boolean caseSensitive;
+ private final boolean checkNullability;
+ private final boolean checkOrdering;
+
+ SparkRewriteWriteBuilder(
+ SparkSession spark, Table table, Schema schema, String groupId, LogicalWriteInfo info) {
+ this.spark = spark;
+ this.table = table;
+ this.schema = schema;
+ this.groupId = groupId;
+ this.writeConf = new SparkWriteConf(spark, table, info.options());
+ this.info = info;
+ this.caseSensitive = writeConf.caseSensitive();
+ this.checkNullability = writeConf.checkNullability();
+ this.checkOrdering = writeConf.checkOrdering();
+ }
+
+ @Override
+ public Write build() {
+ Schema writeSchema = validateWriteSchema();
+ SparkUtil.validatePartitionTransforms(table.spec());
+ String appId = spark.sparkContext().applicationId();
+ return new SparkWrite(
+ spark,
+ table,
+ writeConf,
+ info,
+ appId,
+ writeSchema,
+ info.schema(),
+ writeConf.writeRequirements()) {
+
+ @Override
+ public BatchWrite toBatch() {
+ return asRewrite(groupId);
+ }
+
+ @Override
+ public StreamingWrite toStreaming() {
+ throw new UnsupportedOperationException("Streaming writes are not
supported for rewrites");
+ }
+ };
+ }
+
+ private Schema validateWriteSchema() {
+ Schema writeSchema = SparkSchemaUtil.convert(schema, info.schema(), caseSensitive);
+ TypeUtil.validateWriteSchema(schema, writeSchema, checkNullability, checkOrdering);
+ return writeSchema;
+ }
+}
diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkStagedScan.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkStagedScan.java
index 394c922736..435c2cbd15 100644
--- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkStagedScan.java
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkStagedScan.java
@@ -18,6 +18,7 @@
*/
package org.apache.iceberg.spark.source;
+import java.util.Collections;
import java.util.List;
import java.util.Objects;
import org.apache.iceberg.ScanTask;
@@ -28,8 +29,10 @@ import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.spark.ScanTaskSetManager;
import org.apache.iceberg.spark.SparkReadConf;
+import org.apache.iceberg.spark.SparkSchemaUtil;
import org.apache.iceberg.util.TableScanUtil;
import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.connector.read.Statistics;
class SparkStagedScan extends SparkScan {
@@ -40,14 +43,26 @@ class SparkStagedScan extends SparkScan {
private List<ScanTaskGroup<ScanTask>> taskGroups = null; // lazy cache of tasks
- SparkStagedScan(SparkSession spark, Table table, Schema expectedSchema, SparkReadConf readConf) {
- super(spark, table, readConf, expectedSchema, ImmutableList.of(), null);
- this.taskSetId = readConf.scanTaskSetId();
+ SparkStagedScan(
+ SparkSession spark,
+ Table table,
+ Schema projection,
+ String taskSetId,
+ SparkReadConf readConf) {
+ super(spark, table, readConf, projection, ImmutableList.of(), null);
+ this.taskSetId = taskSetId;
this.splitSize = readConf.splitSize();
this.splitLookback = readConf.splitLookback();
this.openFileCost = readConf.splitOpenFileCost();
}
+ @Override
+ public Statistics estimateStatistics() {
+ long rowsCount = taskGroups().stream().mapToLong(ScanTaskGroup::estimatedRowsCount).sum();
+ long sizeInBytes = SparkSchemaUtil.estimateSize(readSchema(), rowsCount);
+ return new Stats(sizeInBytes, rowsCount, Collections.emptyMap());
+ }
+
@Override
protected List<ScanTaskGroup<ScanTask>> taskGroups() {
if (taskGroups == null) {
diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkStagedScanBuilder.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkStagedScanBuilder.java
index c5c86c3ebf..7164c53a3d 100644
--- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkStagedScanBuilder.java
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkStagedScanBuilder.java
@@ -41,21 +41,24 @@ class SparkStagedScanBuilder implements ScanBuilder, SupportsPushDownRequiredCol
private final SparkSession spark;
private final Table table;
+ private final String taskSetId;
private final SparkReadConf readConf;
private final List<String> metaColumns = Lists.newArrayList();
private Schema schema;
- SparkStagedScanBuilder(SparkSession spark, Table table, CaseInsensitiveStringMap options) {
+ SparkStagedScanBuilder(
+ SparkSession spark, Table table, String taskSetId, CaseInsensitiveStringMap options) {
this.spark = spark;
this.table = table;
+ this.taskSetId = taskSetId;
this.readConf = new SparkReadConf(spark, table, options);
this.schema = table.schema();
}
@Override
public Scan build() {
- return new SparkStagedScan(spark, table, schemaWithMetadataColumns(), readConf);
+ return new SparkStagedScan(spark, table, schemaWithMetadataColumns(), taskSetId, readConf);
}
@Override
diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
index 8261d03178..335d0f72fd 100644
--- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
@@ -32,7 +32,6 @@ import org.apache.iceberg.DeleteFiles;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.PartitionSpec;
-import org.apache.iceberg.PositionDeletesTable;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SnapshotRef;
import org.apache.iceberg.Table;
@@ -289,10 +288,6 @@ public class SparkTable
@Override
public ScanBuilder newScanBuilder(CaseInsensitiveStringMap options) {
- if (options.containsKey(SparkReadOptions.SCAN_TASK_SET_ID)) {
- return new SparkStagedScanBuilder(sparkSession(), icebergTable, options);
- }
-
if (refreshEagerly) {
icebergTable.refresh();
}
@@ -307,12 +302,7 @@ public class SparkTable
public WriteBuilder newWriteBuilder(LogicalWriteInfo info) {
Preconditions.checkArgument(
snapshotId == null, "Cannot write to table at a specific snapshot:
%s", snapshotId);
-
- if (icebergTable instanceof PositionDeletesTable) {
- return new SparkPositionDeletesRewriteBuilder(sparkSession(), icebergTable, branch, info);
- } else {
- return new SparkWriteBuilder(sparkSession(), icebergTable, branch, info);
- }
+ return new SparkWriteBuilder(sparkSession(), icebergTable, branch, info);
}
@Override
diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkWriteBuilder.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkWriteBuilder.java
index 89af7740d9..182e56a861 100644
--- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkWriteBuilder.java
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkWriteBuilder.java
@@ -54,7 +54,6 @@ class SparkWriteBuilder implements WriteBuilder, SupportsDynamicOverwrite, Suppo
private final LogicalWriteInfo writeInfo;
private final StructType dsSchema;
private final String overwriteMode;
- private final String rewrittenFileSetId;
private boolean overwriteDynamic = false;
private boolean overwriteByFilter = false;
private Expression overwriteExpr = null;
@@ -70,15 +69,12 @@ class SparkWriteBuilder implements WriteBuilder, SupportsDynamicOverwrite, Suppo
this.writeInfo = info;
this.dsSchema = info.schema();
this.overwriteMode = writeConf.overwriteMode();
- this.rewrittenFileSetId = writeConf.rewrittenFileSetId();
}
public WriteBuilder overwriteFiles(Scan scan, Command command, IsolationLevel isolationLevel) {
Preconditions.checkState(!overwriteByFilter, "Cannot overwrite individual files and by filter");
Preconditions.checkState(
!overwriteDynamic, "Cannot overwrite individual files and dynamically");
- Preconditions.checkState(
- rewrittenFileSetId == null, "Cannot overwrite individual files and rewrite");
this.overwriteFiles = true;
this.copyOnWriteScan = (SparkCopyOnWriteScan) scan;
@@ -92,8 +88,6 @@ class SparkWriteBuilder implements WriteBuilder, SupportsDynamicOverwrite, Suppo
Preconditions.checkState(
!overwriteByFilter, "Cannot overwrite dynamically and by filter: %s", overwriteExpr);
Preconditions.checkState(!overwriteFiles, "Cannot overwrite individual files and dynamically");
- Preconditions.checkState(
- rewrittenFileSetId == null, "Cannot overwrite dynamically and rewrite");
this.overwriteDynamic = true;
return this;
@@ -103,7 +97,6 @@ class SparkWriteBuilder implements WriteBuilder, SupportsDynamicOverwrite, Suppo
public WriteBuilder overwrite(Filter[] filters) {
Preconditions.checkState(
!overwriteFiles, "Cannot overwrite individual files and using filters");
- Preconditions.checkState(rewrittenFileSetId == null, "Cannot overwrite and rewrite");
this.overwriteExpr = SparkFilters.convert(filters);
if (overwriteExpr == Expressions.alwaysTrue() && "dynamic".equals(overwriteMode)) {
@@ -123,9 +116,7 @@ class SparkWriteBuilder implements WriteBuilder, SupportsDynamicOverwrite, Suppo
// operation or if it's a compaction.
// In any other case, only null row IDs and sequence numbers would be produced which
// means the row lineage columns can be excluded from the output files
- boolean writeRequiresRowLineage =
- TableUtil.supportsRowLineage(table)
- && (overwriteFiles || writeConf.rewrittenFileSetId() != null);
+ boolean writeRequiresRowLineage = TableUtil.supportsRowLineage(table) && overwriteFiles;
boolean writeAlreadyIncludesLineage =
dsSchema.exists(field -> field.name().equals(MetadataColumns.ROW_ID.name()));
StructType sparkWriteSchema = dsSchema;
@@ -156,9 +147,7 @@ class SparkWriteBuilder implements WriteBuilder, SupportsDynamicOverwrite, Suppo
@Override
public BatchWrite toBatch() {
- if (rewrittenFileSetId != null) {
- return asRewrite(rewrittenFileSetId);
- } else if (overwriteByFilter) {
+ if (overwriteByFilter) {
return asOverwriteByFilter(overwriteExpr);
} else if (overwriteDynamic) {
return asDynamicOverwrite();
@@ -177,8 +166,6 @@ class SparkWriteBuilder implements WriteBuilder, SupportsDynamicOverwrite, Suppo
!overwriteByFilter || overwriteExpr == Expressions.alwaysTrue(),
"Unsupported streaming operation: overwrite by filter: %s",
overwriteExpr);
- Preconditions.checkState(
- rewrittenFileSetId == null, "Unsupported streaming operation: rewrite");
if (overwriteByFilter) {
return asStreamingOverwrite();
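
One consequence of dropping rewrittenFileSetId, as the updated tests suggest, is that SparkWriteBuilder now only plans regular table writes; compaction output is written by saving to the staged group id and presumably resolved through the new rewrite table path instead. A small hedged sketch contrasting the two write paths after this change; the class and method names here are placeholders:

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;

    // Placeholder class contrasting the two write paths after this commit.
    class WritePathsSketch {
      // regular write: addressed by table name and planned by SparkWriteBuilder
      static void appendToTable(Dataset<Row> df, String tableName) throws NoSuchTableException {
        df.writeTo(tableName).append();
      }

      // compaction write: addressed by the staged group id, so it does not pass
      // through SparkWriteBuilder at all (assumption based on the test changes)
      static void appendRewriteOutput(Dataset<Row> df, String groupId) {
        df.write().format("iceberg").mode("append").save(groupId);
      }
    }
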
diff --git a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/TestFileRewriteCoordinator.java b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/TestFileRewriteCoordinator.java
index 085eedf45d..664c201915 100644
--- a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/TestFileRewriteCoordinator.java
+++ b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/TestFileRewriteCoordinator.java
@@ -71,34 +71,31 @@ public class TestFileRewriteCoordinator extends CatalogTestBase {
long avgFileSize = fileSizes.stream().mapToLong(i -> i).sum() / fileSizes.size();
try (CloseableIterable<FileScanTask> fileScanTasks = table.newScan().planFiles()) {
- String fileSetID = UUID.randomUUID().toString();
+ String groupId = UUID.randomUUID().toString();
ScanTaskSetManager taskSetManager = ScanTaskSetManager.get();
- taskSetManager.stageTasks(table, fileSetID, Lists.newArrayList(fileScanTasks));
+ taskSetManager.stageTasks(table, groupId, Lists.newArrayList(fileScanTasks));
+ SparkTableCache.get().add(groupId, table);
// read and pack original 4 files into 2 splits
Dataset<Row> scanDF =
spark
.read()
.format("iceberg")
- .option(SparkReadOptions.SCAN_TASK_SET_ID, fileSetID)
.option(SparkReadOptions.SPLIT_SIZE, Long.toString(avgFileSize * 2))
.option(SparkReadOptions.FILE_OPEN_COST, "0")
- .load(tableName);
+ .load(groupId);
// write the packed data into new files where each split becomes a new file
- scanDF
- .writeTo(tableName)
- .option(SparkWriteOptions.REWRITTEN_FILE_SCAN_TASK_SET_ID, fileSetID)
- .append();
+ scanDF.write().format("iceberg").mode("append").save(groupId);
// commit the rewrite
FileRewriteCoordinator rewriteCoordinator = FileRewriteCoordinator.get();
Set<DataFile> rewrittenFiles =
- taskSetManager.fetchTasks(table, fileSetID).stream()
+ taskSetManager.fetchTasks(table, groupId).stream()
.map(t -> t.asFileScanTask().file())
.collect(Collectors.toCollection(DataFileSet::create));
- Set<DataFile> addedFiles = rewriteCoordinator.fetchNewFiles(table, fileSetID);
+ Set<DataFile> addedFiles = rewriteCoordinator.fetchNewFiles(table, groupId);
table.newRewrite().rewriteFiles(rewrittenFiles, addedFiles).commit();
}
@@ -127,20 +124,20 @@ public class TestFileRewriteCoordinator extends CatalogTestBase {
assertThat(table.snapshots()).as("Should produce 4 snapshots").hasSize(4);
try (CloseableIterable<FileScanTask> fileScanTasks = table.newScan().planFiles()) {
- String fileSetID = UUID.randomUUID().toString();
+ String groupId = UUID.randomUUID().toString();
ScanTaskSetManager taskSetManager = ScanTaskSetManager.get();
- taskSetManager.stageTasks(table, fileSetID, Lists.newArrayList(fileScanTasks));
+ taskSetManager.stageTasks(table, groupId, Lists.newArrayList(fileScanTasks));
+ SparkTableCache.get().add(groupId, table);
// read original 4 files as 4 splits
Dataset<Row> scanDF =
spark
.read()
.format("iceberg")
- .option(SparkReadOptions.SCAN_TASK_SET_ID, fileSetID)
.option(SparkReadOptions.SPLIT_SIZE, "134217728")
.option(SparkReadOptions.FILE_OPEN_COST, "134217728")
- .load(tableName);
+ .load(groupId);
// make sure we disable AQE and set the number of shuffle partitions as the target num files
ImmutableMap<String, String> sqlConf =
@@ -151,25 +148,17 @@ public class TestFileRewriteCoordinator extends CatalogTestBase {
withSQLConf(
sqlConf,
() -> {
- try {
- // write new files with sorted records
- scanDF
- .sort("id")
- .writeTo(tableName)
- .option(SparkWriteOptions.REWRITTEN_FILE_SCAN_TASK_SET_ID, fileSetID)
- .append();
- } catch (NoSuchTableException e) {
- throw new RuntimeException("Could not replace files", e);
- }
+ // write new files with sorted records
+ scanDF.sort("id").write().format("iceberg").mode("append").save(groupId);
});
// commit the rewrite
FileRewriteCoordinator rewriteCoordinator = FileRewriteCoordinator.get();
Set<DataFile> rewrittenFiles =
- taskSetManager.fetchTasks(table, fileSetID).stream()
+ taskSetManager.fetchTasks(table, groupId).stream()
.map(t -> t.asFileScanTask().file())
.collect(Collectors.toCollection(DataFileSet::create));
- Set<DataFile> addedFiles = rewriteCoordinator.fetchNewFiles(table, fileSetID);
+ Set<DataFile> addedFiles = rewriteCoordinator.fetchNewFiles(table, groupId);
table.newRewrite().rewriteFiles(rewrittenFiles, addedFiles).commit();
}
@@ -196,14 +185,15 @@ public class TestFileRewriteCoordinator extends CatalogTestBase {
Table table = validationCatalog.loadTable(tableIdent);
- String firstFileSetID = UUID.randomUUID().toString();
+ String firstGroupId = UUID.randomUUID().toString();
long firstFileSetSnapshotId = table.currentSnapshot().snapshotId();
ScanTaskSetManager taskSetManager = ScanTaskSetManager.get();
try (CloseableIterable<FileScanTask> tasks = table.newScan().planFiles()) {
// stage first 2 files for compaction
- taskSetManager.stageTasks(table, firstFileSetID, Lists.newArrayList(tasks));
+ taskSetManager.stageTasks(table, firstGroupId, Lists.newArrayList(tasks));
+ SparkTableCache.get().add(firstGroupId, table);
}
// add two more files
@@ -212,43 +202,40 @@ public class TestFileRewriteCoordinator extends CatalogTestBase {
table.refresh();
- String secondFileSetID = UUID.randomUUID().toString();
+ String secondGroupId = UUID.randomUUID().toString();
try (CloseableIterable<FileScanTask> tasks =
table.newScan().appendsAfter(firstFileSetSnapshotId).planFiles()) {
// stage 2 more files for compaction
- taskSetManager.stageTasks(table, secondFileSetID, Lists.newArrayList(tasks));
+ taskSetManager.stageTasks(table, secondGroupId, Lists.newArrayList(tasks));
+ SparkTableCache.get().add(secondGroupId, table);
}
- ImmutableSet<String> fileSetIDs = ImmutableSet.of(firstFileSetID, secondFileSetID);
+ ImmutableSet<String> groupIds = ImmutableSet.of(firstGroupId, secondGroupId);
- for (String fileSetID : fileSetIDs) {
+ for (String groupId : groupIds) {
// read and pack 2 files into 1 split
Dataset<Row> scanDF =
spark
.read()
.format("iceberg")
- .option(SparkReadOptions.SCAN_TASK_SET_ID, fileSetID)
.option(SparkReadOptions.SPLIT_SIZE, Long.MAX_VALUE)
- .load(tableName);
+ .load(groupId);
// write the combined data as one file
- scanDF
- .writeTo(tableName)
- .option(SparkWriteOptions.REWRITTEN_FILE_SCAN_TASK_SET_ID, fileSetID)
- .append();
+ scanDF.write().format("iceberg").mode("append").save(groupId);
}
// commit both rewrites at the same time
FileRewriteCoordinator rewriteCoordinator = FileRewriteCoordinator.get();
Set<DataFile> rewrittenFiles =
- fileSetIDs.stream()
- .flatMap(fileSetID -> taskSetManager.fetchTasks(table, fileSetID).stream())
+ groupIds.stream()
+ .flatMap(groupId -> taskSetManager.fetchTasks(table, groupId).stream())
.map(t -> t.asFileScanTask().file())
.collect(Collectors.toSet());
Set<DataFile> addedFiles =
- fileSetIDs.stream()
- .flatMap(fileSetID -> rewriteCoordinator.fetchNewFiles(table, fileSetID).stream())
+ groupIds.stream()
+ .flatMap(groupId -> rewriteCoordinator.fetchNewFiles(table, groupId).stream())
.collect(Collectors.toCollection(DataFileSet::create));
table.newRewrite().rewriteFiles(rewrittenFiles, addedFiles).commit();
diff --git a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/TestSparkCachedTableCatalog.java b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/TestSparkCachedTableCatalog.java
deleted file mode 100644
index 228bf43b89..0000000000
--- a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/TestSparkCachedTableCatalog.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iceberg.spark;
-
-import org.apache.iceberg.ParameterizedTestExtension;
-import org.apache.iceberg.Parameters;
-import org.apache.iceberg.Snapshot;
-import org.apache.iceberg.Table;
-import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
-import org.junit.jupiter.api.AfterAll;
-import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.TestTemplate;
-import org.junit.jupiter.api.extension.ExtendWith;
-
-@ExtendWith(ParameterizedTestExtension.class)
-public class TestSparkCachedTableCatalog extends TestBaseWithCatalog {
-
- private static final SparkTableCache TABLE_CACHE = SparkTableCache.get();
-
- @BeforeAll
- public static void setupCachedTableCatalog() {
- spark.conf().set("spark.sql.catalog.testcache", SparkCachedTableCatalog.class.getName());
- }
-
- @AfterAll
- public static void unsetCachedTableCatalog() {
- spark.conf().unset("spark.sql.catalog.testcache");
- }
-
- @Parameters(name = "catalogName = {0}, implementation = {1}, config = {2}")
- protected static Object[][] parameters() {
- return new Object[][] {
- {
- SparkCatalogConfig.HIVE.catalogName(),
- SparkCatalogConfig.HIVE.implementation(),
- SparkCatalogConfig.HIVE.properties()
- },
- };
- }
-
- @TestTemplate
- public void testTimeTravel() {
- sql("CREATE TABLE %s (id INT, dep STRING) USING iceberg", tableName);
-
- Table table = validationCatalog.loadTable(tableIdent);
-
- sql("INSERT INTO TABLE %s VALUES (1, 'hr')", tableName);
-
- table.refresh();
- Snapshot firstSnapshot = table.currentSnapshot();
- waitUntilAfter(firstSnapshot.timestampMillis());
-
- sql("INSERT INTO TABLE %s VALUES (2, 'hr')", tableName);
-
- table.refresh();
- Snapshot secondSnapshot = table.currentSnapshot();
- waitUntilAfter(secondSnapshot.timestampMillis());
-
- sql("INSERT INTO TABLE %s VALUES (3, 'hr')", tableName);
-
- table.refresh();
-
- try {
- TABLE_CACHE.add("key", table);
-
- assertEquals(
- "Should have expected rows in 3rd snapshot",
- ImmutableList.of(row(1, "hr"), row(2, "hr"), row(3, "hr")),
- sql("SELECT * FROM testcache.key ORDER BY id"));
-
- assertEquals(
- "Should have expected rows in 2nd snapshot",
- ImmutableList.of(row(1, "hr"), row(2, "hr")),
- sql(
- "SELECT * FROM testcache.`key#at_timestamp_%s` ORDER BY id",
- secondSnapshot.timestampMillis()));
-
- assertEquals(
- "Should have expected rows in 1st snapshot",
- ImmutableList.of(row(1, "hr")),
- sql(
- "SELECT * FROM testcache.`key#snapshot_id_%d` ORDER BY id",
- firstSnapshot.snapshotId()));
-
- } finally {
- TABLE_CACHE.remove("key");
- }
- }
-}
diff --git a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestPositionDeletesTable.java b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestPositionDeletesTable.java
index 7892fd65b4..5641c7b2a0 100644
--- a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestPositionDeletesTable.java
+++ b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestPositionDeletesTable.java
@@ -71,7 +71,7 @@ import org.apache.iceberg.spark.ScanTaskSetManager;
import org.apache.iceberg.spark.SparkCatalogConfig;
import org.apache.iceberg.spark.SparkReadOptions;
import org.apache.iceberg.spark.SparkStructLike;
-import org.apache.iceberg.spark.SparkWriteOptions;
+import org.apache.iceberg.spark.SparkTableCache;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.CharSequenceSet;
@@ -842,25 +842,21 @@ public class TestPositionDeletesTable extends CatalogTestBase {
Table posDeletesTable =
MetadataTableUtils.createMetadataTableInstance(tab, MetadataTableType.POSITION_DELETES);
- String posDeletesTableName = catalogName + ".default." + tableName + ".position_deletes";
for (String partValue : ImmutableList.of("a", "b")) {
try (CloseableIterable<ScanTask> tasks = tasks(posDeletesTable, "data", partValue)) {
String fileSetID = UUID.randomUUID().toString();
+ SparkTableCache.get().add(fileSetID, posDeletesTable);
stageTask(tab, fileSetID, tasks);
Dataset<Row> scanDF =
spark
.read()
.format("iceberg")
- .option(SparkReadOptions.SCAN_TASK_SET_ID, fileSetID)
.option(SparkReadOptions.FILE_OPEN_COST, Integer.MAX_VALUE)
- .load(posDeletesTableName);
+ .load(fileSetID);
assertThat(scanDF.javaRDD().getNumPartitions()).isEqualTo(1);
- scanDF
- .writeTo(posDeletesTableName)
- .option(SparkWriteOptions.REWRITTEN_FILE_SCAN_TASK_SET_ID, fileSetID)
- .append();
+ scanDF.write().format("iceberg").mode("append").save(fileSetID);
commit(tab, posDeletesTable, fileSetID, 1);
}
@@ -911,23 +907,19 @@ public class TestPositionDeletesTable extends CatalogTestBase {
Table posDeletesTable =
MetadataTableUtils.createMetadataTableInstance(tab, MetadataTableType.POSITION_DELETES);
- String posDeletesTableName = catalogName + ".default." + tableName + ".position_deletes";
try (CloseableIterable<ScanTask> tasks = posDeletesTable.newBatchScan().planFiles()) {
String fileSetID = UUID.randomUUID().toString();
+ SparkTableCache.get().add(fileSetID, posDeletesTable);
stageTask(tab, fileSetID, tasks);
Dataset<Row> scanDF =
spark
.read()
.format("iceberg")
- .option(SparkReadOptions.SCAN_TASK_SET_ID, fileSetID)
.option(SparkReadOptions.FILE_OPEN_COST, Integer.MAX_VALUE)
- .load(posDeletesTableName);
+ .load(fileSetID);
assertThat(scanDF.javaRDD().getNumPartitions()).isEqualTo(1);
- scanDF
- .writeTo(posDeletesTableName)
- .option(SparkWriteOptions.REWRITTEN_FILE_SCAN_TASK_SET_ID, fileSetID)
- .append();
+ scanDF.write().format("iceberg").mode("append").save(fileSetID);
commit(tab, posDeletesTable, fileSetID, 1);
}
@@ -986,23 +978,15 @@ public class TestPositionDeletesTable extends CatalogTestBase {
// rewrite delete files
Table posDeletesTable =
MetadataTableUtils.createMetadataTableInstance(tab, MetadataTableType.POSITION_DELETES);
- String posDeletesTableName = catalogName + ".default." + tableName + ".position_deletes";
for (String partValue : ImmutableList.of("a", "b")) {
try (CloseableIterable<ScanTask> tasks = tasks(posDeletesTable, "data", partValue)) {
String fileSetID = UUID.randomUUID().toString();
+ SparkTableCache.get().add(fileSetID, posDeletesTable);
stageTask(tab, fileSetID, tasks);
- Dataset<Row> scanDF =
- spark
- .read()
- .format("iceberg")
- .option(SparkReadOptions.SCAN_TASK_SET_ID, fileSetID)
- .load(posDeletesTableName);
+ Dataset<Row> scanDF = spark.read().format("iceberg").load(fileSetID);
assertThat(scanDF.javaRDD().getNumPartitions()).isEqualTo(1);
- scanDF
- .writeTo(posDeletesTableName)
- .option(SparkWriteOptions.REWRITTEN_FILE_SCAN_TASK_SET_ID, fileSetID)
- .append();
+ scanDF.write().format("iceberg").mode("append").save(fileSetID);
commit(tab, posDeletesTable, fileSetID, 1);
}
@@ -1067,26 +1051,22 @@ public class TestPositionDeletesTable extends CatalogTestBase {
Table posDeletesTable =
MetadataTableUtils.createMetadataTableInstance(tab, MetadataTableType.POSITION_DELETES);
- String posDeletesTableName = catalogName + ".default." + tableName + ".position_deletes";
// Read/write back unpartitioned data
try (CloseableIterable<ScanTask> tasks =
posDeletesTable.newBatchScan().filter(Expressions.isNull("partition.data")).planFiles()) {
String fileSetID = UUID.randomUUID().toString();
+ SparkTableCache.get().add(fileSetID, posDeletesTable);
stageTask(tab, fileSetID, tasks);
Dataset<Row> scanDF =
spark
.read()
.format("iceberg")
- .option(SparkReadOptions.SCAN_TASK_SET_ID, fileSetID)
.option(SparkReadOptions.FILE_OPEN_COST, Integer.MAX_VALUE)
- .load(posDeletesTableName);
+ .load(fileSetID);
assertThat(scanDF.javaRDD().getNumPartitions()).isEqualTo(1);
- scanDF
- .writeTo(posDeletesTableName)
- .option(SparkWriteOptions.REWRITTEN_FILE_SCAN_TASK_SET_ID, fileSetID)
- .append();
+ scanDF.write().format("iceberg").mode("append").save(fileSetID);
commit(tab, posDeletesTable, fileSetID, 1);
}
@@ -1117,20 +1097,17 @@ public class TestPositionDeletesTable extends CatalogTestBase {
for (String partValue : ImmutableList.of("a", "b")) {
try (CloseableIterable<ScanTask> tasks = tasks(posDeletesTable, "data", partValue)) {
String fileSetID = UUID.randomUUID().toString();
+ SparkTableCache.get().add(fileSetID, posDeletesTable);
stageTask(tab, fileSetID, tasks);
Dataset<Row> scanDF =
spark
.read()
.format("iceberg")
- .option(SparkReadOptions.SCAN_TASK_SET_ID, fileSetID)
.option(SparkReadOptions.FILE_OPEN_COST, Integer.MAX_VALUE)
- .load(posDeletesTableName);
+ .load(fileSetID);
assertThat(scanDF.javaRDD().getNumPartitions()).isEqualTo(1);
- scanDF
- .writeTo(posDeletesTableName)
- .option(SparkWriteOptions.REWRITTEN_FILE_SCAN_TASK_SET_ID, fileSetID)
- .append();
+ scanDF.write().format("iceberg").mode("append").save(fileSetID);
// commit the rewrite
commit(tab, posDeletesTable, fileSetID, 1);
@@ -1181,33 +1158,29 @@ public class TestPositionDeletesTable extends CatalogTestBase {
Dataset<Row> scanDF;
String fileSetID = UUID.randomUUID().toString();
try (CloseableIterable<ScanTask> tasks = posDeletesTable.newBatchScan().planFiles()) {
+ SparkTableCache.get().add(fileSetID, posDeletesTable);
stageTask(tab, fileSetID, tasks);
scanDF =
spark
.read()
.format("iceberg")
- .option(SparkReadOptions.SCAN_TASK_SET_ID, fileSetID)
.option(SparkReadOptions.FILE_OPEN_COST, Integer.MAX_VALUE)
- .load(posDeletesTableName);
+ .load(fileSetID);
assertThat(scanDF.javaRDD().getNumPartitions()).isEqualTo(1);
// Add partition field to render the original un-partitioned dataset un-commitable
tab.updateSpec().addField("data").commit();
}
- scanDF
- .writeTo(posDeletesTableName)
- .option(SparkWriteOptions.REWRITTEN_FILE_SCAN_TASK_SET_ID, fileSetID)
- .append();
+ scanDF.write().format("iceberg").mode("append").save(fileSetID);
scanDF =
spark
.read()
.format("iceberg")
- .option(SparkReadOptions.SCAN_TASK_SET_ID, fileSetID)
.option(SparkReadOptions.FILE_OPEN_COST, Integer.MAX_VALUE)
- .load(posDeletesTableName);
+ .load(fileSetID);
assertThat(Arrays.asList(scanDF.columns()).contains("partition"));
dropTable(tableName);
@@ -1247,26 +1220,22 @@ public class TestPositionDeletesTable extends CatalogTestBase {
Table posDeletesTable =
MetadataTableUtils.createMetadataTableInstance(tab, MetadataTableType.POSITION_DELETES);
- String posDeletesTableName = catalogName + ".default." + tableName + ".position_deletes";
// rewrite files of old schema
try (CloseableIterable<ScanTask> tasks = tasks(posDeletesTable, "data", "a")) {
String fileSetID = UUID.randomUUID().toString();
+ SparkTableCache.get().add(fileSetID, posDeletesTable);
stageTask(tab, fileSetID, tasks);
Dataset<Row> scanDF =
spark
.read()
.format("iceberg")
- .option(SparkReadOptions.SCAN_TASK_SET_ID, fileSetID)
.option(SparkReadOptions.FILE_OPEN_COST, Integer.MAX_VALUE)
- .load(posDeletesTableName);
+ .load(fileSetID);
assertThat(scanDF.javaRDD().getNumPartitions()).isEqualTo(1);
- scanDF
- .writeTo(posDeletesTableName)
- .option(SparkWriteOptions.REWRITTEN_FILE_SCAN_TASK_SET_ID, fileSetID)
- .append();
+ scanDF.write().format("iceberg").mode("append").save(fileSetID);
commit(tab, posDeletesTable, fileSetID, 1);
}
@@ -1300,21 +1269,18 @@ public class TestPositionDeletesTable extends CatalogTestBase {
// rewrite files of new schema
try (CloseableIterable<ScanTask> tasks = tasks(posDeletesTable, "data", "c")) {
String fileSetID = UUID.randomUUID().toString();
+ SparkTableCache.get().add(fileSetID, posDeletesTable);
stageTask(tab, fileSetID, tasks);
Dataset<Row> scanDF =
spark
.read()
.format("iceberg")
- .option(SparkReadOptions.SCAN_TASK_SET_ID, fileSetID)
.option(SparkReadOptions.FILE_OPEN_COST, Integer.MAX_VALUE)
- .load(posDeletesTableName);
+ .load(fileSetID);
assertThat(scanDF.javaRDD().getNumPartitions()).isEqualTo(1);
- scanDF
- .writeTo(posDeletesTableName)
- .option(SparkWriteOptions.REWRITTEN_FILE_SCAN_TASK_SET_ID, fileSetID)
- .append();
+ scanDF.write().format("iceberg").mode("append").save(fileSetID);
commit(tab, posDeletesTable, fileSetID, 1);
}
@@ -1371,26 +1337,22 @@ public class TestPositionDeletesTable extends CatalogTestBase {
Table posDeletesTable =
MetadataTableUtils.createMetadataTableInstance(tab, MetadataTableType.POSITION_DELETES);
- String posDeletesTableName = catalogName + ".default." + tableName + ".position_deletes";
// rewrite files
for (String partValue : ImmutableList.of("a", "b", "c", "d")) {
try (CloseableIterable<ScanTask> tasks = tasks(posDeletesTable, "data", partValue)) {
String fileSetID = UUID.randomUUID().toString();
+ SparkTableCache.get().add(fileSetID, posDeletesTable);
stageTask(tab, fileSetID, tasks);
Dataset<Row> scanDF =
spark
.read()
.format("iceberg")
- .option(SparkReadOptions.SCAN_TASK_SET_ID, fileSetID)
.option(SparkReadOptions.FILE_OPEN_COST, Integer.MAX_VALUE)
- .load(posDeletesTableName);
+ .load(fileSetID);
assertThat(scanDF.javaRDD().getNumPartitions()).isEqualTo(1);
- scanDF
- .writeTo(posDeletesTableName)
- .option(SparkWriteOptions.REWRITTEN_FILE_SCAN_TASK_SET_ID, fileSetID)
- .append();
+ scanDF.write().format("iceberg").mode("append").save(fileSetID);
commit(tab, posDeletesTable, fileSetID, 1);
}
@@ -1453,8 +1415,8 @@ public class TestPositionDeletesTable extends CatalogTestBase {
Dataset<Row> scanDF = spark.read().format("iceberg").load(posDeletesTableName);
assertThatThrownBy(() -> scanDF.writeTo(posDeletesTableName).append())
- .isInstanceOf(IllegalArgumentException.class)
- .hasMessage("Can only write to " + posDeletesTableName + " via actions");
+ .isInstanceOf(UnsupportedOperationException.class)
+ .hasMessage("Cannot append to a metadata table");
dropTable(tableName);
}
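
One user-visible consequence exercised by the test above: a plain DataFrame append to the position_deletes metadata table is now rejected by the generic metadata-table write path with an UnsupportedOperationException instead of the old rewrite-specific IllegalArgumentException. A minimal sketch of the expected failure, grounded in that assertion; the catalog and table identifier below is a placeholder:

    import static org.assertj.core.api.Assertions.assertThatThrownBy;

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;

    class MetadataTableAppendSketch {
      static void verifyAppendRejected(SparkSession spark) {
        String posDeletesTableName = "my_catalog.db.tbl.position_deletes"; // placeholder identifier
        Dataset<Row> scanDF = spark.read().format("iceberg").load(posDeletesTableName);
        // direct appends to metadata tables are refused; rewrites go through the staged group path
        assertThatThrownBy(() -> scanDF.writeTo(posDeletesTableName).append())
            .isInstanceOf(UnsupportedOperationException.class)
            .hasMessage("Cannot append to a metadata table");
      }
    }
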
diff --git a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkStagedScan.java b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkStagedScan.java
index e444b7cb1f..de07b7471f 100644
--- a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkStagedScan.java
+++ b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkStagedScan.java
@@ -31,6 +31,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.spark.CatalogTestBase;
import org.apache.iceberg.spark.ScanTaskSetManager;
import org.apache.iceberg.spark.SparkReadOptions;
+import org.apache.iceberg.spark.SparkTableCache;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
@@ -60,16 +61,12 @@ public class TestSparkStagedScan extends CatalogTestBase {
try (CloseableIterable<FileScanTask> fileScanTasks = table.newScan().planFiles()) {
ScanTaskSetManager taskSetManager = ScanTaskSetManager.get();
- String setID = UUID.randomUUID().toString();
- taskSetManager.stageTasks(table, setID, ImmutableList.copyOf(fileScanTasks));
+ String groupId = UUID.randomUUID().toString();
+ taskSetManager.stageTasks(table, groupId, ImmutableList.copyOf(fileScanTasks));
+ SparkTableCache.get().add(groupId, table);
- // load the staged file set
- Dataset<Row> scanDF =
- spark
- .read()
- .format("iceberg")
- .option(SparkReadOptions.SCAN_TASK_SET_ID, setID)
- .load(tableName);
+ // load the staged file set via the rewrite catalog
+ Dataset<Row> scanDF = spark.read().format("iceberg").load(groupId);
// write the records back essentially duplicating data
scanDF.writeTo(tableName).append();
@@ -96,18 +93,18 @@ public class TestSparkStagedScan extends CatalogTestBase {
try (CloseableIterable<FileScanTask> fileScanTasks = table.newScan().planFiles()) {
ScanTaskSetManager taskSetManager = ScanTaskSetManager.get();
- String setID = UUID.randomUUID().toString();
+ String groupId = UUID.randomUUID().toString();
List<FileScanTask> tasks = ImmutableList.copyOf(fileScanTasks);
- taskSetManager.stageTasks(table, setID, tasks);
+ taskSetManager.stageTasks(table, groupId, tasks);
+ SparkTableCache.get().add(groupId, table);
// load the staged file set and make sure each file is in a separate split
Dataset<Row> scanDF =
spark
.read()
.format("iceberg")
- .option(SparkReadOptions.SCAN_TASK_SET_ID, setID)
.option(SparkReadOptions.SPLIT_SIZE, tasks.get(0).file().fileSizeInBytes())
- .load(tableName);
+ .load(groupId);
assertThat(scanDF.javaRDD().getNumPartitions())
.as("Num partitions should match")
.isEqualTo(2);
@@ -117,9 +114,8 @@ public class TestSparkStagedScan extends CatalogTestBase {
spark
.read()
.format("iceberg")
- .option(SparkReadOptions.SCAN_TASK_SET_ID, setID)
.option(SparkReadOptions.SPLIT_SIZE, Long.MAX_VALUE)
- .load(tableName);
+ .load(groupId);
assertThat(scanDF.javaRDD().getNumPartitions())
.as("Num partitions should match")
.isEqualTo(1);
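
As the staged-scan test above shows, split planning options still apply when the staged group is loaded by id. A short sketch of the two read shapes it uses, assuming the tasks were already staged and the table cached under groupId; the class and method names are illustrative:

    import org.apache.iceberg.spark.SparkReadOptions;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;

    class StagedScanSplitSketch {
      // roughly one split per file: cap the split size at a single file's size
      static Dataset<Row> onePartitionPerFile(SparkSession spark, String groupId, long fileSizeBytes) {
        return spark
            .read()
            .format("iceberg")
            .option(SparkReadOptions.SPLIT_SIZE, String.valueOf(fileSizeBytes))
            .load(groupId);
      }

      // everything in one split: make the split size effectively unbounded
      static Dataset<Row> singlePartition(SparkSession spark, String groupId) {
        return spark
            .read()
            .format("iceberg")
            .option(SparkReadOptions.SPLIT_SIZE, String.valueOf(Long.MAX_VALUE))
            .load(groupId);
      }
    }
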