This is an automated email from the ASF dual-hosted git repository.
amoghj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new d8f26caa97 Spark: Use bulk deletes in rewrite manifests action (#10343)
d8f26caa97 is described below
commit d8f26caa97b5c06ee3a428b5c0868762a6f553a3
Author: Amogh Jahagirdar <[email protected]>
AuthorDate: Mon Jun 17 07:34:23 2024 -0700
Spark: Use bulk deletes in rewrite manifests action (#10343)
---
.../spark/actions/RewriteManifestsSparkAction.java | 16 +++++++++-------
.../spark/actions/RewriteManifestsSparkAction.java | 16 +++++++++-------
.../spark/actions/RewriteManifestsSparkAction.java | 16 +++++++++-------
3 files changed, 27 insertions(+), 21 deletions(-)
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
index bc2ef23067..288b2d417f 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
@@ -45,6 +45,7 @@ import org.apache.iceberg.actions.RewriteManifests;
import org.apache.iceberg.exceptions.CommitStateUnknownException;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.io.SupportsBulkOperations;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
@@ -54,7 +55,6 @@ import org.apache.iceberg.spark.SparkDataFile;
import org.apache.iceberg.spark.source.SerializableTableWithSize;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.PropertyUtil;
-import org.apache.iceberg.util.Tasks;
import org.apache.iceberg.util.ThreadPools;
import org.apache.spark.api.java.function.MapPartitionsFunction;
import org.apache.spark.broadcast.Broadcast;
@@ -352,12 +352,14 @@ public class RewriteManifestsSparkAction
}
private void deleteFiles(Iterable<String> locations) {
- Tasks.foreach(locations)
- .executeWith(ThreadPools.getWorkerPool())
- .noRetry()
- .suppressFailureWhenFinished()
- .onFailure((location, exc) -> LOG.warn("Failed to delete: {}", location, exc))
- .run(location -> table.io().deleteFile(location));
+ Iterable<FileInfo> files =
+ Iterables.transform(locations, location -> new FileInfo(location, MANIFEST));
+ if (table.io() instanceof SupportsBulkOperations) {
+ deleteFiles((SupportsBulkOperations) table.io(), files.iterator());
+ } else {
+ deleteFiles(
+ ThreadPools.getWorkerPool(), file -> table.io().deleteFile(file), files.iterator());
+ }
}
private static ManifestFile writeManifest(
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
index 5b1d616569..8ec3b44f92 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
@@ -49,6 +49,7 @@ import org.apache.iceberg.actions.RewriteManifests;
import org.apache.iceberg.exceptions.CommitStateUnknownException;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.io.SupportsBulkOperations;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
@@ -60,7 +61,6 @@ import org.apache.iceberg.spark.SparkDeleteFile;
import org.apache.iceberg.spark.source.SerializableTableWithSize;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.PropertyUtil;
-import org.apache.iceberg.util.Tasks;
import org.apache.iceberg.util.ThreadPools;
import org.apache.spark.api.java.function.MapPartitionsFunction;
import org.apache.spark.broadcast.Broadcast;
@@ -362,12 +362,14 @@ public class RewriteManifestsSparkAction
}
private void deleteFiles(Iterable<String> locations) {
- Tasks.foreach(locations)
- .executeWith(ThreadPools.getWorkerPool())
- .noRetry()
- .suppressFailureWhenFinished()
- .onFailure((location, exc) -> LOG.warn("Failed to delete: {}", location, exc))
- .run(location -> table.io().deleteFile(location));
+ Iterable<FileInfo> files =
+ Iterables.transform(locations, location -> new FileInfo(location, MANIFEST));
+ if (table.io() instanceof SupportsBulkOperations) {
+ deleteFiles((SupportsBulkOperations) table.io(), files.iterator());
+ } else {
+ deleteFiles(
+ ThreadPools.getWorkerPool(), file -> table.io().deleteFile(file), files.iterator());
+ }
}
private ManifestWriterFactory manifestWriters() {
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
index 5b1d616569..8ec3b44f92 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
@@ -49,6 +49,7 @@ import org.apache.iceberg.actions.RewriteManifests;
import org.apache.iceberg.exceptions.CommitStateUnknownException;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.io.SupportsBulkOperations;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
@@ -60,7 +61,6 @@ import org.apache.iceberg.spark.SparkDeleteFile;
import org.apache.iceberg.spark.source.SerializableTableWithSize;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.PropertyUtil;
-import org.apache.iceberg.util.Tasks;
import org.apache.iceberg.util.ThreadPools;
import org.apache.spark.api.java.function.MapPartitionsFunction;
import org.apache.spark.broadcast.Broadcast;
@@ -362,12 +362,14 @@ public class RewriteManifestsSparkAction
}
private void deleteFiles(Iterable<String> locations) {
- Tasks.foreach(locations)
- .executeWith(ThreadPools.getWorkerPool())
- .noRetry()
- .suppressFailureWhenFinished()
- .onFailure((location, exc) -> LOG.warn("Failed to delete: {}", location, exc))
- .run(location -> table.io().deleteFile(location));
+ Iterable<FileInfo> files =
+ Iterables.transform(locations, location -> new FileInfo(location, MANIFEST));
+ if (table.io() instanceof SupportsBulkOperations) {
+ deleteFiles((SupportsBulkOperations) table.io(), files.iterator());
+ } else {
+ deleteFiles(
+ ThreadPools.getWorkerPool(), file -> table.io().deleteFile(file), files.iterator());
+ }
}
private ManifestWriterFactory manifestWriters() {