This is an automated email from the ASF dual-hosted git repository.
aokolnychyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new c4e35a55f2 Spark 3.4: Support distributed planning (#8123)
c4e35a55f2 is described below
commit c4e35a55f26b58ca71295adfe2b9c7b8bc34b55e
Author: Anton Okolnychyi <[email protected]>
AuthorDate: Tue Sep 12 14:06:15 2023 -0700
Spark 3.4: Support distributed planning (#8123)
---
.baseline/checkstyle/checkstyle.xml | 1 +
.../apache/iceberg/BaseDistributedDataScan.java | 392 +++++++++++++++++++++
.../src/main/java/org/apache/iceberg/BaseScan.java | 20 +-
.../src/main/java/org/apache/iceberg/DataScan.java | 70 ++++
.../java/org/apache/iceberg/DataTableScan.java | 9 +-
.../java/org/apache/iceberg/DeleteFileIndex.java | 4 +
.../java/org/apache/iceberg/ManifestGroup.java | 21 +-
.../ScanMetricsUtil.java => PlanningMode.java} | 39 +-
.../main/java/org/apache/iceberg/SnapshotScan.java | 10 +-
.../java/org/apache/iceberg/TableProperties.java | 4 +
.../apache/iceberg/metrics/ScanMetricsUtil.java | 14 +
...taTableScan.java => DataTableScanTestBase.java} | 65 ++--
...FileIndex.java => DeleteFileIndexTestBase.java} | 55 +--
...stFilterFiles.java => FilterFilesTestBase.java} | 25 +-
.../apache/iceberg/TestLocalDataTableScan.java} | 34 +-
.../apache/iceberg/TestLocalDeleteFileIndex.java} | 21 +-
.../org/apache/iceberg/TestLocalFilterFiles.java} | 28 +-
.../SparkRowLevelOperationsTestBase.java | 31 +-
.../spark/extensions/TestCopyOnWriteDelete.java | 7 +-
.../spark/extensions/TestCopyOnWriteMerge.java | 7 +-
.../spark/extensions/TestCopyOnWriteUpdate.java | 7 +-
.../iceberg/spark/extensions/TestDelete.java | 7 +-
.../apache/iceberg/spark/extensions/TestMerge.java | 7 +-
.../spark/extensions/TestMergeOnReadDelete.java | 7 +-
.../spark/extensions/TestMergeOnReadMerge.java | 7 +-
.../spark/extensions/TestMergeOnReadUpdate.java | 7 +-
.../iceberg/spark/extensions/TestUpdate.java | 7 +-
.../apache/iceberg/SparkDistributedDataScan.java | 247 +++++++++++++
.../org/apache/iceberg/spark/SparkReadConf.java | 47 +++
.../apache/iceberg/spark/SparkSQLProperties.java | 6 +
.../iceberg/spark/actions/BaseSparkAction.java | 1 +
.../iceberg/spark/actions/ManifestFileBean.java | 28 +-
.../iceberg/spark/source/SparkScanBuilder.java | 15 +-
.../iceberg/SparkDistributedDataScanTestBase.java | 100 ++++++
.../TestSparkDistributedDataScanDeletes.java | 90 +++++
.../TestSparkDistributedDataScanFilterFiles.java | 91 +++++
...tSparkDistributedDataScanJavaSerialization.java | 32 +-
...tSparkDistributedDataScanKryoSerialization.java | 32 +-
.../iceberg/spark/SparkTestBaseWithCatalog.java | 16 +
.../iceberg/spark/source/TestFilteredScan.java | 31 +-
.../spark/source/TestIdentityPartitionData.java | 26 +-
.../iceberg/spark/source/TestPartitionPruning.java | 26 +-
.../iceberg/spark/source/TestRuntimeFiltering.java | 26 ++
.../spark/source/TestSnapshotSelection.java | 45 ++-
.../spark/source/TestSparkReadProjection.java | 30 +-
.../iceberg/spark/sql/TestFilterPushDown.java | 29 ++
.../spark/sql/TestStoragePartitionedJoins.java | 20 ++
47 files changed, 1607 insertions(+), 237 deletions(-)
diff --git a/.baseline/checkstyle/checkstyle.xml
b/.baseline/checkstyle/checkstyle.xml
index b2f9ef1244..aec14e30b2 100644
--- a/.baseline/checkstyle/checkstyle.xml
+++ b/.baseline/checkstyle/checkstyle.xml
@@ -120,6 +120,7 @@
org.apache.iceberg.NullOrder.*,
org.apache.iceberg.MetadataTableType.*,
org.apache.iceberg.MetadataColumns.*,
+ org.apache.iceberg.PlanningMode.*,
org.apache.iceberg.SortDirection.*,
org.apache.iceberg.TableProperties.*,
org.apache.iceberg.types.Type.*,
diff --git a/core/src/main/java/org/apache/iceberg/BaseDistributedDataScan.java
b/core/src/main/java/org/apache/iceberg/BaseDistributedDataScan.java
new file mode 100644
index 0000000000..152b62b443
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/BaseDistributedDataScan.java
@@ -0,0 +1,392 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import static org.apache.iceberg.TableProperties.DATA_PLANNING_MODE;
+import static org.apache.iceberg.TableProperties.DELETE_PLANNING_MODE;
+import static org.apache.iceberg.TableProperties.PLANNING_MODE_DEFAULT;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutorService;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.ManifestEvaluator;
+import org.apache.iceberg.expressions.Projections;
+import org.apache.iceberg.expressions.ResidualEvaluator;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.metrics.ScanMetricsUtil;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.util.ParallelIterable;
+import org.apache.iceberg.util.TableScanUtil;
+import org.apache.iceberg.util.ThreadPools;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An abstract class for batch data scans that can utilize cluster resources
for planning.
+ *
+ * <p>This class provides common logic to create data scans that are capable
of reading and
+ * filtering manifests remotely when the metadata size exceeds the threshold
for local processing.
+ * Also, it takes care of planning tasks locally if remote planning is not
considered beneficial.
+ *
+ * <p>Note that this class is evolving and is subject to change even in minor
releases.
+ */
+abstract class BaseDistributedDataScan
+ extends DataScan<BatchScan, ScanTask, ScanTaskGroup<ScanTask>> implements
BatchScan {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(BaseDistributedDataScan.class);
+ private static final long LOCAL_PLANNING_MAX_SLOT_SIZE = 128L * 1024 * 1024;
// 128 MB
+ private static final int MONITOR_POOL_SIZE = 2;
+
+ private final int localParallelism;
+ private final long localPlanningSizeThreshold;
+
+ protected BaseDistributedDataScan(Table table, Schema schema,
TableScanContext context) {
+ super(table, schema, context);
+ this.localParallelism = PLAN_SCANS_WITH_WORKER_POOL ?
ThreadPools.WORKER_THREAD_POOL_SIZE : 1;
+ this.localPlanningSizeThreshold = localParallelism *
LOCAL_PLANNING_MAX_SLOT_SIZE;
+ }
+
+ /**
+ * Returns the cluster parallelism.
+ *
+ * <p>This value indicates the maximum number of manifests that can be
processed concurrently by
+ * the cluster. Implementations should take into account both the currently
available processing
+ * slots and potential dynamic allocation, if applicable.
+ *
+ * <p>The remote parallelism is compared against the size of the thread pool
available locally to
+ * determine the feasibility of remote planning. This value is ignored if
the planning mode is set
+ * explicitly as local or distributed.
+ */
+ protected abstract int remoteParallelism();
+
+ /** Returns which planning mode to use for data. */
+ protected PlanningMode dataPlanningMode() {
+ Map<String, String> properties = table().properties();
+ String modeName = properties.getOrDefault(DATA_PLANNING_MODE,
PLANNING_MODE_DEFAULT);
+ return PlanningMode.fromName(modeName);
+ }
+
+ /**
+ * Controls whether defensive copies are created for remotely planned data
files.
+ *
+ * <p>By default, this class creates defensive copies for each data file
that is planned remotely,
+ * assuming the provided iterable can be lazy and may reuse objects. If
unnecessary and data file
+ * objects can be safely added into a collection, implementations can
override this behavior.
+ */
+ protected boolean shouldCopyRemotelyPlannedDataFiles() {
+ return true;
+ }
+
+ /**
+ * Plans data remotely.
+ *
+ * <p>Implementations are encouraged to return groups of matching data
files, enabling this class
+ * to process multiple groups concurrently to speed up the remaining work.
This is particularly
+ * useful when dealing with equality deletes, as delete index lookups with
such delete files
+ * require comparing bounds and typically benefit from parallelization.
+ *
+ * <p>If the result iterable reuses objects, {@link
#shouldCopyRemotelyPlannedDataFiles()} must
+ * return true.
+ *
+ * <p>The input data manifests have already been filtered to include only
potential matches based
+ * on the scan filter. Implementations are expected to further filter these
manifests and only
+ * return files that may hold data matching the scan filter.
+ *
+ * @param dataManifests data manifests that may contain files matching the
scan filter
+ * @param withColumnStats a flag whether to load column stats
+ * @return groups of data files planned remotely
+ */
+ protected abstract Iterable<CloseableIterable<DataFile>> planDataRemotely(
+ List<ManifestFile> dataManifests, boolean withColumnStats);
+
+ /** Returns which planning mode to use for deletes. */
+ protected PlanningMode deletePlanningMode() {
+ Map<String, String> properties = table().properties();
+ String modeName = properties.getOrDefault(DELETE_PLANNING_MODE,
PLANNING_MODE_DEFAULT);
+ return PlanningMode.fromName(modeName);
+ }
+
+ /**
+ * Plans deletes remotely.
+ *
+ * <p>The input delete manifests have already been filtered to include only
potential matches
+ * based on the scan filter. Implementations are expected to further filter
these manifests and
+ * return files that may hold deletes matching the scan filter.
+ *
+ * @param deleteManifests delete manifests that may contain files matching
the scan filter
+ * @return a delete file index planned remotely
+ */
+ protected abstract DeleteFileIndex planDeletesRemotely(List<ManifestFile>
deleteManifests);
+
+ @Override
+ protected CloseableIterable<ScanTask> doPlanFiles() {
+ Snapshot snapshot = snapshot();
+
+ List<ManifestFile> dataManifests = findMatchingDataManifests(snapshot);
+ boolean planDataLocally = shouldPlanLocally(dataPlanningMode(),
dataManifests);
+
+ List<ManifestFile> deleteManifests = findMatchingDeleteManifests(snapshot);
+ boolean planDeletesLocally = shouldPlanLocally(deletePlanningMode(),
deleteManifests);
+
+ if (planDataLocally && planDeletesLocally) {
+ return planFileTasksLocally(dataManifests, deleteManifests);
+ }
+
+ boolean mayHaveEqualityDeletes = deleteManifests.size() > 0 &&
mayHaveEqualityDeletes(snapshot);
+ boolean loadColumnStats = mayHaveEqualityDeletes ||
shouldReturnColumnStats();
+ boolean copyDataFiles = shouldCopyDataFiles(planDataLocally,
loadColumnStats);
+
+ ExecutorService monitorPool = newMonitorPool();
+
+ CompletableFuture<DeleteFileIndex> deletesFuture =
+ newDeletesFuture(deleteManifests, planDeletesLocally, monitorPool);
+
+ CompletableFuture<Iterable<CloseableIterable<DataFile>>> dataFuture =
+ newDataFuture(dataManifests, planDataLocally, loadColumnStats,
monitorPool);
+
+ try {
+ Iterable<CloseableIterable<ScanTask>> fileTasks =
+ toFileTasks(dataFuture, deletesFuture, copyDataFiles);
+
+ if (shouldPlanWithExecutor() && (planDataLocally ||
mayHaveEqualityDeletes)) {
+ return new ParallelIterable<>(fileTasks, planExecutor());
+ } else {
+ return CloseableIterable.concat(fileTasks);
+ }
+
+ } catch (CompletionException e) {
+ deletesFuture.cancel(true /* may interrupt */);
+ dataFuture.cancel(true /* may interrupt */);
+ throw new RuntimeException("Failed to plan files", e);
+
+ } finally {
+ monitorPool.shutdown();
+ }
+ }
+
+ @Override
+ public CloseableIterable<ScanTaskGroup<ScanTask>> planTasks() {
+ return TableScanUtil.planTaskGroups(
+ planFiles(), targetSplitSize(), splitLookback(), splitOpenFileCost());
+ }
+
+ private List<ManifestFile> findMatchingDataManifests(Snapshot snapshot) {
+ List<ManifestFile> dataManifests = snapshot.dataManifests(io());
+ scanMetrics().totalDataManifests().increment(dataManifests.size());
+
+ List<ManifestFile> matchingDataManifests = filterManifests(dataManifests);
+ int skippedDataManifestsCount = dataManifests.size() -
matchingDataManifests.size();
+ scanMetrics().skippedDataManifests().increment(skippedDataManifestsCount);
+
+ return matchingDataManifests;
+ }
+
+ private List<ManifestFile> findMatchingDeleteManifests(Snapshot snapshot) {
+ List<ManifestFile> deleteManifests = snapshot.deleteManifests(io());
+ scanMetrics().totalDeleteManifests().increment(deleteManifests.size());
+
+ List<ManifestFile> matchingDeleteManifests =
filterManifests(deleteManifests);
+ int skippedDeleteManifestsCount = deleteManifests.size() -
matchingDeleteManifests.size();
+
scanMetrics().skippedDeleteManifests().increment(skippedDeleteManifestsCount);
+
+ return matchingDeleteManifests;
+ }
+
+ private List<ManifestFile> filterManifests(List<ManifestFile> manifests) {
+ Map<Integer, ManifestEvaluator> evalCache =
specCache(this::newManifestEvaluator);
+
+ return manifests.stream()
+ .filter(manifest -> manifest.hasAddedFiles() ||
manifest.hasExistingFiles())
+ .filter(manifest ->
evalCache.get(manifest.partitionSpecId()).eval(manifest))
+ .collect(Collectors.toList());
+ }
+
+ protected boolean shouldPlanLocally(PlanningMode mode, List<ManifestFile>
manifests) {
+ if (context().planWithCustomizedExecutor()) {
+ return true;
+ }
+
+ switch (mode) {
+ case LOCAL:
+ return true;
+
+ case DISTRIBUTED:
+ return manifests.isEmpty();
+
+ case AUTO:
+ return remoteParallelism() <= localParallelism
+ || manifests.size() <= 2 * localParallelism
+ || totalSize(manifests) <= localPlanningSizeThreshold;
+
+ default:
+ throw new IllegalArgumentException("Unknown planning mode: " + mode);
+ }
+ }
+
+ private long totalSize(List<ManifestFile> manifests) {
+ return manifests.stream().mapToLong(ManifestFile::length).sum();
+ }
+
+ private boolean shouldCopyDataFiles(boolean planDataLocally, boolean
loadColumnStats) {
+ return planDataLocally
+ || shouldCopyRemotelyPlannedDataFiles()
+ || (loadColumnStats && !shouldReturnColumnStats());
+ }
+
+ @SuppressWarnings("unchecked")
+ private CloseableIterable<ScanTask> planFileTasksLocally(
+ List<ManifestFile> dataManifests, List<ManifestFile> deleteManifests) {
+ LOG.info("Planning file tasks locally for table {}", table().name());
+ ManifestGroup manifestGroup = newManifestGroup(dataManifests,
deleteManifests);
+ CloseableIterable<? extends ScanTask> fileTasks =
manifestGroup.planFiles();
+ return (CloseableIterable<ScanTask>) fileTasks;
+ }
+
+ private CompletableFuture<DeleteFileIndex> newDeletesFuture(
+ List<ManifestFile> deleteManifests, boolean planLocally, ExecutorService
monitorPool) {
+
+ return CompletableFuture.supplyAsync(
+ () -> {
+ if (planLocally) {
+ LOG.info("Planning deletes locally for table {}", table().name());
+ return planDeletesLocally(deleteManifests);
+ } else {
+ LOG.info("Planning deletes remotely for table {}", table().name());
+ return planDeletesRemotely(deleteManifests);
+ }
+ },
+ monitorPool);
+ }
+
+ private DeleteFileIndex planDeletesLocally(List<ManifestFile>
deleteManifests) {
+ DeleteFileIndex.Builder builder = DeleteFileIndex.builderFor(io(),
deleteManifests);
+
+ if (shouldPlanWithExecutor() && deleteManifests.size() > 1) {
+ builder.planWith(planExecutor());
+ }
+
+ return builder
+ .specsById(table().specs())
+ .filterData(filter())
+ .caseSensitive(isCaseSensitive())
+ .scanMetrics(scanMetrics())
+ .build();
+ }
+
+ private CompletableFuture<Iterable<CloseableIterable<DataFile>>>
newDataFuture(
+ List<ManifestFile> dataManifests,
+ boolean planLocally,
+ boolean withColumnStats,
+ ExecutorService monitorPool) {
+
+ return CompletableFuture.supplyAsync(
+ () -> {
+ if (planLocally) {
+ LOG.info("Planning data locally for table {}", table().name());
+ ManifestGroup manifestGroup = newManifestGroup(dataManifests,
withColumnStats);
+ return manifestGroup.fileGroups();
+ } else {
+ LOG.info("Planning data remotely for table {}", table().name());
+ return planDataRemotely(dataManifests, withColumnStats);
+ }
+ },
+ monitorPool);
+ }
+
+ private Iterable<CloseableIterable<ScanTask>> toFileTasks(
+ CompletableFuture<Iterable<CloseableIterable<DataFile>>> dataFuture,
+ CompletableFuture<DeleteFileIndex> deletesFuture,
+ boolean copyDataFiles) {
+
+ String schemaString = SchemaParser.toJson(tableSchema());
+ Map<Integer, String> specStringCache =
specCache(PartitionSpecParser::toJson);
+ Map<Integer, ResidualEvaluator> residualCache =
specCache(this::newResidualEvaluator);
+
+ Iterable<CloseableIterable<DataFile>> dataFileGroups = dataFuture.join();
+
+ return Iterables.transform(
+ dataFileGroups,
+ dataFiles ->
+ toFileTasks(
+ dataFiles,
+ deletesFuture,
+ copyDataFiles,
+ schemaString,
+ specStringCache,
+ residualCache));
+ }
+
+ private CloseableIterable<ScanTask> toFileTasks(
+ CloseableIterable<DataFile> dataFiles,
+ CompletableFuture<DeleteFileIndex> deletesFuture,
+ boolean copyDataFiles,
+ String schemaString,
+ Map<Integer, String> specStringCache,
+ Map<Integer, ResidualEvaluator> residualCache) {
+
+ return CloseableIterable.transform(
+ dataFiles,
+ dataFile -> {
+ DeleteFile[] deleteFiles =
deletesFuture.join().forDataFile(dataFile);
+
+ String specString = specStringCache.get(dataFile.specId());
+ ResidualEvaluator residuals = residualCache.get(dataFile.specId());
+
+ ScanMetricsUtil.fileTask(scanMetrics(), dataFile, deleteFiles);
+
+ return new BaseFileScanTask(
+ copyDataFiles ? dataFile.copy(shouldReturnColumnStats()) :
dataFile,
+ deleteFiles,
+ schemaString,
+ specString,
+ residuals);
+ });
+ }
+
+ private ManifestEvaluator newManifestEvaluator(PartitionSpec spec) {
+ Expression projection = Projections.inclusive(spec,
isCaseSensitive()).project(filter());
+ return ManifestEvaluator.forPartitionFilter(projection, spec,
isCaseSensitive());
+ }
+
+ private ResidualEvaluator newResidualEvaluator(PartitionSpec spec) {
+ return ResidualEvaluator.of(spec, residualFilter(), isCaseSensitive());
+ }
+
+ private <R> Map<Integer, R> specCache(Function<PartitionSpec, R> load) {
+ Map<Integer, R> cache = Maps.newHashMap();
+ table().specs().forEach((specId, spec) -> cache.put(specId,
load.apply(spec)));
+ return cache;
+ }
+
+ private boolean mayHaveEqualityDeletes(Snapshot snapshot) {
+ String count =
snapshot.summary().get(SnapshotSummary.TOTAL_EQ_DELETES_PROP);
+ return count == null || !count.equals("0");
+ }
+
+ // a monitor pool that enables planning data and deletes concurrently if
remote planning is used
+ private ExecutorService newMonitorPool() {
+ return ThreadPools.newWorkerPool("iceberg-planning-monitor-service",
MONITOR_POOL_SIZE);
+ }
+}
diff --git a/core/src/main/java/org/apache/iceberg/BaseScan.java
b/core/src/main/java/org/apache/iceberg/BaseScan.java
index 953ad754aa..17e6bc0445 100644
--- a/core/src/main/java/org/apache/iceberg/BaseScan.java
+++ b/core/src/main/java/org/apache/iceberg/BaseScan.java
@@ -27,6 +27,7 @@ import java.util.concurrent.ExecutorService;
import org.apache.iceberg.expressions.Binder;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.metrics.MetricsReporter;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
@@ -58,7 +59,7 @@ abstract class BaseScan<ThisT, T extends ScanTask, G extends
ScanTaskGroup<T>>
"upper_bounds",
"column_sizes");
- private static final List<String> SCAN_WITH_STATS_COLUMNS =
+ protected static final List<String> SCAN_WITH_STATS_COLUMNS =
ImmutableList.<String>builder().addAll(SCAN_COLUMNS).addAll(STATS_COLUMNS).build();
protected static final List<String> DELETE_SCAN_COLUMNS =
@@ -73,12 +74,13 @@ abstract class BaseScan<ThisT, T extends ScanTask, G
extends ScanTaskGroup<T>>
"record_count",
"partition",
"key_metadata",
- "split_offsets");
+ "split_offsets",
+ "equality_ids");
protected static final List<String> DELETE_SCAN_WITH_STATS_COLUMNS =
ImmutableList.<String>builder().addAll(DELETE_SCAN_COLUMNS).addAll(STATS_COLUMNS).build();
- private static final boolean PLAN_SCANS_WITH_WORKER_POOL =
+ protected static final boolean PLAN_SCANS_WITH_WORKER_POOL =
SystemConfigs.SCAN_THREAD_POOL_ENABLED.value();
private final Table table;
@@ -95,6 +97,10 @@ abstract class BaseScan<ThisT, T extends ScanTask, G extends
ScanTaskGroup<T>>
return table;
}
+ protected FileIO io() {
+ return table.io();
+ }
+
protected Schema tableSchema() {
return schema;
}
@@ -111,10 +117,18 @@ abstract class BaseScan<ThisT, T extends ScanTask, G
extends ScanTaskGroup<T>>
return context.returnColumnStats() ? SCAN_WITH_STATS_COLUMNS :
SCAN_COLUMNS;
}
+ protected boolean shouldReturnColumnStats() {
+ return context().returnColumnStats();
+ }
+
protected boolean shouldIgnoreResiduals() {
return context().ignoreResiduals();
}
+ protected Expression residualFilter() {
+ return shouldIgnoreResiduals() ? Expressions.alwaysTrue() : filter();
+ }
+
protected boolean shouldPlanWithExecutor() {
return PLAN_SCANS_WITH_WORKER_POOL ||
context().planWithCustomizedExecutor();
}
diff --git a/core/src/main/java/org/apache/iceberg/DataScan.java
b/core/src/main/java/org/apache/iceberg/DataScan.java
new file mode 100644
index 0000000000..8de48740b9
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/DataScan.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import java.util.List;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+
+abstract class DataScan<ThisT, T extends ScanTask, G extends ScanTaskGroup<T>>
+ extends SnapshotScan<ThisT, T, G> {
+
+ protected DataScan(Table table, Schema schema, TableScanContext context) {
+ super(table, schema, context);
+ }
+
+ @Override
+ protected boolean useSnapshotSchema() {
+ return true;
+ }
+
+ protected ManifestGroup newManifestGroup(
+ List<ManifestFile> dataManifests, List<ManifestFile> deleteManifests) {
+ return newManifestGroup(dataManifests, deleteManifests,
context().returnColumnStats());
+ }
+
+ protected ManifestGroup newManifestGroup(
+ List<ManifestFile> dataManifests, boolean withColumnStats) {
+ return newManifestGroup(dataManifests, ImmutableList.of(),
withColumnStats);
+ }
+
+ protected ManifestGroup newManifestGroup(
+ List<ManifestFile> dataManifests,
+ List<ManifestFile> deleteManifests,
+ boolean withColumnStats) {
+
+ ManifestGroup manifestGroup =
+ new ManifestGroup(io(), dataManifests, deleteManifests)
+ .caseSensitive(isCaseSensitive())
+ .select(withColumnStats ? SCAN_WITH_STATS_COLUMNS : SCAN_COLUMNS)
+ .filterData(filter())
+ .specsById(table().specs())
+ .scanMetrics(scanMetrics())
+ .ignoreDeleted();
+
+ if (shouldIgnoreResiduals()) {
+ manifestGroup = manifestGroup.ignoreResiduals();
+ }
+
+ if (shouldPlanWithExecutor() && (dataManifests.size() > 1 ||
deleteManifests.size() > 1)) {
+ manifestGroup = manifestGroup.planWith(planExecutor());
+ }
+
+ return manifestGroup;
+ }
+}
diff --git a/core/src/main/java/org/apache/iceberg/DataTableScan.java
b/core/src/main/java/org/apache/iceberg/DataTableScan.java
index 6eaa0d5ec7..67d2b0ef35 100644
--- a/core/src/main/java/org/apache/iceberg/DataTableScan.java
+++ b/core/src/main/java/org/apache/iceberg/DataTableScan.java
@@ -22,7 +22,6 @@ import java.util.List;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
-import org.apache.iceberg.util.SnapshotUtil;
public class DataTableScan extends BaseTableScan {
protected DataTableScan(Table table, Schema schema, TableScanContext
context) {
@@ -52,12 +51,8 @@ public class DataTableScan extends BaseTableScan {
}
@Override
- public TableScan useSnapshot(long scanSnapshotId) {
- // call method in superclass just for the side effect of argument
validation;
- // we do not use its return value
- super.useSnapshot(scanSnapshotId);
- Schema snapshotSchema = SnapshotUtil.schemaFor(table(), scanSnapshotId);
- return newRefinedScan(table(), snapshotSchema,
context().useSnapshotId(scanSnapshotId));
+ protected boolean useSnapshotSchema() {
+ return true;
}
@Override
diff --git a/core/src/main/java/org/apache/iceberg/DeleteFileIndex.java
b/core/src/main/java/org/apache/iceberg/DeleteFileIndex.java
index 6f16794d5b..51917a71e9 100644
--- a/core/src/main/java/org/apache/iceberg/DeleteFileIndex.java
+++ b/core/src/main/java/org/apache/iceberg/DeleteFileIndex.java
@@ -142,6 +142,10 @@ class DeleteFileIndex {
return forDataFile(entry.dataSequenceNumber(), entry.file());
}
+ DeleteFile[] forDataFile(DataFile file) {
+ return forDataFile(file.dataSequenceNumber(), file);
+ }
+
DeleteFile[] forDataFile(long sequenceNumber, DataFile file) {
if (isEmpty) {
return NO_DELETES;
diff --git a/core/src/main/java/org/apache/iceberg/ManifestGroup.java
b/core/src/main/java/org/apache/iceberg/ManifestGroup.java
index 08449a1043..027b8764a9 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestGroup.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestGroup.java
@@ -37,6 +37,7 @@ import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.CloseableIterator;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.metrics.ScanMetrics;
+import org.apache.iceberg.metrics.ScanMetricsUtil;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
@@ -222,6 +223,19 @@ class ManifestGroup {
return CloseableIterable.concat(entries((manifest, entries) -> entries));
}
+ /**
+ * Returns an iterable for groups of data files in the set of manifests.
+ *
+ * <p>Files are not copied; it is the caller's responsibility to make
defensive copies if adding
+ * these files to a collection.
+ *
+ * @return an iterable of file groups
+ */
+ public Iterable<CloseableIterable<DataFile>> fileGroups() {
+ return entries(
+ (manifest, entries) -> CloseableIterable.transform(entries,
ManifestEntry::file));
+ }
+
private <T> Iterable<CloseableIterable<T>> entries(
BiFunction<ManifestFile, CloseableIterable<ManifestEntry<DataFile>>,
CloseableIterable<T>>
entryFn) {
@@ -349,12 +363,7 @@ class ManifestGroup {
entry -> {
DataFile dataFile = entry.file().copy(ctx.shouldKeepStats());
DeleteFile[] deleteFiles = ctx.deletes().forEntry(entry);
- for (DeleteFile deleteFile : deleteFiles) {
-
ctx.scanMetrics().totalDeleteFileSizeInBytes().increment(deleteFile.fileSizeInBytes());
- }
-
ctx.scanMetrics().totalFileSizeInBytes().increment(dataFile.fileSizeInBytes());
- ctx.scanMetrics().resultDataFiles().increment();
- ctx.scanMetrics().resultDeleteFiles().increment((long)
deleteFiles.length);
+ ScanMetricsUtil.fileTask(ctx.scanMetrics(), dataFile, deleteFiles);
return new BaseFileScanTask(
dataFile, deleteFiles, ctx.schemaAsString(), ctx.specAsString(),
ctx.residuals());
});
diff --git a/core/src/main/java/org/apache/iceberg/metrics/ScanMetricsUtil.java
b/core/src/main/java/org/apache/iceberg/PlanningMode.java
similarity index 50%
copy from core/src/main/java/org/apache/iceberg/metrics/ScanMetricsUtil.java
copy to core/src/main/java/org/apache/iceberg/PlanningMode.java
index 102f48ee19..ef448fb22c 100644
--- a/core/src/main/java/org/apache/iceberg/metrics/ScanMetricsUtil.java
+++ b/core/src/main/java/org/apache/iceberg/PlanningMode.java
@@ -16,22 +16,39 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iceberg.metrics;
+package org.apache.iceberg;
-import org.apache.iceberg.DeleteFile;
-import org.apache.iceberg.FileContent;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
-public class ScanMetricsUtil {
+public enum PlanningMode {
+ AUTO("auto"),
+ LOCAL("local"),
+ DISTRIBUTED("distributed");
- private ScanMetricsUtil() {}
+ private final String modeName;
- public static void indexedDeleteFile(ScanMetrics metrics, DeleteFile
deleteFile) {
- metrics.indexedDeleteFiles().increment();
+ PlanningMode(String modeName) {
+ this.modeName = modeName;
+ }
+
+ public static PlanningMode fromName(String modeName) {
+ Preconditions.checkArgument(modeName != null, "Mode name is null");
+
+ if (AUTO.modeName().equalsIgnoreCase(modeName)) {
+ return AUTO;
+
+ } else if (LOCAL.modeName().equalsIgnoreCase(modeName)) {
+ return LOCAL;
- if (deleteFile.content() == FileContent.POSITION_DELETES) {
- metrics.positionalDeleteFiles().increment();
- } else if (deleteFile.content() == FileContent.EQUALITY_DELETES) {
- metrics.equalityDeleteFiles().increment();
+ } else if (DISTRIBUTED.modeName().equalsIgnoreCase(modeName)) {
+ return DISTRIBUTED;
+
+ } else {
+ throw new IllegalArgumentException("Unknown planning mode: " + modeName);
}
}
+
+ public String modeName() {
+ return modeName;
+ }
}
diff --git a/core/src/main/java/org/apache/iceberg/SnapshotScan.java
b/core/src/main/java/org/apache/iceberg/SnapshotScan.java
index de53444ba9..a98a8c9f13 100644
--- a/core/src/main/java/org/apache/iceberg/SnapshotScan.java
+++ b/core/src/main/java/org/apache/iceberg/SnapshotScan.java
@@ -66,6 +66,11 @@ public abstract class SnapshotScan<ThisT, T extends
ScanTask, G extends ScanTask
protected abstract CloseableIterable<T> doPlanFiles();
+ // controls whether to use the snapshot schema while time travelling
+ protected boolean useSnapshotSchema() {
+ return false;
+ }
+
protected ScanMetrics scanMetrics() {
if (scanMetrics == null) {
this.scanMetrics = ScanMetrics.of(new DefaultMetricsContext());
@@ -81,7 +86,10 @@ public abstract class SnapshotScan<ThisT, T extends
ScanTask, G extends ScanTask
table().snapshot(scanSnapshotId) != null,
"Cannot find snapshot with ID %s",
scanSnapshotId);
- return newRefinedScan(table(), tableSchema(),
context().useSnapshotId(scanSnapshotId));
+ Schema newSchema =
+ useSnapshotSchema() ? SnapshotUtil.schemaFor(table(), scanSnapshotId)
: tableSchema();
+ TableScanContext newContext = context().useSnapshotId(scanSnapshotId);
+ return newRefinedScan(table(), newSchema, newContext);
}
public ThisT useRef(String name) {
diff --git a/core/src/main/java/org/apache/iceberg/TableProperties.java
b/core/src/main/java/org/apache/iceberg/TableProperties.java
index a9116bc57f..03e1f3ce88 100644
--- a/core/src/main/java/org/apache/iceberg/TableProperties.java
+++ b/core/src/main/java/org/apache/iceberg/TableProperties.java
@@ -231,6 +231,10 @@ public class TableProperties {
public static final String ORC_BATCH_SIZE =
"read.orc.vectorization.batch-size";
public static final int ORC_BATCH_SIZE_DEFAULT = 5000;
+ public static final String DATA_PLANNING_MODE = "read.data-planning-mode";
+ public static final String DELETE_PLANNING_MODE =
"read.delete-planning-mode";
+ public static final String PLANNING_MODE_DEFAULT =
PlanningMode.AUTO.modeName();
+
public static final String OBJECT_STORE_ENABLED =
"write.object-storage.enabled";
public static final boolean OBJECT_STORE_ENABLED_DEFAULT = false;
diff --git a/core/src/main/java/org/apache/iceberg/metrics/ScanMetricsUtil.java
b/core/src/main/java/org/apache/iceberg/metrics/ScanMetricsUtil.java
index 102f48ee19..c5aa6e1dd6 100644
--- a/core/src/main/java/org/apache/iceberg/metrics/ScanMetricsUtil.java
+++ b/core/src/main/java/org/apache/iceberg/metrics/ScanMetricsUtil.java
@@ -18,6 +18,7 @@
*/
package org.apache.iceberg.metrics;
+import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.FileContent;
@@ -34,4 +35,17 @@ public class ScanMetricsUtil {
metrics.equalityDeleteFiles().increment();
}
}
+
+ public static void fileTask(ScanMetrics metrics, DataFile dataFile,
DeleteFile[] deleteFiles) {
+ metrics.totalFileSizeInBytes().increment(dataFile.fileSizeInBytes());
+ metrics.resultDataFiles().increment();
+ metrics.resultDeleteFiles().increment(deleteFiles.length);
+
+ long deletesSizeInBytes = 0L;
+ for (DeleteFile deleteFile : deleteFiles) {
+ deletesSizeInBytes += deleteFile.fileSizeInBytes();
+ }
+
+ metrics.totalDeleteFileSizeInBytes().increment(deletesSizeInBytes);
+ }
}
diff --git a/core/src/test/java/org/apache/iceberg/TestDataTableScan.java
b/core/src/test/java/org/apache/iceberg/DataTableScanTestBase.java
similarity index 81%
rename from core/src/test/java/org/apache/iceberg/TestDataTableScan.java
rename to core/src/test/java/org/apache/iceberg/DataTableScanTestBase.java
index 8541a96c8d..7133a5a761 100644
--- a/core/src/test/java/org/apache/iceberg/TestDataTableScan.java
+++ b/core/src/test/java/org/apache/iceberg/DataTableScanTestBase.java
@@ -30,15 +30,19 @@ import org.junit.Assert;
import org.junit.Assume;
import org.junit.Test;
-public class TestDataTableScan extends ScanTestBase<TableScan, FileScanTask,
CombinedScanTask> {
- public TestDataTableScan(int formatVersion) {
+public abstract class DataTableScanTestBase<
+ ScanT extends Scan<ScanT, T, G>, T extends ScanTask, G extends
ScanTaskGroup<T>>
+ extends ScanTestBase<ScanT, T, G> {
+
+ public DataTableScanTestBase(int formatVersion) {
super(formatVersion);
}
- @Override
- protected TableScan newScan() {
- return table.newScan();
- }
+ protected abstract ScanT useRef(ScanT scan, String ref);
+
+ protected abstract ScanT useSnapshot(ScanT scan, long snapshotId);
+
+ protected abstract ScanT asOfTime(ScanT scan, long timestampMillis);
@Test
public void testTaskRowCounts() {
@@ -56,17 +60,17 @@ public class TestDataTableScan extends
ScanTestBase<TableScan, FileScanTask, Com
DeleteFile deleteFile2 = newDeleteFile("data_bucket=1");
table.newRowDelta().addDeletes(deleteFile2).commit();
- TableScan scan = table.newScan().option(TableProperties.SPLIT_SIZE, "50");
+ ScanT scan = newScan().option(TableProperties.SPLIT_SIZE, "50");
- List<FileScanTask> fileScanTasks = Lists.newArrayList(scan.planFiles());
+ List<T> fileScanTasks = Lists.newArrayList(scan.planFiles());
Assert.assertEquals("Must have 2 FileScanTasks", 2, fileScanTasks.size());
- for (FileScanTask task : fileScanTasks) {
+ for (T task : fileScanTasks) {
Assert.assertEquals("Rows count must match", 10,
task.estimatedRowsCount());
}
- List<CombinedScanTask> combinedScanTasks =
Lists.newArrayList(scan.planTasks());
+ List<G> combinedScanTasks = Lists.newArrayList(scan.planTasks());
Assert.assertEquals("Must have 4 CombinedScanTask", 4,
combinedScanTasks.size());
- for (CombinedScanTask task : combinedScanTasks) {
+ for (G task : combinedScanTasks) {
Assert.assertEquals("Rows count must match", 5,
task.estimatedRowsCount());
}
}
@@ -100,11 +104,11 @@ public class TestDataTableScan extends
ScanTestBase<TableScan, FileScanTask, Com
// Add D to main
table.newFastAppend().appendFile(FILE_D).commit();
- TableScan testBranchScan = table.newScan().useRef("testBranch");
+ ScanT testBranchScan = useRef(newScan(), "testBranch");
validateExpectedFileScanTasks(
testBranchScan, ImmutableList.of(FILE_A.path(), FILE_B.path(),
FILE_C.path()));
- TableScan mainScan = table.newScan();
+ ScanT mainScan = newScan();
validateExpectedFileScanTasks(mainScan, ImmutableList.of(FILE_A.path(),
FILE_D.path()));
}
@@ -113,9 +117,9 @@ public class TestDataTableScan extends
ScanTestBase<TableScan, FileScanTask, Com
table.newFastAppend().appendFile(FILE_A).appendFile(FILE_B).commit();
table.manageSnapshots().createTag("tagB",
table.currentSnapshot().snapshotId()).commit();
table.newFastAppend().appendFile(FILE_C).commit();
- TableScan tagScan = table.newScan().useRef("tagB");
+ ScanT tagScan = useRef(newScan(), "tagB");
validateExpectedFileScanTasks(tagScan, ImmutableList.of(FILE_A.path(),
FILE_B.path()));
- TableScan mainScan = table.newScan();
+ ScanT mainScan = newScan();
validateExpectedFileScanTasks(
mainScan, ImmutableList.of(FILE_A.path(), FILE_B.path(),
FILE_C.path()));
}
@@ -126,7 +130,7 @@ public class TestDataTableScan extends
ScanTestBase<TableScan, FileScanTask, Com
table.manageSnapshots().createTag("tagB",
table.currentSnapshot().snapshotId()).commit();
Assertions.assertThatThrownBy(
- () ->
table.newScan().useSnapshot(table.currentSnapshot().snapshotId()).useRef("tagB"))
+ () -> useRef(useSnapshot(newScan(),
table.currentSnapshot().snapshotId()), "tagB"))
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("Cannot override ref, already set snapshot id=1");
}
@@ -139,7 +143,7 @@ public class TestDataTableScan extends
ScanTestBase<TableScan, FileScanTask, Com
table.manageSnapshots().createTag("tagB",
table.currentSnapshot().snapshotId()).commit();
Assertions.assertThatThrownBy(
- () ->
table.newScan().useRef("tagB").useSnapshot(snapshotA.snapshotId()))
+ () -> useSnapshot(useRef(newScan(), "tagB"),
snapshotA.snapshotId()))
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("Cannot override snapshot, already set snapshot id=2");
}
@@ -153,7 +157,7 @@ public class TestDataTableScan extends
ScanTestBase<TableScan, FileScanTask, Com
.commit();
Assertions.assertThatThrownBy(
- () ->
table.newScan().useRef("testBranch").asOfTime(System.currentTimeMillis()))
+ () -> asOfTime(useRef(newScan(), "testBranch"),
System.currentTimeMillis()))
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("Cannot override snapshot, already set snapshot id=1");
}
@@ -165,24 +169,26 @@ public class TestDataTableScan extends
ScanTestBase<TableScan, FileScanTask, Com
table.newFastAppend().appendFile(FILE_B).commit();
table.manageSnapshots().createTag("tagB",
table.currentSnapshot().snapshotId()).commit();
- Assertions.assertThatThrownBy(() ->
table.newScan().useRef("tagB").useRef("tagA"))
+ Assertions.assertThatThrownBy(() -> useRef(useRef(newScan(), "tagB"),
"tagA"))
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("Cannot override ref, already set snapshot id=2");
}
@Test
public void testSettingInvalidRefFails() {
- Assertions.assertThatThrownBy(() -> table.newScan().useRef("nonexisting"))
+ Assertions.assertThatThrownBy(() -> useRef(newScan(), "nonexisting"))
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("Cannot find ref nonexisting");
}
- private void validateExpectedFileScanTasks(
- TableScan scan, List<CharSequence> expectedFileScanPaths) throws
IOException {
- try (CloseableIterable<FileScanTask> scanTasks = scan.planFiles()) {
+ private void validateExpectedFileScanTasks(ScanT scan, List<CharSequence>
expectedFileScanPaths)
+ throws IOException {
+ try (CloseableIterable<T> scanTasks = scan.planFiles()) {
Assert.assertEquals(expectedFileScanPaths.size(),
Iterables.size(scanTasks));
List<CharSequence> actualFiles = Lists.newArrayList();
- scanTasks.forEach(task -> actualFiles.add(task.file().path()));
+ for (T task : scanTasks) {
+ actualFiles.add(((FileScanTask) task).file().path());
+ }
Assert.assertTrue(actualFiles.containsAll(expectedFileScanPaths));
}
}
@@ -203,12 +209,13 @@ public class TestDataTableScan extends
ScanTestBase<TableScan, FileScanTask, Com
DeleteFile deleteFile2 = newDeleteFile("data_bucket=1");
table.newRowDelta().addDeletes(deleteFile2).commit();
- TableScan scan = table.newScan();
+ ScanT scan = newScan();
- List<FileScanTask> fileScanTasks = Lists.newArrayList(scan.planFiles());
+ List<T> fileScanTasks = Lists.newArrayList(scan.planFiles());
Assert.assertEquals("Must have 2 FileScanTasks", 2, fileScanTasks.size());
- for (FileScanTask task : fileScanTasks) {
- DataFile file = task.file();
+ for (T task : fileScanTasks) {
+ FileScanTask fileScanTask = (FileScanTask) task;
+ DataFile file = fileScanTask.file();
long expectedDataSequenceNumber = 0L;
long expectedDeleteSequenceNumber = 0L;
if (file.path().equals(dataFile1.path())) {
@@ -230,7 +237,7 @@ public class TestDataTableScan extends
ScanTestBase<TableScan, FileScanTask, Com
expectedDataSequenceNumber,
file.fileSequenceNumber().longValue());
- List<DeleteFile> deleteFiles = task.deletes();
+ List<DeleteFile> deleteFiles = fileScanTask.deletes();
Assert.assertEquals("Must have 1 delete file", 1,
Iterables.size(deleteFiles));
DeleteFile deleteFile = Iterables.getOnlyElement(deleteFiles);
Assert.assertEquals(
diff --git a/core/src/test/java/org/apache/iceberg/TestDeleteFileIndex.java
b/core/src/test/java/org/apache/iceberg/DeleteFileIndexTestBase.java
similarity index 92%
rename from core/src/test/java/org/apache/iceberg/TestDeleteFileIndex.java
rename to core/src/test/java/org/apache/iceberg/DeleteFileIndexTestBase.java
index 4e574690f7..6f33be0948 100644
--- a/core/src/test/java/org/apache/iceberg/TestDeleteFileIndex.java
+++ b/core/src/test/java/org/apache/iceberg/DeleteFileIndexTestBase.java
@@ -33,8 +33,11 @@ import
org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.junit.Assert;
import org.junit.Test;
-public class TestDeleteFileIndex extends TableTestBase {
- public TestDeleteFileIndex() {
+public abstract class DeleteFileIndexTestBase<
+ ScanT extends Scan<ScanT, T, G>, T extends ScanTask, G extends
ScanTaskGroup<T>>
+ extends TableTestBase {
+
+ public DeleteFileIndexTestBase() {
super(2 /* table format version */);
}
@@ -109,6 +112,8 @@ public class TestDeleteFileIndex extends TableTestBase {
return file;
}
+ protected abstract ScanT newScan(Table table);
+
@Test
public void testMinSequenceNumberFilteringForFiles() {
PartitionSpec partSpec = PartitionSpec.unpartitioned();
@@ -248,10 +253,10 @@ public class TestDeleteFileIndex extends TableTestBase {
DeleteFile unpartitionedPosDeletes =
unpartitionedPosDeletes(unpartitioned.spec());
unpartitioned.newRowDelta().addDeletes(unpartitionedPosDeletes).commit();
- List<FileScanTask> tasks =
Lists.newArrayList(unpartitioned.newScan().planFiles().iterator());
+ List<T> tasks =
Lists.newArrayList(newScan(unpartitioned).planFiles().iterator());
Assert.assertEquals("Should have one task", 1, tasks.size());
- FileScanTask task = tasks.get(0);
+ FileScanTask task = (FileScanTask) tasks.get(0);
Assert.assertEquals(
"Should have the correct data file path", unpartitionedFile.path(),
task.file().path());
Assert.assertEquals("Should have one associated delete file", 1,
task.deletes().size());
@@ -264,8 +269,8 @@ public class TestDeleteFileIndex extends TableTestBase {
DeleteFile unpartitionedEqDeletes =
unpartitionedEqDeletes(unpartitioned.spec());
unpartitioned.newRowDelta().addDeletes(unpartitionedEqDeletes).commit();
- tasks = Lists.newArrayList(unpartitioned.newScan().planFiles().iterator());
- task = tasks.get(0);
+ tasks = Lists.newArrayList(newScan(unpartitioned).planFiles().iterator());
+ task = (FileScanTask) tasks.get(0);
Assert.assertEquals(
"Should have the correct data file path", unpartitionedFile.path(),
task.file().path());
Assert.assertEquals("Should have two associated delete files", 2,
task.deletes().size());
@@ -281,10 +286,10 @@ public class TestDeleteFileIndex extends TableTestBase {
table.newRowDelta().addDeletes(FILE_A_POS_1).commit();
- List<FileScanTask> tasks =
Lists.newArrayList(table.newScan().planFiles().iterator());
+ List<T> tasks = Lists.newArrayList(newScan(table).planFiles().iterator());
Assert.assertEquals("Should have one task", 1, tasks.size());
- FileScanTask task = tasks.get(0);
+ FileScanTask task = (FileScanTask) tasks.get(0);
Assert.assertEquals(
"Should have the correct data file path", FILE_A.path(),
task.file().path());
Assert.assertEquals("Should have one associated delete file", 1,
task.deletes().size());
@@ -298,10 +303,10 @@ public class TestDeleteFileIndex extends TableTestBase {
table.newRowDelta().addDeletes(FILE_A_EQ_1).commit();
- List<FileScanTask> tasks =
Lists.newArrayList(table.newScan().planFiles().iterator());
+ List<T> tasks = Lists.newArrayList(newScan(table).planFiles().iterator());
Assert.assertEquals("Should have one task", 1, tasks.size());
- FileScanTask task = tasks.get(0);
+ FileScanTask task = (FileScanTask) tasks.get(0);
Assert.assertEquals(
"Should have the correct data file path", FILE_A.path(),
task.file().path());
Assert.assertEquals("Should have one associated delete file", 1,
task.deletes().size());
@@ -315,10 +320,10 @@ public class TestDeleteFileIndex extends TableTestBase {
table.newRowDelta().addDeletes(FILE_A_POS_1).addDeletes(FILE_A_EQ_1).commit();
- List<FileScanTask> tasks =
Lists.newArrayList(table.newScan().planFiles().iterator());
+ List<T> tasks = Lists.newArrayList(newScan(table).planFiles().iterator());
Assert.assertEquals("Should have one task", 1, tasks.size());
- FileScanTask task = tasks.get(0);
+ FileScanTask task = (FileScanTask) tasks.get(0);
Assert.assertEquals(
"Should have the correct data file path", FILE_B.path(),
task.file().path());
Assert.assertEquals("Should have no delete files to apply", 0,
task.deletes().size());
@@ -330,10 +335,10 @@ public class TestDeleteFileIndex extends TableTestBase {
table.newAppend().appendFile(FILE_A).commit();
- List<FileScanTask> tasks =
Lists.newArrayList(table.newScan().planFiles().iterator());
+ List<T> tasks = Lists.newArrayList(newScan(table).planFiles().iterator());
Assert.assertEquals("Should have one task", 1, tasks.size());
- FileScanTask task = tasks.get(0);
+ FileScanTask task = (FileScanTask) tasks.get(0);
Assert.assertEquals(
"Should have the correct data file path", FILE_A.path(),
task.file().path());
Assert.assertEquals("Should have no delete files to apply", 0,
task.deletes().size());
@@ -354,10 +359,10 @@ public class TestDeleteFileIndex extends TableTestBase {
.addDeletes(unpartitionedEqDeletes)
.commit();
- List<FileScanTask> tasks =
Lists.newArrayList(table.newScan().planFiles().iterator());
+ List<T> tasks = Lists.newArrayList(newScan(table).planFiles().iterator());
Assert.assertEquals("Should have one task", 1, tasks.size());
- FileScanTask task = tasks.get(0);
+ FileScanTask task = (FileScanTask) tasks.get(0);
Assert.assertEquals(
"Should have the correct data file path", FILE_A.path(),
task.file().path());
Assert.assertEquals("Should have one associated delete file", 1,
task.deletes().size());
@@ -384,10 +389,10 @@ public class TestDeleteFileIndex extends TableTestBase {
.addDeletes(unpartitionedEqDeletes)
.commit();
- List<FileScanTask> tasks =
Lists.newArrayList(table.newScan().planFiles().iterator());
+ List<T> tasks = Lists.newArrayList(newScan(table).planFiles().iterator());
Assert.assertEquals("Should have one task", 1, tasks.size());
- FileScanTask task = tasks.get(0);
+ FileScanTask task = (FileScanTask) tasks.get(0);
Assert.assertEquals(
"Should have the correct data file path", FILE_A.path(),
task.file().path());
Assert.assertEquals("Should have two associated delete files", 2,
task.deletes().size());
@@ -401,10 +406,10 @@ public class TestDeleteFileIndex extends TableTestBase {
public void testPartitionedTableSequenceNumbers() {
table.newRowDelta().addRows(FILE_A).addDeletes(FILE_A_EQ_1).addDeletes(FILE_A_POS_1).commit();
- List<FileScanTask> tasks =
Lists.newArrayList(table.newScan().planFiles().iterator());
+ List<T> tasks = Lists.newArrayList(newScan(table).planFiles().iterator());
Assert.assertEquals("Should have one task", 1, tasks.size());
- FileScanTask task = tasks.get(0);
+ FileScanTask task = (FileScanTask) tasks.get(0);
Assert.assertEquals(
"Should have the correct data file path", FILE_A.path(),
task.file().path());
Assert.assertEquals("Should have one associated delete file", 1,
task.deletes().size());
@@ -495,16 +500,12 @@ public class TestDeleteFileIndex extends TableTestBase {
2,
table.currentSnapshot().deleteManifests(table.io()).get(0).existingFilesCount().intValue());
- List<FileScanTask> tasks =
+ List<T> tasks =
Lists.newArrayList(
- table
- .newScan()
- .filter(equal(bucket("data", BUCKETS_NUMBER), 0))
- .planFiles()
- .iterator());
+ newScan(table).filter(equal(bucket("data", BUCKETS_NUMBER),
0)).planFiles().iterator());
Assert.assertEquals("Should have one task", 1, tasks.size());
- FileScanTask task = tasks.get(0);
+ FileScanTask task = (FileScanTask) tasks.get(0);
Assert.assertEquals(
"Should have the correct data file path", FILE_A.path(),
task.file().path());
Assert.assertEquals("Should have two associated delete files", 2,
task.deletes().size());
diff --git a/core/src/test/java/org/apache/iceberg/TestFilterFiles.java
b/core/src/test/java/org/apache/iceberg/FilterFilesTestBase.java
similarity index 87%
rename from core/src/test/java/org/apache/iceberg/TestFilterFiles.java
rename to core/src/test/java/org/apache/iceberg/FilterFilesTestBase.java
index 34b18e18b8..995a07f2eb 100644
--- a/core/src/test/java/org/apache/iceberg/TestFilterFiles.java
+++ b/core/src/test/java/org/apache/iceberg/FilterFilesTestBase.java
@@ -35,22 +35,18 @@ import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-@RunWith(Parameterized.class)
-public class TestFilterFiles {
- @Parameterized.Parameters(name = "formatVersion = {0}")
- public static Object[] parameters() {
- return new Object[] {1, 2};
- }
+
+public abstract class FilterFilesTestBase<
+ ScanT extends Scan<ScanT, T, G>, T extends ScanTask, G extends
ScanTaskGroup<T>> {
public final int formatVersion;
- public TestFilterFiles(int formatVersion) {
+ public FilterFilesTestBase(int formatVersion) {
this.formatVersion = formatVersion;
}
+ protected abstract ScanT newScan(Table table);
+
@Rule public TemporaryFolder temp = new TemporaryFolder();
private final Schema schema =
new Schema(
@@ -122,10 +118,10 @@ public class TestFilterFiles {
table.refresh();
- TableScan emptyScan = table.newScan().filter(Expressions.equal("id", 5));
+ ScanT emptyScan = newScan(table).filter(Expressions.equal("id", 5));
assertEquals(0, Iterables.size(emptyScan.planFiles()));
- TableScan nonEmptyScan = table.newScan().filter(Expressions.equal("id",
1));
+ ScanT nonEmptyScan = newScan(table).filter(Expressions.equal("id", 1));
assertEquals(1, Iterables.size(nonEmptyScan.planFiles()));
}
@@ -156,11 +152,10 @@ public class TestFilterFiles {
table.refresh();
- TableScan emptyScan =
table.newScan().caseSensitive(false).filter(Expressions.equal("ID", 5));
+ ScanT emptyScan =
newScan(table).caseSensitive(false).filter(Expressions.equal("ID", 5));
assertEquals(0, Iterables.size(emptyScan.planFiles()));
- TableScan nonEmptyScan =
- table.newScan().caseSensitive(false).filter(Expressions.equal("ID",
1));
+ ScanT nonEmptyScan =
newScan(table).caseSensitive(false).filter(Expressions.equal("ID", 1));
assertEquals(1, Iterables.size(nonEmptyScan.planFiles()));
}
}
diff --git a/core/src/main/java/org/apache/iceberg/metrics/ScanMetricsUtil.java
b/core/src/test/java/org/apache/iceberg/TestLocalDataTableScan.java
similarity index 55%
copy from core/src/main/java/org/apache/iceberg/metrics/ScanMetricsUtil.java
copy to core/src/test/java/org/apache/iceberg/TestLocalDataTableScan.java
index 102f48ee19..897cbed488 100644
--- a/core/src/main/java/org/apache/iceberg/metrics/ScanMetricsUtil.java
+++ b/core/src/test/java/org/apache/iceberg/TestLocalDataTableScan.java
@@ -16,22 +16,32 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iceberg.metrics;
+package org.apache.iceberg;
-import org.apache.iceberg.DeleteFile;
-import org.apache.iceberg.FileContent;
+public class TestLocalDataTableScan
+ extends DataTableScanTestBase<TableScan, FileScanTask, CombinedScanTask> {
-public class ScanMetricsUtil {
+ public TestLocalDataTableScan(int formatVersion) {
+ super(formatVersion);
+ }
- private ScanMetricsUtil() {}
+ @Override
+ protected TableScan useRef(TableScan scan, String ref) {
+ return scan.useRef(ref);
+ }
- public static void indexedDeleteFile(ScanMetrics metrics, DeleteFile
deleteFile) {
- metrics.indexedDeleteFiles().increment();
+ @Override
+ protected TableScan useSnapshot(TableScan scan, long snapshotId) {
+ return scan.useSnapshot(snapshotId);
+ }
+
+ @Override
+ protected TableScan asOfTime(TableScan scan, long timestampMillis) {
+ return scan.asOfTime(timestampMillis);
+ }
- if (deleteFile.content() == FileContent.POSITION_DELETES) {
- metrics.positionalDeleteFiles().increment();
- } else if (deleteFile.content() == FileContent.EQUALITY_DELETES) {
- metrics.equalityDeleteFiles().increment();
- }
+ @Override
+ protected TableScan newScan() {
+ return table.newScan();
}
}
diff --git a/core/src/main/java/org/apache/iceberg/metrics/ScanMetricsUtil.java
b/core/src/test/java/org/apache/iceberg/TestLocalDeleteFileIndex.java
similarity index 59%
copy from core/src/main/java/org/apache/iceberg/metrics/ScanMetricsUtil.java
copy to core/src/test/java/org/apache/iceberg/TestLocalDeleteFileIndex.java
index 102f48ee19..d01e6253d4 100644
--- a/core/src/main/java/org/apache/iceberg/metrics/ScanMetricsUtil.java
+++ b/core/src/test/java/org/apache/iceberg/TestLocalDeleteFileIndex.java
@@ -16,22 +16,13 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iceberg.metrics;
+package org.apache.iceberg;
-import org.apache.iceberg.DeleteFile;
-import org.apache.iceberg.FileContent;
+public class TestLocalDeleteFileIndex
+ extends DeleteFileIndexTestBase<TableScan, FileScanTask, CombinedScanTask>
{
-public class ScanMetricsUtil {
-
- private ScanMetricsUtil() {}
-
- public static void indexedDeleteFile(ScanMetrics metrics, DeleteFile
deleteFile) {
- metrics.indexedDeleteFiles().increment();
-
- if (deleteFile.content() == FileContent.POSITION_DELETES) {
- metrics.positionalDeleteFiles().increment();
- } else if (deleteFile.content() == FileContent.EQUALITY_DELETES) {
- metrics.equalityDeleteFiles().increment();
- }
+ @Override
+ protected TableScan newScan(Table table) {
+ return table.newScan();
}
}
diff --git a/core/src/main/java/org/apache/iceberg/metrics/ScanMetricsUtil.java
b/core/src/test/java/org/apache/iceberg/TestLocalFilterFiles.java
similarity index 59%
copy from core/src/main/java/org/apache/iceberg/metrics/ScanMetricsUtil.java
copy to core/src/test/java/org/apache/iceberg/TestLocalFilterFiles.java
index 102f48ee19..b7ff71461c 100644
--- a/core/src/main/java/org/apache/iceberg/metrics/ScanMetricsUtil.java
+++ b/core/src/test/java/org/apache/iceberg/TestLocalFilterFiles.java
@@ -16,22 +16,26 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iceberg.metrics;
+package org.apache.iceberg;
-import org.apache.iceberg.DeleteFile;
-import org.apache.iceberg.FileContent;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
-public class ScanMetricsUtil {
+@RunWith(Parameterized.class)
+public class TestLocalFilterFiles
+ extends FilterFilesTestBase<TableScan, FileScanTask, CombinedScanTask> {
- private ScanMetricsUtil() {}
+ @Parameterized.Parameters(name = "formatVersion = {0}")
+ public static Object[] parameters() {
+ return new Object[] {1, 2};
+ }
- public static void indexedDeleteFile(ScanMetrics metrics, DeleteFile
deleteFile) {
- metrics.indexedDeleteFiles().increment();
+ public TestLocalFilterFiles(int formatVersion) {
+ super(formatVersion);
+ }
- if (deleteFile.content() == FileContent.POSITION_DELETES) {
- metrics.positionalDeleteFiles().increment();
- } else if (deleteFile.content() == FileContent.EQUALITY_DELETES) {
- metrics.equalityDeleteFiles().increment();
- }
+ @Override
+ protected TableScan newScan(Table table) {
+ return table.newScan();
}
}
diff --git
a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/SparkRowLevelOperationsTestBase.java
b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/SparkRowLevelOperationsTestBase.java
index e660a2ca16..24124a97e2 100644
---
a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/SparkRowLevelOperationsTestBase.java
+++
b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/SparkRowLevelOperationsTestBase.java
@@ -20,11 +20,15 @@ package org.apache.iceberg.spark.extensions;
import static org.apache.iceberg.DataOperations.DELETE;
import static org.apache.iceberg.DataOperations.OVERWRITE;
+import static org.apache.iceberg.PlanningMode.DISTRIBUTED;
+import static org.apache.iceberg.PlanningMode.LOCAL;
import static org.apache.iceberg.SnapshotSummary.ADDED_DELETE_FILES_PROP;
import static org.apache.iceberg.SnapshotSummary.ADDED_FILES_PROP;
import static org.apache.iceberg.SnapshotSummary.CHANGED_PARTITION_COUNT_PROP;
import static org.apache.iceberg.SnapshotSummary.DELETED_FILES_PROP;
+import static org.apache.iceberg.TableProperties.DATA_PLANNING_MODE;
import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
+import static org.apache.iceberg.TableProperties.DELETE_PLANNING_MODE;
import static org.apache.iceberg.TableProperties.PARQUET_VECTORIZATION_ENABLED;
import static
org.apache.iceberg.TableProperties.SPARK_WRITE_PARTITIONED_FANOUT_ENABLED;
import static org.apache.iceberg.TableProperties.WRITE_DISTRIBUTION_MODE;
@@ -43,6 +47,7 @@ import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.Files;
+import org.apache.iceberg.PlanningMode;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.SnapshotRef;
import org.apache.iceberg.Table;
@@ -74,6 +79,7 @@ public abstract class SparkRowLevelOperationsTestBase extends
SparkExtensionsTes
protected final String distributionMode;
protected final boolean fanoutEnabled;
protected final String branch;
+ protected final PlanningMode planningMode;
public SparkRowLevelOperationsTestBase(
String catalogName,
@@ -83,19 +89,22 @@ public abstract class SparkRowLevelOperationsTestBase
extends SparkExtensionsTes
boolean vectorized,
String distributionMode,
boolean fanoutEnabled,
- String branch) {
+ String branch,
+ PlanningMode planningMode) {
super(catalogName, implementation, config);
this.fileFormat = fileFormat;
this.vectorized = vectorized;
this.distributionMode = distributionMode;
this.fanoutEnabled = fanoutEnabled;
this.branch = branch;
+ this.planningMode = planningMode;
}
@Parameters(
name =
"catalogName = {0}, implementation = {1}, config = {2},"
- + " format = {3}, vectorized = {4}, distributionMode = {5},
fanout = {6}, branch = {7}")
+ + " format = {3}, vectorized = {4}, distributionMode = {5},"
+ + " fanout = {6}, branch = {7}, planningMode = {8}")
public static Object[][] parameters() {
return new Object[][] {
{
@@ -108,7 +117,8 @@ public abstract class SparkRowLevelOperationsTestBase
extends SparkExtensionsTes
true,
WRITE_DISTRIBUTION_MODE_NONE,
true,
- SnapshotRef.MAIN_BRANCH
+ SnapshotRef.MAIN_BRANCH,
+ LOCAL
},
{
"testhive",
@@ -121,6 +131,7 @@ public abstract class SparkRowLevelOperationsTestBase
extends SparkExtensionsTes
WRITE_DISTRIBUTION_MODE_NONE,
false,
null,
+ DISTRIBUTED
},
{
"testhadoop",
@@ -130,7 +141,8 @@ public abstract class SparkRowLevelOperationsTestBase
extends SparkExtensionsTes
RANDOM.nextBoolean(),
WRITE_DISTRIBUTION_MODE_HASH,
true,
- null
+ null,
+ LOCAL
},
{
"spark_catalog",
@@ -147,7 +159,8 @@ public abstract class SparkRowLevelOperationsTestBase
extends SparkExtensionsTes
false,
WRITE_DISTRIBUTION_MODE_RANGE,
false,
- "test"
+ "test",
+ DISTRIBUTED
}
};
}
@@ -156,14 +169,18 @@ public abstract class SparkRowLevelOperationsTestBase
extends SparkExtensionsTes
protected void initTable() {
sql(
- "ALTER TABLE %s SET TBLPROPERTIES('%s' '%s', '%s' '%s', '%s' '%s')",
+ "ALTER TABLE %s SET TBLPROPERTIES('%s' '%s', '%s' '%s', '%s' '%s',
'%s' '%s', '%s' '%s')",
tableName,
DEFAULT_FILE_FORMAT,
fileFormat,
WRITE_DISTRIBUTION_MODE,
distributionMode,
SPARK_WRITE_PARTITIONED_FANOUT_ENABLED,
- String.valueOf(fanoutEnabled));
+ String.valueOf(fanoutEnabled),
+ DATA_PLANNING_MODE,
+ planningMode.modeName(),
+ DELETE_PLANNING_MODE,
+ planningMode.modeName());
switch (fileFormat) {
case "parquet":
diff --git
a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCopyOnWriteDelete.java
b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCopyOnWriteDelete.java
index 45cfb014eb..9ebe73da33 100644
---
a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCopyOnWriteDelete.java
+++
b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCopyOnWriteDelete.java
@@ -32,6 +32,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.DataFile;
+import org.apache.iceberg.PlanningMode;
import org.apache.iceberg.RowLevelOperationMode;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
@@ -62,7 +63,8 @@ public class TestCopyOnWriteDelete extends TestDelete {
Boolean vectorized,
String distributionMode,
boolean fanoutEnabled,
- String branch) {
+ String branch,
+ PlanningMode planningMode) {
super(
catalogName,
implementation,
@@ -71,7 +73,8 @@ public class TestCopyOnWriteDelete extends TestDelete {
vectorized,
distributionMode,
fanoutEnabled,
- branch);
+ branch,
+ planningMode);
}
@Override
diff --git
a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCopyOnWriteMerge.java
b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCopyOnWriteMerge.java
index 6edbff480d..6b6819a924 100644
---
a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCopyOnWriteMerge.java
+++
b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCopyOnWriteMerge.java
@@ -31,6 +31,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.iceberg.DataFile;
+import org.apache.iceberg.PlanningMode;
import org.apache.iceberg.RowLevelOperationMode;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
@@ -60,7 +61,8 @@ public class TestCopyOnWriteMerge extends TestMerge {
boolean vectorized,
String distributionMode,
boolean fanoutEnabled,
- String branch) {
+ String branch,
+ PlanningMode planningMode) {
super(
catalogName,
implementation,
@@ -69,7 +71,8 @@ public class TestCopyOnWriteMerge extends TestMerge {
vectorized,
distributionMode,
fanoutEnabled,
- branch);
+ branch,
+ planningMode);
}
@Override
diff --git
a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCopyOnWriteUpdate.java
b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCopyOnWriteUpdate.java
index a398b99661..4354a1019c 100644
---
a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCopyOnWriteUpdate.java
+++
b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCopyOnWriteUpdate.java
@@ -31,6 +31,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.DataFile;
+import org.apache.iceberg.PlanningMode;
import org.apache.iceberg.RowLevelOperationMode;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
@@ -59,7 +60,8 @@ public class TestCopyOnWriteUpdate extends TestUpdate {
boolean vectorized,
String distributionMode,
boolean fanoutEnabled,
- String branch) {
+ String branch,
+ PlanningMode planningMode) {
super(
catalogName,
implementation,
@@ -68,7 +70,8 @@ public class TestCopyOnWriteUpdate extends TestUpdate {
vectorized,
distributionMode,
fanoutEnabled,
- branch);
+ branch,
+ planningMode);
}
@Override
diff --git
a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java
b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java
index b2d1e479e7..8214de6dfa 100644
---
a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java
+++
b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java
@@ -46,6 +46,7 @@ import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DistributionMode;
import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.PlanningMode;
import org.apache.iceberg.RowLevelOperationMode;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.SnapshotRef;
@@ -92,7 +93,8 @@ public abstract class TestDelete extends
SparkRowLevelOperationsTestBase {
Boolean vectorized,
String distributionMode,
boolean fanoutEnabled,
- String branch) {
+ String branch,
+ PlanningMode planningMode) {
super(
catalogName,
implementation,
@@ -101,7 +103,8 @@ public abstract class TestDelete extends
SparkRowLevelOperationsTestBase {
vectorized,
distributionMode,
fanoutEnabled,
- branch);
+ branch,
+ planningMode);
}
@BeforeClass
diff --git
a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMerge.java
b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMerge.java
index 843081a016..64d356af04 100644
---
a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMerge.java
+++
b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMerge.java
@@ -45,6 +45,7 @@ import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DistributionMode;
+import org.apache.iceberg.PlanningMode;
import org.apache.iceberg.RowLevelOperationMode;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.SnapshotSummary;
@@ -83,7 +84,8 @@ public abstract class TestMerge extends
SparkRowLevelOperationsTestBase {
boolean vectorized,
String distributionMode,
boolean fanoutEnabled,
- String branch) {
+ String branch,
+ PlanningMode planningMode) {
super(
catalogName,
implementation,
@@ -92,7 +94,8 @@ public abstract class TestMerge extends
SparkRowLevelOperationsTestBase {
vectorized,
distributionMode,
fanoutEnabled,
- branch);
+ branch,
+ planningMode);
}
@BeforeClass
diff --git
a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeOnReadDelete.java
b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeOnReadDelete.java
index e32b0f22ac..9c0e8235f8 100644
---
a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeOnReadDelete.java
+++
b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeOnReadDelete.java
@@ -25,6 +25,7 @@ import static org.mockito.Mockito.when;
import java.util.List;
import java.util.Locale;
import java.util.Map;
+import org.apache.iceberg.PlanningMode;
import org.apache.iceberg.RowDelta;
import org.apache.iceberg.RowLevelOperationMode;
import org.apache.iceberg.Table;
@@ -51,7 +52,8 @@ public class TestMergeOnReadDelete extends TestDelete {
Boolean vectorized,
String distributionMode,
boolean fanoutEnabled,
- String branch) {
+ String branch,
+ PlanningMode planningMode) {
super(
catalogName,
implementation,
@@ -60,7 +62,8 @@ public class TestMergeOnReadDelete extends TestDelete {
vectorized,
distributionMode,
fanoutEnabled,
- branch);
+ branch,
+ planningMode);
}
@Override
diff --git
a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeOnReadMerge.java
b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeOnReadMerge.java
index fc212ac844..e743b32b45 100644
---
a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeOnReadMerge.java
+++
b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeOnReadMerge.java
@@ -19,6 +19,7 @@
package org.apache.iceberg.spark.extensions;
import java.util.Map;
+import org.apache.iceberg.PlanningMode;
import org.apache.iceberg.RowLevelOperationMode;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
@@ -33,7 +34,8 @@ public class TestMergeOnReadMerge extends TestMerge {
boolean vectorized,
String distributionMode,
boolean fanoutEnabled,
- String branch) {
+ String branch,
+ PlanningMode planningMode) {
super(
catalogName,
implementation,
@@ -42,7 +44,8 @@ public class TestMergeOnReadMerge extends TestMerge {
vectorized,
distributionMode,
fanoutEnabled,
- branch);
+ branch,
+ planningMode);
}
@Override
diff --git
a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeOnReadUpdate.java
b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeOnReadUpdate.java
index f40afac459..0207d4ce4d 100644
---
a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeOnReadUpdate.java
+++
b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeOnReadUpdate.java
@@ -19,6 +19,7 @@
package org.apache.iceberg.spark.extensions;
import java.util.Map;
+import org.apache.iceberg.PlanningMode;
import org.apache.iceberg.RowLevelOperationMode;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
@@ -33,7 +34,8 @@ public class TestMergeOnReadUpdate extends TestUpdate {
boolean vectorized,
String distributionMode,
boolean fanoutEnabled,
- String branch) {
+ String branch,
+ PlanningMode planningMode) {
super(
catalogName,
implementation,
@@ -42,7 +44,8 @@ public class TestMergeOnReadUpdate extends TestUpdate {
vectorized,
distributionMode,
fanoutEnabled,
- branch);
+ branch,
+ planningMode);
}
@Override
diff --git
a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestUpdate.java
b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestUpdate.java
index 39249173ae..8f7585ccd7 100644
---
a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestUpdate.java
+++
b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestUpdate.java
@@ -47,6 +47,7 @@ import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DistributionMode;
+import org.apache.iceberg.PlanningMode;
import org.apache.iceberg.RowLevelOperationMode;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.SnapshotSummary;
@@ -86,7 +87,8 @@ public abstract class TestUpdate extends
SparkRowLevelOperationsTestBase {
boolean vectorized,
String distributionMode,
boolean fanoutEnabled,
- String branch) {
+ String branch,
+ PlanningMode planningMode) {
super(
catalogName,
implementation,
@@ -95,7 +97,8 @@ public abstract class TestUpdate extends
SparkRowLevelOperationsTestBase {
vectorized,
distributionMode,
fanoutEnabled,
- branch);
+ branch,
+ planningMode);
}
@BeforeClass
diff --git
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/SparkDistributedDataScan.java
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/SparkDistributedDataScan.java
new file mode 100644
index 0000000000..d4c2848b45
--- /dev/null
+++
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/SparkDistributedDataScan.java
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.ClosingIterator;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.base.Joiner;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.spark.JobGroupInfo;
+import org.apache.iceberg.spark.JobGroupUtils;
+import org.apache.iceberg.spark.SparkReadConf;
+import org.apache.iceberg.spark.actions.ManifestFileBean;
+import org.apache.iceberg.spark.source.SerializableTableWithSize;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.FlatMapFunction;
+import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.sql.SparkSession;
+
+/**
+ * A batch data scan that can utilize Spark cluster resources for planning.
+ *
+ * <p>This scan remotely filters manifests, fetching only the relevant data
and delete files to the
+ * driver. The delete file assignment is done locally after the remote
filtering step. Such an approach
+ * is beneficial if the remote parallelism is much higher than the number of
driver cores.
+ *
+ * <p>This scan is best suited for queries with selective filters on
lower/upper bounds across all
+ * partitions, or against poorly clustered metadata. This allows job planning
to benefit from highly
+ * concurrent remote filtering while not incurring high serialization and data
transfer costs. This
+ * class is also useful for full table scans over large tables but the cost of
bringing data and
+ * delete file details to the driver may become noticeable. Make sure to
follow the performance tips
+ * below in such cases.
+ *
+ * <p>Ensure the filtered metadata size doesn't exceed the driver's max result
size. For large table
+ * scans, consider increasing `spark.driver.maxResultSize` to avoid job
failures.
+ *
+ * <p>Performance tips:
+ *
+ * <ul>
+ * <li>Enable Kryo serialization (`spark.serializer`)
+ * <li>Increase the number of driver cores (`spark.driver.cores`)
+ * <li>Tune the number of threads used to fetch task results
(`spark.resultGetter.threads`)
+ * </ul>
+ */
+public class SparkDistributedDataScan extends BaseDistributedDataScan {
+
+ private static final Joiner COMMA = Joiner.on(',');
+ private static final String DELETE_PLANNING_JOB_GROUP_ID = "DELETE-PLANNING";
+ private static final String DATA_PLANNING_JOB_GROUP_ID = "DATA-PLANNING";
+
+ private final SparkSession spark;
+ private final JavaSparkContext sparkContext;
+ private final SparkReadConf readConf;
+
+ private Broadcast<Table> tableBroadcast = null;
+
+ public SparkDistributedDataScan(SparkSession spark, Table table,
SparkReadConf readConf) {
+ this(spark, table, readConf, table.schema(), TableScanContext.empty());
+ }
+
+ private SparkDistributedDataScan(
+ SparkSession spark,
+ Table table,
+ SparkReadConf readConf,
+ Schema schema,
+ TableScanContext context) {
+ super(table, schema, context);
+ this.spark = spark;
+ this.sparkContext =
JavaSparkContext.fromSparkContext(spark.sparkContext());
+ this.readConf = readConf;
+ }
+
+ @Override
+ protected BatchScan newRefinedScan(
+ Table newTable, Schema newSchema, TableScanContext newContext) {
+ return new SparkDistributedDataScan(spark, newTable, readConf, newSchema,
newContext);
+ }
+
+ @Override
+ protected int remoteParallelism() {
+ return readConf.parallelism();
+ }
+
+ @Override
+ protected PlanningMode dataPlanningMode() {
+ return readConf.dataPlanningMode();
+ }
+
+ @Override
+ protected boolean shouldCopyRemotelyPlannedDataFiles() {
+ return false;
+ }
+
+ @Override
+ protected Iterable<CloseableIterable<DataFile>> planDataRemotely(
+ List<ManifestFile> dataManifests, boolean withColumnStats) {
+ JobGroupInfo info = new JobGroupInfo(DATA_PLANNING_JOB_GROUP_ID,
jobDesc("data"));
+ return withJobGroupInfo(info, () -> doPlanDataRemotely(dataManifests,
withColumnStats));
+ }
+
+ private Iterable<CloseableIterable<DataFile>> doPlanDataRemotely(
+ List<ManifestFile> dataManifests, boolean withColumnStats) {
+ scanMetrics().scannedDataManifests().increment(dataManifests.size());
+
+ JavaRDD<DataFile> dataFileRDD =
+ sparkContext
+ .parallelize(toBeans(dataManifests), dataManifests.size())
+ .flatMap(new ReadDataManifest(tableBroadcast(), context(),
withColumnStats));
+ List<List<DataFile>> dataFileGroups = collectPartitions(dataFileRDD);
+
+ return Iterables.transform(dataFileGroups,
CloseableIterable::withNoopClose);
+ }
+
+ @Override
+ protected PlanningMode deletePlanningMode() {
+ return readConf.deletePlanningMode();
+ }
+
+ @Override
+ protected DeleteFileIndex planDeletesRemotely(List<ManifestFile>
deleteManifests) {
+ JobGroupInfo info = new JobGroupInfo(DELETE_PLANNING_JOB_GROUP_ID,
jobDesc("deletes"));
+ return withJobGroupInfo(info, () ->
doPlanDeletesRemotely(deleteManifests));
+ }
+
+ private DeleteFileIndex doPlanDeletesRemotely(List<ManifestFile>
deleteManifests) {
+ scanMetrics().scannedDeleteManifests().increment(deleteManifests.size());
+
+ List<DeleteFile> deleteFiles =
+ sparkContext
+ .parallelize(toBeans(deleteManifests), deleteManifests.size())
+ .flatMap(new ReadDeleteManifest(tableBroadcast(), context()))
+ .collect();
+
+ return DeleteFileIndex.builderFor(deleteFiles)
+ .specsById(table().specs())
+ .caseSensitive(isCaseSensitive())
+ .scanMetrics(scanMetrics())
+ .build();
+ }
+
+ private <T> T withJobGroupInfo(JobGroupInfo info, Supplier<T> supplier) {
+ return JobGroupUtils.withJobGroupInfo(sparkContext, info, supplier);
+ }
+
+ private String jobDesc(String type) {
+ List<String> options = Lists.newArrayList();
+ options.add("snapshot_id=" + snapshot().snapshotId());
+ String optionsAsString = COMMA.join(options);
+ return String.format("Planning %s (%s) for %s", type, optionsAsString,
table().name());
+ }
+
+ private List<ManifestFileBean> toBeans(List<ManifestFile> manifests) {
+ return
manifests.stream().map(ManifestFileBean::fromManifest).collect(Collectors.toList());
+ }
+
+ private Broadcast<Table> tableBroadcast() {
+ if (tableBroadcast == null) {
+ Table serializableTable = SerializableTableWithSize.copyOf(table());
+ this.tableBroadcast = sparkContext.broadcast(serializableTable);
+ }
+
+ return tableBroadcast;
+ }
+
+ private <T> List<List<T>> collectPartitions(JavaRDD<T> rdd) {
+ int[] partitionIds = IntStream.range(0, rdd.getNumPartitions()).toArray();
+ return Arrays.asList(rdd.collectPartitions(partitionIds));
+ }
+
+ private static class ReadDataManifest implements
FlatMapFunction<ManifestFileBean, DataFile> {
+
+ private final Broadcast<Table> table;
+ private final Expression filter;
+ private final boolean withStats;
+ private final boolean isCaseSensitive;
+
+ ReadDataManifest(Broadcast<Table> table, TableScanContext context, boolean
withStats) {
+ this.table = table;
+ this.filter = context.rowFilter();
+ this.withStats = withStats;
+ this.isCaseSensitive = context.caseSensitive();
+ }
+
+ @Override
+ public Iterator<DataFile> call(ManifestFileBean manifest) throws Exception
{
+ FileIO io = table.value().io();
+ Map<Integer, PartitionSpec> specs = table.value().specs();
+ return new ClosingIterator<>(
+ ManifestFiles.read(manifest, io, specs)
+ .select(withStats ? SCAN_WITH_STATS_COLUMNS : SCAN_COLUMNS)
+ .filterRows(filter)
+ .caseSensitive(isCaseSensitive)
+ .iterator());
+ }
+ }
+
+ private static class ReadDeleteManifest implements
FlatMapFunction<ManifestFileBean, DeleteFile> {
+
+ private final Broadcast<Table> table;
+ private final Expression filter;
+ private final boolean isCaseSensitive;
+
+ ReadDeleteManifest(Broadcast<Table> table, TableScanContext context) {
+ this.table = table;
+ this.filter = context.rowFilter();
+ this.isCaseSensitive = context.caseSensitive();
+ }
+
+ @Override
+ public Iterator<DeleteFile> call(ManifestFileBean manifest) throws
Exception {
+ FileIO io = table.value().io();
+ Map<Integer, PartitionSpec> specs = table.value().specs();
+ return new ClosingIterator<>(
+ ManifestFiles.readDeleteManifest(manifest, io, specs)
+ .select(DELETE_SCAN_WITH_STATS_COLUMNS)
+ .filterRows(filter)
+ .caseSensitive(isCaseSensitive)
+ .iterator());
+ }
+ }
+}
diff --git
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java
index 75e060e556..be0ba7d6bc 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java
@@ -18,12 +18,16 @@
*/
package org.apache.iceberg.spark;
+import static org.apache.iceberg.PlanningMode.LOCAL;
+
import java.util.Map;
+import org.apache.iceberg.PlanningMode;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.hadoop.Util;
import org.apache.iceberg.util.PropertyUtil;
+import org.apache.spark.SparkConf;
import org.apache.spark.sql.SparkSession;
/**
@@ -46,6 +50,10 @@ import org.apache.spark.sql.SparkSession;
*/
public class SparkReadConf {
+ private static final String DRIVER_MAX_RESULT_SIZE =
"spark.driver.maxResultSize";
+ private static final String DRIVER_MAX_RESULT_SIZE_DEFAULT = "1G";
+ private static final long DISTRIBUTED_PLANNING_MIN_RESULT_SIZE = 256L * 1024
* 1024; // 256 MB
+
private final SparkSession spark;
private final Table table;
private final String branch;
@@ -281,4 +289,43 @@ public class SparkReadConf {
int numShufflePartitions =
spark.sessionState().conf().numShufflePartitions();
return Math.max(defaultParallelism, numShufflePartitions);
}
+
+ public boolean distributedPlanningEnabled() {
+ return dataPlanningMode() != LOCAL || deletePlanningMode() != LOCAL;
+ }
+
+ public PlanningMode dataPlanningMode() {
+ if (driverMaxResultSize() < DISTRIBUTED_PLANNING_MIN_RESULT_SIZE) {
+ return LOCAL;
+ }
+
+ String modeName =
+ confParser
+ .stringConf()
+ .sessionConf(SparkSQLProperties.DATA_PLANNING_MODE)
+ .tableProperty(TableProperties.DATA_PLANNING_MODE)
+ .defaultValue(TableProperties.PLANNING_MODE_DEFAULT)
+ .parse();
+ return PlanningMode.fromName(modeName);
+ }
+
+ public PlanningMode deletePlanningMode() {
+ if (driverMaxResultSize() < DISTRIBUTED_PLANNING_MIN_RESULT_SIZE) {
+ return LOCAL;
+ }
+
+ String modeName =
+ confParser
+ .stringConf()
+ .sessionConf(SparkSQLProperties.DELETE_PLANNING_MODE)
+ .tableProperty(TableProperties.DELETE_PLANNING_MODE)
+ .defaultValue(TableProperties.PLANNING_MODE_DEFAULT)
+ .parse();
+ return PlanningMode.fromName(modeName);
+ }
+
+ private long driverMaxResultSize() {
+ SparkConf sparkConf = spark.sparkContext().conf();
+ return sparkConf.getSizeAsBytes(DRIVER_MAX_RESULT_SIZE,
DRIVER_MAX_RESULT_SIZE_DEFAULT);
+ }
}
diff --git
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java
index 0c08587b4b..9accd6f108 100644
---
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java
+++
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java
@@ -68,4 +68,10 @@ public class SparkSQLProperties {
public static final String COMPRESSION_CODEC =
"spark.sql.iceberg.compression-codec";
public static final String COMPRESSION_LEVEL =
"spark.sql.iceberg.compression-level";
public static final String COMPRESSION_STRATEGY =
"spark.sql.iceberg.compression-strategy";
+
+ // Overrides the data planning mode
+ public static final String DATA_PLANNING_MODE =
"spark.sql.iceberg.data-planning-mode";
+
+ // Overrides the delete planning mode
+ public static final String DELETE_PLANNING_MODE =
"spark.sql.iceberg.delete-planning-mode";
}
diff --git
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java
index 613e53767e..62f5167526 100644
---
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java
+++
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java
@@ -157,6 +157,7 @@ abstract class BaseSparkAction<ThisT> {
"content",
"path",
"length",
+ "0 as sequenceNumber",
"partition_spec_id as partitionSpecId",
"added_snapshot_id as addedSnapshotId")
.dropDuplicates("path")
diff --git
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/ManifestFileBean.java
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/ManifestFileBean.java
index 45647070e6..11ad834244 100644
---
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/ManifestFileBean.java
+++
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/ManifestFileBean.java
@@ -18,6 +18,7 @@
*/
package org.apache.iceberg.spark.actions;
+import java.io.Serializable;
import java.nio.ByteBuffer;
import java.util.List;
import org.apache.iceberg.ManifestContent;
@@ -25,7 +26,8 @@ import org.apache.iceberg.ManifestFile;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
-public class ManifestFileBean implements ManifestFile {
+/** A serializable bean that contains the bare minimum needed to read a manifest. */
+public class ManifestFileBean implements ManifestFile, Serializable {
public static final Encoder<ManifestFileBean> ENCODER =
Encoders.bean(ManifestFileBean.class);
private String path = null;
@@ -33,6 +35,20 @@ public class ManifestFileBean implements ManifestFile {
private Integer partitionSpecId = null;
private Long addedSnapshotId = null;
private Integer content = null;
+ private Long sequenceNumber = null;
+
+ public static ManifestFileBean fromManifest(ManifestFile manifest) {
+ ManifestFileBean bean = new ManifestFileBean();
+
+ bean.setPath(manifest.path());
+ bean.setLength(manifest.length());
+ bean.setPartitionSpecId(manifest.partitionSpecId());
+ bean.setAddedSnapshotId(manifest.snapshotId());
+ bean.setContent(manifest.content().id());
+ bean.setSequenceNumber(manifest.sequenceNumber());
+
+ return bean;
+ }
public String getPath() {
return path;
@@ -74,6 +90,14 @@ public class ManifestFileBean implements ManifestFile {
this.content = content;
}
+ public Long getSequenceNumber() {
+ return sequenceNumber;
+ }
+
+ public void setSequenceNumber(Long sequenceNumber) {
+ this.sequenceNumber = sequenceNumber;
+ }
+
@Override
public String path() {
return path;
@@ -96,7 +120,7 @@ public class ManifestFileBean implements ManifestFile {
@Override
public long sequenceNumber() {
- return 0;
+ return sequenceNumber;
}
@Override
diff --git
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
index 3a430cd86f..55b0096bf6 100644
---
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
+++
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
@@ -33,6 +33,7 @@ import org.apache.iceberg.MetricsModes;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.SparkDistributedDataScan;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
@@ -436,8 +437,7 @@ public class SparkScanBuilder
Schema expectedSchema = schemaWithMetadataColumns();
BatchScan scan =
- table
- .newBatchScan()
+ newBatchScan()
.caseSensitive(caseSensitive)
.filter(filterExpression())
.project(expectedSchema)
@@ -625,8 +625,7 @@ public class SparkScanBuilder
Schema expectedSchema = schemaWithMetadataColumns();
BatchScan scan =
- table
- .newBatchScan()
+ newBatchScan()
.useSnapshot(snapshotId)
.caseSensitive(caseSensitive)
.filter(filterExpression())
@@ -714,4 +713,12 @@ public class SparkScanBuilder
public StructType readSchema() {
return build().readSchema();
}
+
+ private BatchScan newBatchScan() {
+ if (table instanceof BaseTable && readConf.distributedPlanningEnabled()) {
+ return new SparkDistributedDataScan(spark, table, readConf);
+ } else {
+ return table.newBatchScan();
+ }
+ }
}
diff --git
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/SparkDistributedDataScanTestBase.java
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/SparkDistributedDataScanTestBase.java
new file mode 100644
index 0000000000..47b8dbb1d9
--- /dev/null
+++
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/SparkDistributedDataScanTestBase.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import static org.apache.iceberg.PlanningMode.DISTRIBUTED;
+import static org.apache.iceberg.PlanningMode.LOCAL;
+
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.spark.SparkReadConf;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.internal.SQLConf;
+import org.junit.Before;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+@RunWith(Parameterized.class)
+public abstract class SparkDistributedDataScanTestBase
+ extends DataTableScanTestBase<BatchScan, ScanTask,
ScanTaskGroup<ScanTask>> {
+
+ @Parameters(name = "formatVersion = {0}, dataMode = {1}, deleteMode = {2}")
+ public static Object[] parameters() {
+ return new Object[][] {
+ new Object[] {1, LOCAL, LOCAL},
+ new Object[] {1, LOCAL, DISTRIBUTED},
+ new Object[] {1, DISTRIBUTED, LOCAL},
+ new Object[] {1, DISTRIBUTED, DISTRIBUTED},
+ new Object[] {2, LOCAL, LOCAL},
+ new Object[] {2, LOCAL, DISTRIBUTED},
+ new Object[] {2, DISTRIBUTED, LOCAL},
+ new Object[] {2, DISTRIBUTED, DISTRIBUTED}
+ };
+ }
+
+ protected static SparkSession spark = null;
+
+ private final PlanningMode dataMode;
+ private final PlanningMode deleteMode;
+
+ public SparkDistributedDataScanTestBase(
+ int formatVersion, PlanningMode dataPlanningMode, PlanningMode
deletePlanningMode) {
+ super(formatVersion);
+ this.dataMode = dataPlanningMode;
+ this.deleteMode = deletePlanningMode;
+ }
+
+ @Before
+ public void configurePlanningModes() {
+ table
+ .updateProperties()
+ .set(TableProperties.DATA_PLANNING_MODE, dataMode.modeName())
+ .set(TableProperties.DELETE_PLANNING_MODE, deleteMode.modeName())
+ .commit();
+ }
+
+ @Override
+ protected BatchScan useRef(BatchScan scan, String ref) {
+ return scan.useRef(ref);
+ }
+
+ @Override
+ protected BatchScan useSnapshot(BatchScan scan, long snapshotId) {
+ return scan.useSnapshot(snapshotId);
+ }
+
+ @Override
+ protected BatchScan asOfTime(BatchScan scan, long timestampMillis) {
+ return scan.asOfTime(timestampMillis);
+ }
+
+ @Override
+ protected BatchScan newScan() {
+ SparkReadConf readConf = new SparkReadConf(spark, table,
ImmutableMap.of());
+ return new SparkDistributedDataScan(spark, table, readConf);
+ }
+
+ protected static SparkSession initSpark(String serializer) {
+ return SparkSession.builder()
+ .master("local[2]")
+ .config("spark.serializer", serializer)
+ .config(SQLConf.SHUFFLE_PARTITIONS().key(), "4")
+ .getOrCreate();
+ }
+}
diff --git
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanDeletes.java
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanDeletes.java
new file mode 100644
index 0000000000..8ed37db642
--- /dev/null
+++
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanDeletes.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import static org.apache.iceberg.PlanningMode.DISTRIBUTED;
+import static org.apache.iceberg.PlanningMode.LOCAL;
+
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.spark.SparkReadConf;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.internal.SQLConf;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class TestSparkDistributedDataScanDeletes
+ extends DeleteFileIndexTestBase<BatchScan, ScanTask,
ScanTaskGroup<ScanTask>> {
+
+ @Parameterized.Parameters(name = "dataMode = {0}, deleteMode = {1}")
+ public static Object[] parameters() {
+ return new Object[][] {
+ new Object[] {LOCAL, LOCAL},
+ new Object[] {LOCAL, DISTRIBUTED},
+ new Object[] {DISTRIBUTED, LOCAL},
+ new Object[] {DISTRIBUTED, DISTRIBUTED}
+ };
+ }
+
+ private static SparkSession spark = null;
+
+ private final PlanningMode dataMode;
+ private final PlanningMode deleteMode;
+
+ public TestSparkDistributedDataScanDeletes(
+ PlanningMode dataPlanningMode, PlanningMode deletePlanningMode) {
+ this.dataMode = dataPlanningMode;
+ this.deleteMode = deletePlanningMode;
+ }
+
+ @Before
+ public void configurePlanningModes() {
+ table
+ .updateProperties()
+ .set(TableProperties.DATA_PLANNING_MODE, dataMode.modeName())
+ .set(TableProperties.DELETE_PLANNING_MODE, deleteMode.modeName())
+ .commit();
+ }
+
+ @BeforeClass
+ public static void startSpark() {
+ TestSparkDistributedDataScanDeletes.spark =
+ SparkSession.builder()
+ .master("local[2]")
+ .config("spark.serializer",
"org.apache.spark.serializer.KryoSerializer")
+ .config(SQLConf.SHUFFLE_PARTITIONS().key(), "4")
+ .getOrCreate();
+ }
+
+ @AfterClass
+ public static void stopSpark() {
+ SparkSession currentSpark = TestSparkDistributedDataScanDeletes.spark;
+ TestSparkDistributedDataScanDeletes.spark = null;
+ currentSpark.stop();
+ }
+
+ @Override
+ protected BatchScan newScan(Table table) {
+ SparkReadConf readConf = new SparkReadConf(spark, table,
ImmutableMap.of());
+ return new SparkDistributedDataScan(spark, table, readConf);
+ }
+}
diff --git
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanFilterFiles.java
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanFilterFiles.java
new file mode 100644
index 0000000000..510c130a58
--- /dev/null
+++
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanFilterFiles.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import static org.apache.iceberg.PlanningMode.DISTRIBUTED;
+import static org.apache.iceberg.PlanningMode.LOCAL;
+
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.spark.SparkReadConf;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.internal.SQLConf;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+@RunWith(Parameterized.class)
+public class TestSparkDistributedDataScanFilterFiles
+ extends FilterFilesTestBase<BatchScan, ScanTask, ScanTaskGroup<ScanTask>> {
+
+ @Parameters(name = "formatVersion = {0}, dataMode = {1}, deleteMode = {2}")
+ public static Object[] parameters() {
+ return new Object[][] {
+ new Object[] {1, LOCAL, LOCAL},
+ new Object[] {1, LOCAL, DISTRIBUTED},
+ new Object[] {1, DISTRIBUTED, LOCAL},
+ new Object[] {1, DISTRIBUTED, DISTRIBUTED},
+ new Object[] {2, LOCAL, LOCAL},
+ new Object[] {2, LOCAL, DISTRIBUTED},
+ new Object[] {2, DISTRIBUTED, LOCAL},
+ new Object[] {2, DISTRIBUTED, DISTRIBUTED}
+ };
+ }
+
+ private static SparkSession spark = null;
+
+ private final PlanningMode dataMode;
+ private final PlanningMode deleteMode;
+
+ public TestSparkDistributedDataScanFilterFiles(
+ int formatVersion, PlanningMode dataPlanningMode, PlanningMode
deletePlanningMode) {
+ super(formatVersion);
+ this.dataMode = dataPlanningMode;
+ this.deleteMode = deletePlanningMode;
+ }
+
+ @BeforeClass
+ public static void startSpark() {
+ TestSparkDistributedDataScanFilterFiles.spark =
+ SparkSession.builder()
+ .master("local[2]")
+ .config("spark.serializer",
"org.apache.spark.serializer.KryoSerializer")
+ .config(SQLConf.SHUFFLE_PARTITIONS().key(), "4")
+ .getOrCreate();
+ }
+
+ @AfterClass
+ public static void stopSpark() {
+ SparkSession currentSpark = TestSparkDistributedDataScanFilterFiles.spark;
+ TestSparkDistributedDataScanFilterFiles.spark = null;
+ currentSpark.stop();
+ }
+
+ @Override
+ protected BatchScan newScan(Table table) {
+ table
+ .updateProperties()
+ .set(TableProperties.DATA_PLANNING_MODE, dataMode.modeName())
+ .set(TableProperties.DELETE_PLANNING_MODE, deleteMode.modeName())
+ .commit();
+ SparkReadConf readConf = new SparkReadConf(spark, table,
ImmutableMap.of());
+ return new SparkDistributedDataScan(spark, table, readConf);
+ }
+}
diff --git a/core/src/main/java/org/apache/iceberg/metrics/ScanMetricsUtil.java
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanJavaSerialization.java
similarity index 50%
copy from core/src/main/java/org/apache/iceberg/metrics/ScanMetricsUtil.java
copy to
spark/v3.4/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanJavaSerialization.java
index 102f48ee19..ba1096ee36 100644
--- a/core/src/main/java/org/apache/iceberg/metrics/ScanMetricsUtil.java
+++
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanJavaSerialization.java
@@ -16,22 +16,30 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iceberg.metrics;
+package org.apache.iceberg;
-import org.apache.iceberg.DeleteFile;
-import org.apache.iceberg.FileContent;
+import org.apache.spark.sql.SparkSession;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
-public class ScanMetricsUtil {
+public class TestSparkDistributedDataScanJavaSerialization
+ extends SparkDistributedDataScanTestBase {
- private ScanMetricsUtil() {}
+ public TestSparkDistributedDataScanJavaSerialization(
+ int formatVersion, PlanningMode dataPlanningMode, PlanningMode
deletePlanningMode) {
+ super(formatVersion, dataPlanningMode, deletePlanningMode);
+ }
- public static void indexedDeleteFile(ScanMetrics metrics, DeleteFile
deleteFile) {
- metrics.indexedDeleteFiles().increment();
+ @BeforeClass
+ public static void startSpark() {
+ SparkDistributedDataScanTestBase.spark =
+ initSpark("org.apache.spark.serializer.JavaSerializer");
+ }
- if (deleteFile.content() == FileContent.POSITION_DELETES) {
- metrics.positionalDeleteFiles().increment();
- } else if (deleteFile.content() == FileContent.EQUALITY_DELETES) {
- metrics.equalityDeleteFiles().increment();
- }
+ @AfterClass
+ public static void stopSpark() {
+ SparkSession currentSpark = SparkDistributedDataScanTestBase.spark;
+ SparkDistributedDataScanTestBase.spark = null;
+ currentSpark.stop();
}
}
diff --git a/core/src/main/java/org/apache/iceberg/metrics/ScanMetricsUtil.java
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanKryoSerialization.java
similarity index 50%
copy from core/src/main/java/org/apache/iceberg/metrics/ScanMetricsUtil.java
copy to
spark/v3.4/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanKryoSerialization.java
index 102f48ee19..7a795eb477 100644
--- a/core/src/main/java/org/apache/iceberg/metrics/ScanMetricsUtil.java
+++
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanKryoSerialization.java
@@ -16,22 +16,30 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iceberg.metrics;
+package org.apache.iceberg;
-import org.apache.iceberg.DeleteFile;
-import org.apache.iceberg.FileContent;
+import org.apache.spark.sql.SparkSession;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
-public class ScanMetricsUtil {
+public class TestSparkDistributedDataScanKryoSerialization
+ extends SparkDistributedDataScanTestBase {
- private ScanMetricsUtil() {}
+ public TestSparkDistributedDataScanKryoSerialization(
+ int formatVersion, PlanningMode dataPlanningMode, PlanningMode
deletePlanningMode) {
+ super(formatVersion, dataPlanningMode, deletePlanningMode);
+ }
- public static void indexedDeleteFile(ScanMetrics metrics, DeleteFile
deleteFile) {
- metrics.indexedDeleteFiles().increment();
+ @BeforeClass
+ public static void startSpark() {
+ SparkDistributedDataScanTestBase.spark =
+ initSpark("org.apache.spark.serializer.KryoSerializer");
+ }
- if (deleteFile.content() == FileContent.POSITION_DELETES) {
- metrics.positionalDeleteFiles().increment();
- } else if (deleteFile.content() == FileContent.EQUALITY_DELETES) {
- metrics.equalityDeleteFiles().increment();
- }
+ @AfterClass
+ public static void stopSpark() {
+ SparkSession currentSpark = SparkDistributedDataScanTestBase.spark;
+ SparkDistributedDataScanTestBase.spark = null;
+ currentSpark.stop();
}
}
diff --git
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/SparkTestBaseWithCatalog.java
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/SparkTestBaseWithCatalog.java
index 2537ff5f80..6581598945 100644
---
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/SparkTestBaseWithCatalog.java
+++
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/SparkTestBaseWithCatalog.java
@@ -24,6 +24,8 @@ import java.util.Map;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.PlanningMode;
+import org.apache.iceberg.TableProperties;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.SupportsNamespaces;
@@ -111,4 +113,18 @@ public abstract class SparkTestBaseWithCatalog extends
SparkTestBase {
return PropertyUtil.propertyAsBoolean(
catalogConfig, CatalogProperties.CACHE_ENABLED,
CatalogProperties.CACHE_ENABLED_DEFAULT);
}
+
+ protected void configurePlanningMode(PlanningMode planningMode) {
+ configurePlanningMode(tableName, planningMode);
+ }
+
+ protected void configurePlanningMode(String table, PlanningMode
planningMode) {
+ sql(
+ "ALTER TABLE %s SET TBLPROPERTIES ('%s' '%s', '%s' '%s')",
+ table,
+ TableProperties.DATA_PLANNING_MODE,
+ planningMode.modeName(),
+ TableProperties.DELETE_PLANNING_MODE,
+ planningMode.modeName());
+ }
}
diff --git
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestFilteredScan.java
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestFilteredScan.java
index 22db454a6e..e8af5e51ec 100644
---
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestFilteredScan.java
+++
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestFilteredScan.java
@@ -19,6 +19,8 @@
package org.apache.iceberg.spark.source;
import static org.apache.iceberg.Files.localOutput;
+import static org.apache.iceberg.PlanningMode.DISTRIBUTED;
+import static org.apache.iceberg.PlanningMode.LOCAL;
import static
org.apache.spark.sql.catalyst.util.DateTimeUtils.fromJavaTimestamp;
import static org.apache.spark.sql.functions.callUDF;
import static org.apache.spark.sql.functions.column;
@@ -37,8 +39,10 @@ import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.PlanningMode;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
import org.apache.iceberg.data.GenericAppenderFactory;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;
@@ -151,21 +155,23 @@ public class TestFilteredScan {
private final String format;
private final boolean vectorized;
+ private final PlanningMode planningMode;
- @Parameterized.Parameters(name = "format = {0}, vectorized = {1}")
+ @Parameterized.Parameters(name = "format = {0}, vectorized = {1},
planningMode = {2}")
public static Object[][] parameters() {
return new Object[][] {
- {"parquet", false},
- {"parquet", true},
- {"avro", false},
- {"orc", false},
- {"orc", true}
+ {"parquet", false, LOCAL},
+ {"parquet", true, DISTRIBUTED},
+ {"avro", false, LOCAL},
+ {"orc", false, DISTRIBUTED},
+ {"orc", true, LOCAL}
};
}
- public TestFilteredScan(String format, boolean vectorized) {
+ public TestFilteredScan(String format, boolean vectorized, PlanningMode
planningMode) {
this.format = format;
this.vectorized = vectorized;
+ this.planningMode = planningMode;
}
private File parent = null;
@@ -179,7 +185,16 @@ public class TestFilteredScan {
File dataFolder = new File(unpartitioned, "data");
Assert.assertTrue("Mkdir should succeed", dataFolder.mkdirs());
- Table table = TABLES.create(SCHEMA, PartitionSpec.unpartitioned(),
unpartitioned.toString());
+ Table table =
+ TABLES.create(
+ SCHEMA,
+ PartitionSpec.unpartitioned(),
+ ImmutableMap.of(
+ TableProperties.DATA_PLANNING_MODE,
+ planningMode.modeName(),
+ TableProperties.DELETE_PLANNING_MODE,
+ planningMode.modeName()),
+ unpartitioned.toString());
Schema tableSchema = table.schema(); // use the table schema because ids
are reassigned
FileFormat fileFormat = FileFormat.fromString(format);
diff --git
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestIdentityPartitionData.java
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestIdentityPartitionData.java
index 7313c18cc0..45a523917f 100644
---
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestIdentityPartitionData.java
+++
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestIdentityPartitionData.java
@@ -18,12 +18,16 @@
*/
package org.apache.iceberg.spark.source;
+import static org.apache.iceberg.PlanningMode.DISTRIBUTED;
+import static org.apache.iceberg.PlanningMode.LOCAL;
+
import java.io.File;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.PlanningMode;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
@@ -51,23 +55,29 @@ public class TestIdentityPartitionData extends
SparkTestBase {
private static final Configuration CONF = new Configuration();
private static final HadoopTables TABLES = new HadoopTables(CONF);
- @Parameterized.Parameters(name = "format = {0}, vectorized = {1}")
+ @Parameterized.Parameters(name = "format = {0}, vectorized = {1},
planningMode = {2}")
public static Object[][] parameters() {
return new Object[][] {
- {"parquet", false},
- {"parquet", true},
- {"avro", false},
- {"orc", false},
- {"orc", true},
+ {"parquet", false, LOCAL},
+ {"parquet", true, DISTRIBUTED},
+ {"avro", false, LOCAL},
+ {"orc", false, DISTRIBUTED},
+ {"orc", true, LOCAL},
};
}
private final String format;
private final boolean vectorized;
+ private final Map<String, String> properties;
- public TestIdentityPartitionData(String format, boolean vectorized) {
+ public TestIdentityPartitionData(String format, boolean vectorized,
PlanningMode planningMode) {
this.format = format;
this.vectorized = vectorized;
+ this.properties =
+ ImmutableMap.of(
+ TableProperties.DEFAULT_FILE_FORMAT, format,
+ TableProperties.DATA_PLANNING_MODE, planningMode.modeName(),
+ TableProperties.DELETE_PLANNING_MODE, planningMode.modeName());
}
private static final Schema LOG_SCHEMA =
@@ -108,7 +118,6 @@ public class TestIdentityPartitionData extends
SparkTestBase {
String hiveTable = "hivetable";
Assert.assertTrue("Temp folder should exist", location.exists());
- Map<String, String> properties =
ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, format);
this.logs =
spark.createDataFrame(LOGS, LogMessage.class).select("id", "date",
"level", "message");
spark.sql(String.format("DROP TABLE IF EXISTS %s", hiveTable));
@@ -138,7 +147,6 @@ public class TestIdentityPartitionData extends
SparkTestBase {
File location = temp.newFolder("logs");
Assert.assertTrue("Temp folder should exist", location.exists());
- Map<String, String> properties =
ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, format);
this.table = TABLES.create(LOG_SCHEMA, spec, properties,
location.toString());
this.logs =
spark.createDataFrame(LOGS, LogMessage.class).select("id", "date",
"level", "message");
diff --git
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestPartitionPruning.java
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestPartitionPruning.java
index 4ef022c50c..c00549c68f 100644
---
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestPartitionPruning.java
+++
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestPartitionPruning.java
@@ -18,6 +18,9 @@
*/
package org.apache.iceberg.spark.source;
+import static org.apache.iceberg.PlanningMode.DISTRIBUTED;
+import static org.apache.iceberg.PlanningMode.LOCAL;
+
import java.io.File;
import java.io.IOException;
import java.net.URI;
@@ -37,6 +40,7 @@ import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RawLocalFileSystem;
import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.PlanningMode;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
@@ -74,23 +78,25 @@ public class TestPartitionPruning {
private static final Configuration CONF = new Configuration();
private static final HadoopTables TABLES = new HadoopTables(CONF);
- @Parameterized.Parameters(name = "format = {0}, vectorized = {1}")
+ @Parameterized.Parameters(name = "format = {0}, vectorized = {1},
planningMode = {2}")
public static Object[][] parameters() {
return new Object[][] {
- {"parquet", false},
- {"parquet", true},
- {"avro", false},
- {"orc", false},
- {"orc", true}
+ {"parquet", false, DISTRIBUTED},
+ {"parquet", true, LOCAL},
+ {"avro", false, DISTRIBUTED},
+ {"orc", false, LOCAL},
+ {"orc", true, DISTRIBUTED}
};
}
private final String format;
private final boolean vectorized;
+ private final PlanningMode planningMode;
- public TestPartitionPruning(String format, boolean vectorized) {
+ public TestPartitionPruning(String format, boolean vectorized, PlanningMode
planningMode) {
this.format = format;
this.vectorized = vectorized;
+ this.planningMode = planningMode;
}
private static SparkSession spark = null;
@@ -293,7 +299,11 @@ public class TestPartitionPruning {
private Table createTable(File originTableLocation) {
String trackedTableLocation =
CountOpenLocalFileSystem.convertPath(originTableLocation);
- Map<String, String> properties =
ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, format);
+ Map<String, String> properties =
+ ImmutableMap.of(
+ TableProperties.DEFAULT_FILE_FORMAT, format,
+ TableProperties.DATA_PLANNING_MODE, planningMode.modeName(),
+ TableProperties.DELETE_PLANNING_MODE, planningMode.modeName());
return TABLES.create(LOG_SCHEMA, spec, properties, trackedTableLocation);
}
diff --git
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestRuntimeFiltering.java
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestRuntimeFiltering.java
index beaf7b75c6..edd4cdf083 100644
---
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestRuntimeFiltering.java
+++
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestRuntimeFiltering.java
@@ -18,6 +18,8 @@
*/
package org.apache.iceberg.spark.source;
+import static org.apache.iceberg.PlanningMode.DISTRIBUTED;
+import static org.apache.iceberg.PlanningMode.LOCAL;
import static org.apache.spark.sql.functions.date_add;
import static org.apache.spark.sql.functions.expr;
@@ -27,6 +29,7 @@ import java.util.List;
import java.util.Set;
import org.apache.commons.lang3.StringUtils;
import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.PlanningMode;
import org.apache.iceberg.Table;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
@@ -41,9 +44,23 @@ import
org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+@RunWith(Parameterized.class)
public class TestRuntimeFiltering extends SparkTestBaseWithCatalog {
+ @Parameterized.Parameters(name = "planningMode = {0}")
+ public static Object[] parameters() {
+ return new Object[] {LOCAL, DISTRIBUTED};
+ }
+
+ private final PlanningMode planningMode;
+
+ public TestRuntimeFiltering(PlanningMode planningMode) {
+ this.planningMode = planningMode;
+ }
+
@After
public void removeTables() {
sql("DROP TABLE IF EXISTS %s", tableName);
@@ -57,6 +74,7 @@ public class TestRuntimeFiltering extends
SparkTestBaseWithCatalog {
+ "USING iceberg "
+ "PARTITIONED BY (date)",
tableName);
+ configurePlanningMode(planningMode);
Dataset<Row> df =
spark
@@ -95,6 +113,7 @@ public class TestRuntimeFiltering extends
SparkTestBaseWithCatalog {
+ "USING iceberg "
+ "PARTITIONED BY (bucket(8, id))",
tableName);
+ configurePlanningMode(planningMode);
Dataset<Row> df =
spark
@@ -133,6 +152,7 @@ public class TestRuntimeFiltering extends
SparkTestBaseWithCatalog {
+ "USING iceberg "
+ "PARTITIONED BY (bucket(8, id))",
tableName);
+ configurePlanningMode(planningMode);
Dataset<Row> df =
spark
@@ -173,6 +193,7 @@ public class TestRuntimeFiltering extends
SparkTestBaseWithCatalog {
+ "USING iceberg "
+ "PARTITIONED BY (data, bucket(8, id))",
tableName);
+ configurePlanningMode(planningMode);
Dataset<Row> df =
spark
@@ -215,6 +236,7 @@ public class TestRuntimeFiltering extends
SparkTestBaseWithCatalog {
+ "USING iceberg "
+ "PARTITIONED BY (data, bucket(8, id))",
tableName);
+ configurePlanningMode(planningMode);
Dataset<Row> df =
spark
@@ -256,6 +278,7 @@ public class TestRuntimeFiltering extends
SparkTestBaseWithCatalog {
sql(
"CREATE TABLE %s (id BIGINT, data STRING, date DATE, ts TIMESTAMP)
USING iceberg",
tableName);
+ configurePlanningMode(planningMode);
Dataset<Row> df1 =
spark
@@ -309,6 +332,7 @@ public class TestRuntimeFiltering extends
SparkTestBaseWithCatalog {
+ "USING iceberg "
+ "PARTITIONED BY (bucket(8, `i.d`))",
tableName);
+ configurePlanningMode(planningMode);
Dataset<Row> df =
spark
@@ -352,6 +376,7 @@ public class TestRuntimeFiltering extends
SparkTestBaseWithCatalog {
+ "USING iceberg "
+ "PARTITIONED BY (bucket(8, `i``d`))",
tableName);
+ configurePlanningMode(planningMode);
Dataset<Row> df =
spark
@@ -390,6 +415,7 @@ public class TestRuntimeFiltering extends
SparkTestBaseWithCatalog {
sql(
"CREATE TABLE %s (id BIGINT, data STRING, date DATE, ts TIMESTAMP)
USING iceberg",
tableName);
+ configurePlanningMode(planningMode);
Dataset<Row> df =
spark
diff --git
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection.java
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection.java
index 276fbcd592..f1374c050d 100644
---
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection.java
+++
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection.java
@@ -18,16 +18,22 @@
*/
package org.apache.iceberg.spark.source;
+import static org.apache.iceberg.PlanningMode.DISTRIBUTED;
+import static org.apache.iceberg.PlanningMode.LOCAL;
import static org.apache.iceberg.types.Types.NestedField.optional;
import java.io.IOException;
import java.util.List;
+import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.PlanningMode;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.spark.SparkReadOptions;
@@ -43,9 +49,17 @@ import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+@RunWith(Parameterized.class)
public class TestSnapshotSelection {
+ @Parameterized.Parameters(name = "planningMode = {0}")
+ public static Object[] parameters() {
+ return new Object[] {LOCAL, DISTRIBUTED};
+ }
+
private static final Configuration CONF = new Configuration();
private static final Schema SCHEMA =
new Schema(
@@ -55,6 +69,15 @@ public class TestSnapshotSelection {
private static SparkSession spark = null;
+ private final Map<String, String> properties;
+
+ public TestSnapshotSelection(PlanningMode planningMode) {
+ this.properties =
+ ImmutableMap.of(
+ TableProperties.DATA_PLANNING_MODE, planningMode.modeName(),
+ TableProperties.DELETE_PLANNING_MODE, planningMode.modeName());
+ }
+
@BeforeClass
public static void startSpark() {
TestSnapshotSelection.spark =
SparkSession.builder().master("local[2]").getOrCreate();
@@ -73,7 +96,7 @@ public class TestSnapshotSelection {
HadoopTables tables = new HadoopTables(CONF);
PartitionSpec spec = PartitionSpec.unpartitioned();
- Table table = tables.create(SCHEMA, spec, tableLocation);
+ Table table = tables.create(SCHEMA, spec, properties, tableLocation);
// produce the first snapshot
List<SimpleRecord> firstBatchRecords =
@@ -118,7 +141,7 @@ public class TestSnapshotSelection {
HadoopTables tables = new HadoopTables(CONF);
PartitionSpec spec = PartitionSpec.unpartitioned();
- Table table = tables.create(SCHEMA, spec, tableLocation);
+ Table table = tables.create(SCHEMA, spec, properties, tableLocation);
// produce the first snapshot
List<SimpleRecord> firstBatchRecords =
@@ -168,7 +191,7 @@ public class TestSnapshotSelection {
HadoopTables tables = new HadoopTables(CONF);
PartitionSpec spec = PartitionSpec.unpartitioned();
- tables.create(SCHEMA, spec, tableLocation);
+ tables.create(SCHEMA, spec, properties, tableLocation);
Dataset<Row> df = spark.read().format("iceberg").option("snapshot-id",
-10).load(tableLocation);
@@ -184,7 +207,7 @@ public class TestSnapshotSelection {
String tableLocation = temp.newFolder("iceberg-table").toString();
HadoopTables tables = new HadoopTables(CONF);
PartitionSpec spec = PartitionSpec.unpartitioned();
- tables.create(SCHEMA, spec, tableLocation);
+ tables.create(SCHEMA, spec, properties, tableLocation);
Assertions.assertThatThrownBy(
() ->
@@ -203,7 +226,7 @@ public class TestSnapshotSelection {
HadoopTables tables = new HadoopTables(CONF);
PartitionSpec spec = PartitionSpec.unpartitioned();
- Table table = tables.create(SCHEMA, spec, tableLocation);
+ Table table = tables.create(SCHEMA, spec, properties, tableLocation);
List<SimpleRecord> firstBatchRecords =
Lists.newArrayList(
@@ -235,7 +258,7 @@ public class TestSnapshotSelection {
HadoopTables tables = new HadoopTables(CONF);
PartitionSpec spec = PartitionSpec.unpartitioned();
- Table table = tables.create(SCHEMA, spec, tableLocation);
+ Table table = tables.create(SCHEMA, spec, properties, tableLocation);
// produce the first snapshot
List<SimpleRecord> firstBatchRecords =
@@ -270,7 +293,7 @@ public class TestSnapshotSelection {
HadoopTables tables = new HadoopTables(CONF);
PartitionSpec spec = PartitionSpec.unpartitioned();
- Table table = tables.create(SCHEMA, spec, tableLocation);
+ Table table = tables.create(SCHEMA, spec, properties, tableLocation);
// produce the first snapshot
List<SimpleRecord> firstBatchRecords =
@@ -305,7 +328,7 @@ public class TestSnapshotSelection {
HadoopTables tables = new HadoopTables(CONF);
PartitionSpec spec = PartitionSpec.unpartitioned();
- Table table = tables.create(SCHEMA, spec, tableLocation);
+ Table table = tables.create(SCHEMA, spec, properties, tableLocation);
// produce the first snapshot
List<SimpleRecord> firstBatchRecords =
@@ -336,7 +359,7 @@ public class TestSnapshotSelection {
HadoopTables tables = new HadoopTables(CONF);
PartitionSpec spec = PartitionSpec.unpartitioned();
- Table table = tables.create(SCHEMA, spec, tableLocation);
+ Table table = tables.create(SCHEMA, spec, properties, tableLocation);
List<SimpleRecord> firstBatchRecords =
Lists.newArrayList(
@@ -379,7 +402,7 @@ public class TestSnapshotSelection {
HadoopTables tables = new HadoopTables(CONF);
PartitionSpec spec = PartitionSpec.unpartitioned();
- Table table = tables.create(SCHEMA, spec, tableLocation);
+ Table table = tables.create(SCHEMA, spec, properties, tableLocation);
// produce the first snapshot
List<SimpleRecord> firstBatchRecords =
@@ -420,7 +443,7 @@ public class TestSnapshotSelection {
HadoopTables tables = new HadoopTables(CONF);
PartitionSpec spec = PartitionSpec.unpartitioned();
- Table table = tables.create(SCHEMA, spec, tableLocation);
+ Table table = tables.create(SCHEMA, spec, properties, tableLocation);
// produce the first snapshot
List<SimpleRecord> firstBatchRecords =
diff --git
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReadProjection.java
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReadProjection.java
index dde1eb7b36..3a4b235c46 100644
---
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReadProjection.java
+++
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReadProjection.java
@@ -19,6 +19,8 @@
package org.apache.iceberg.spark.source;
import static org.apache.iceberg.Files.localOutput;
+import static org.apache.iceberg.PlanningMode.DISTRIBUTED;
+import static org.apache.iceberg.PlanningMode.LOCAL;
import static org.apache.iceberg.types.Types.NestedField.optional;
import static org.apache.iceberg.types.Types.NestedField.required;
@@ -31,8 +33,10 @@ import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.PlanningMode;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
import org.apache.iceberg.data.GenericAppenderFactory;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.io.FileAppender;
@@ -58,24 +62,26 @@ public class TestSparkReadProjection extends
TestReadProjection {
private static SparkSession spark = null;
- @Parameterized.Parameters(name = "format = {0}, vectorized = {1}")
+ @Parameterized.Parameters(name = "format = {0}, vectorized = {1},
planningMode = {2}")
public static Object[][] parameters() {
return new Object[][] {
- {"parquet", false},
- {"parquet", true},
- {"avro", false},
- {"orc", false},
- {"orc", true}
+ {"parquet", false, LOCAL},
+ {"parquet", true, DISTRIBUTED},
+ {"avro", false, LOCAL},
+ {"orc", false, DISTRIBUTED},
+ {"orc", true, LOCAL}
};
}
private final FileFormat format;
private final boolean vectorized;
+ private final PlanningMode planningMode;
- public TestSparkReadProjection(String format, boolean vectorized) {
+ public TestSparkReadProjection(String format, boolean vectorized,
PlanningMode planningMode) {
super(format);
this.format = FileFormat.fromString(format);
this.vectorized = vectorized;
+ this.planningMode = planningMode;
}
@BeforeClass
@@ -111,7 +117,15 @@ public class TestSparkReadProjection extends
TestReadProjection {
File testFile = new File(dataFolder,
format.addExtension(UUID.randomUUID().toString()));
- Table table = TestTables.create(location, desc, writeSchema,
PartitionSpec.unpartitioned());
+ Table table =
+ TestTables.create(
+ location,
+ desc,
+ writeSchema,
+ PartitionSpec.unpartitioned(),
+ ImmutableMap.of(
+ TableProperties.DATA_PLANNING_MODE, planningMode.modeName(),
+ TableProperties.DELETE_PLANNING_MODE,
planningMode.modeName()));
try {
// Important: use the table's schema for the rest of the test
// When tables are created, the column ids are reassigned.
diff --git
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/sql/TestFilterPushDown.java
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/sql/TestFilterPushDown.java
index 0ea34e187f..e7401a00e8 100644
---
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/sql/TestFilterPushDown.java
+++
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/sql/TestFilterPushDown.java
@@ -18,9 +18,13 @@
*/
package org.apache.iceberg.spark.sql;
+import static org.apache.iceberg.PlanningMode.DISTRIBUTED;
+import static org.apache.iceberg.PlanningMode.LOCAL;
+
import java.sql.Timestamp;
import java.time.Instant;
import java.util.List;
+import org.apache.iceberg.PlanningMode;
import org.apache.iceberg.Table;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
@@ -29,9 +33,23 @@ import org.apache.spark.sql.execution.SparkPlan;
import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+@RunWith(Parameterized.class)
public class TestFilterPushDown extends SparkTestBaseWithCatalog {
+ @Parameterized.Parameters(name = "planningMode = {0}")
+ public static Object[] parameters() {
+ return new Object[] {LOCAL, DISTRIBUTED};
+ }
+
+ private final PlanningMode planningMode;
+
+ public TestFilterPushDown(PlanningMode planningMode) {
+ this.planningMode = planningMode;
+ }
+
@After
public void removeTables() {
sql("DROP TABLE IF EXISTS %s", tableName);
@@ -45,6 +63,7 @@ public class TestFilterPushDown extends
SparkTestBaseWithCatalog {
+ "USING iceberg "
+ "PARTITIONED BY (dep)",
tableName);
+ configurePlanningMode(planningMode);
sql("INSERT INTO %s VALUES (1, 100, 'd1')", tableName);
sql("INSERT INTO %s VALUES (2, 200, 'd2')", tableName);
@@ -156,6 +175,7 @@ public class TestFilterPushDown extends
SparkTestBaseWithCatalog {
+ "USING iceberg "
+ "PARTITIONED BY (hours(t))",
tableName);
+ configurePlanningMode(planningMode);
sql("INSERT INTO %s VALUES (1, 100, TIMESTAMP
'2021-06-30T01:00:00.000Z')", tableName);
sql("INSERT INTO %s VALUES (2, 200, TIMESTAMP
'2021-06-30T02:00:00.000Z')", tableName);
@@ -201,6 +221,7 @@ public class TestFilterPushDown extends
SparkTestBaseWithCatalog {
+ "USING iceberg "
+ "PARTITIONED BY (days(t))",
tableName);
+ configurePlanningMode(planningMode);
sql("INSERT INTO %s VALUES (1, 100, TIMESTAMP
'2021-06-15T01:00:00.000Z')", tableName);
sql("INSERT INTO %s VALUES (2, 200, TIMESTAMP
'2021-06-30T02:00:00.000Z')", tableName);
@@ -243,6 +264,7 @@ public class TestFilterPushDown extends
SparkTestBaseWithCatalog {
+ "USING iceberg "
+ "PARTITIONED BY (months(t))",
tableName);
+ configurePlanningMode(planningMode);
sql("INSERT INTO %s VALUES (1, 100, TIMESTAMP
'2021-06-30T01:00:00.000Z')", tableName);
sql("INSERT INTO %s VALUES (2, 200, TIMESTAMP
'2021-06-30T02:00:00.000Z')", tableName);
@@ -285,6 +307,7 @@ public class TestFilterPushDown extends
SparkTestBaseWithCatalog {
+ "USING iceberg "
+ "PARTITIONED BY (years(t))",
tableName);
+ configurePlanningMode(planningMode);
sql("INSERT INTO %s VALUES (1, 100, TIMESTAMP
'2021-06-30T01:00:00.000Z')", tableName);
sql("INSERT INTO %s VALUES (2, 200, TIMESTAMP
'2021-06-30T02:00:00.000Z')", tableName);
@@ -327,6 +350,7 @@ public class TestFilterPushDown extends
SparkTestBaseWithCatalog {
+ "USING iceberg "
+ "PARTITIONED BY (dep, bucket(8, id))",
tableName);
+ configurePlanningMode(planningMode);
sql("INSERT INTO %s VALUES (1, 100, 'd1')", tableName);
sql("INSERT INTO %s VALUES (2, 200, 'd2')", tableName);
@@ -345,6 +369,7 @@ public class TestFilterPushDown extends
SparkTestBaseWithCatalog {
+ "USING iceberg "
+ "PARTITIONED BY (truncate(1, dep))",
tableName);
+ configurePlanningMode(planningMode);
sql("INSERT INTO %s VALUES (1, 100, 'd1')", tableName);
sql("INSERT INTO %s VALUES (2, 200, 'd2')", tableName);
@@ -369,6 +394,7 @@ public class TestFilterPushDown extends
SparkTestBaseWithCatalog {
+ "USING iceberg "
+ "PARTITIONED BY (dep)",
tableName);
+ configurePlanningMode(planningMode);
sql("INSERT INTO %s VALUES (1, 100, 'd1', 'sd1')", tableName);
@@ -409,6 +435,7 @@ public class TestFilterPushDown extends
SparkTestBaseWithCatalog {
+ "USING iceberg "
+ "PARTITIONED BY (truncate(2, dep))",
tableName);
+ configurePlanningMode(planningMode);
sql("INSERT INTO %s VALUES (1, 100, 'd1')", tableName);
@@ -449,6 +476,7 @@ public class TestFilterPushDown extends
SparkTestBaseWithCatalog {
+ "USING iceberg "
+ "PARTITIONED BY (hours(t))",
tableName);
+ configurePlanningMode(planningMode);
withDefaultTimeZone(
"UTC",
@@ -483,6 +511,7 @@ public class TestFilterPushDown extends
SparkTestBaseWithCatalog {
sql(
"CREATE TABLE %s (id INT, salary DOUBLE)" + "USING iceberg " +
"PARTITIONED BY (salary)",
tableName);
+ configurePlanningMode(planningMode);
sql("INSERT INTO %s VALUES (1, 100.5)", tableName);
sql("INSERT INTO %s VALUES (2, double('NaN'))", tableName);
diff --git
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/sql/TestStoragePartitionedJoins.java
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/sql/TestStoragePartitionedJoins.java
index ebb190b698..8a1ec5060f 100644
---
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/sql/TestStoragePartitionedJoins.java
+++
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/sql/TestStoragePartitionedJoins.java
@@ -18,10 +18,14 @@
*/
package org.apache.iceberg.spark.sql;
+import static org.apache.iceberg.PlanningMode.DISTRIBUTED;
+import static org.apache.iceberg.PlanningMode.LOCAL;
+
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.lang3.StringUtils;
+import org.apache.iceberg.PlanningMode;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
@@ -44,9 +48,17 @@ import org.junit.After;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+@RunWith(Parameterized.class)
public class TestStoragePartitionedJoins extends SparkTestBaseWithCatalog {
+ @Parameterized.Parameters(name = "planningMode = {0}")
+ public static Object[] parameters() {
+ return new Object[] {LOCAL, DISTRIBUTED};
+ }
+
private static final String OTHER_TABLE_NAME = "other_table";
// open file cost and split size are set as 16 MB to produce a split per file
@@ -84,6 +96,12 @@ public class TestStoragePartitionedJoins extends
SparkTestBaseWithCatalog {
SparkSQLProperties.PRESERVE_DATA_GROUPING,
"true");
+ private final PlanningMode planningMode;
+
+ public TestStoragePartitionedJoins(PlanningMode planningMode) {
+ this.planningMode = planningMode;
+ }
+
@BeforeClass
public static void setupSparkConf() {
spark.conf().set("spark.sql.shuffle.partitions", "4");
@@ -564,6 +582,7 @@ public class TestStoragePartitionedJoins extends
SparkTestBaseWithCatalog {
sourceColumnType,
transform,
tablePropsAsString(TABLE_PROPERTIES));
+ configurePlanningMode(tableName, planningMode);
sql(
createTableStmt,
@@ -572,6 +591,7 @@ public class TestStoragePartitionedJoins extends
SparkTestBaseWithCatalog {
sourceColumnType,
transform,
tablePropsAsString(TABLE_PROPERTIES));
+ configurePlanningMode(tableName(OTHER_TABLE_NAME), planningMode);
Table table = validationCatalog.loadTable(tableIdent);
Dataset<Row> dataDF = randomDataDF(table.schema(), 200);