This is an automated email from the ASF dual-hosted git repository.

amoghj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new d8f26caa97 Spark: Use bulk deletes in rewrite manifests action (#10343)
d8f26caa97 is described below

commit d8f26caa97b5c06ee3a428b5c0868762a6f553a3
Author: Amogh Jahagirdar <[email protected]>
AuthorDate: Mon Jun 17 07:34:23 2024 -0700

    Spark: Use bulk deletes in rewrite manifests action (#10343)
---
 .../spark/actions/RewriteManifestsSparkAction.java       | 16 +++++++++-------
 .../spark/actions/RewriteManifestsSparkAction.java       | 16 +++++++++-------
 .../spark/actions/RewriteManifestsSparkAction.java       | 16 +++++++++-------
 3 files changed, 27 insertions(+), 21 deletions(-)

diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
index bc2ef23067..288b2d417f 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
@@ -45,6 +45,7 @@ import org.apache.iceberg.actions.RewriteManifests;
 import org.apache.iceberg.exceptions.CommitStateUnknownException;
 import org.apache.iceberg.exceptions.ValidationException;
 import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.io.SupportsBulkOperations;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
@@ -54,7 +55,6 @@ import org.apache.iceberg.spark.SparkDataFile;
 import org.apache.iceberg.spark.source.SerializableTableWithSize;
 import org.apache.iceberg.types.Types;
 import org.apache.iceberg.util.PropertyUtil;
-import org.apache.iceberg.util.Tasks;
 import org.apache.iceberg.util.ThreadPools;
 import org.apache.spark.api.java.function.MapPartitionsFunction;
 import org.apache.spark.broadcast.Broadcast;
@@ -352,12 +352,14 @@ public class RewriteManifestsSparkAction
   }
 
   private void deleteFiles(Iterable<String> locations) {
-    Tasks.foreach(locations)
-        .executeWith(ThreadPools.getWorkerPool())
-        .noRetry()
-        .suppressFailureWhenFinished()
-        .onFailure((location, exc) -> LOG.warn("Failed to delete: {}", location, exc))
-        .run(location -> table.io().deleteFile(location));
+    Iterable<FileInfo> files =
+        Iterables.transform(locations, location -> new FileInfo(location, MANIFEST));
+    if (table.io() instanceof SupportsBulkOperations) {
+      deleteFiles((SupportsBulkOperations) table.io(), files.iterator());
+    } else {
+      deleteFiles(
+          ThreadPools.getWorkerPool(), file -> table.io().deleteFile(file), files.iterator());
+    }
   }
 
   private static ManifestFile writeManifest(
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
index 5b1d616569..8ec3b44f92 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
@@ -49,6 +49,7 @@ import org.apache.iceberg.actions.RewriteManifests;
 import org.apache.iceberg.exceptions.CommitStateUnknownException;
 import org.apache.iceberg.exceptions.ValidationException;
 import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.io.SupportsBulkOperations;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
@@ -60,7 +61,6 @@ import org.apache.iceberg.spark.SparkDeleteFile;
 import org.apache.iceberg.spark.source.SerializableTableWithSize;
 import org.apache.iceberg.types.Types;
 import org.apache.iceberg.util.PropertyUtil;
-import org.apache.iceberg.util.Tasks;
 import org.apache.iceberg.util.ThreadPools;
 import org.apache.spark.api.java.function.MapPartitionsFunction;
 import org.apache.spark.broadcast.Broadcast;
@@ -362,12 +362,14 @@ public class RewriteManifestsSparkAction
   }
 
   private void deleteFiles(Iterable<String> locations) {
-    Tasks.foreach(locations)
-        .executeWith(ThreadPools.getWorkerPool())
-        .noRetry()
-        .suppressFailureWhenFinished()
-        .onFailure((location, exc) -> LOG.warn("Failed to delete: {}", location, exc))
-        .run(location -> table.io().deleteFile(location));
+    Iterable<FileInfo> files =
+        Iterables.transform(locations, location -> new FileInfo(location, MANIFEST));
+    if (table.io() instanceof SupportsBulkOperations) {
+      deleteFiles((SupportsBulkOperations) table.io(), files.iterator());
+    } else {
+      deleteFiles(
+          ThreadPools.getWorkerPool(), file -> table.io().deleteFile(file), files.iterator());
+    }
   }
 
   private ManifestWriterFactory manifestWriters() {
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
index 5b1d616569..8ec3b44f92 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
@@ -49,6 +49,7 @@ import org.apache.iceberg.actions.RewriteManifests;
 import org.apache.iceberg.exceptions.CommitStateUnknownException;
 import org.apache.iceberg.exceptions.ValidationException;
 import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.io.SupportsBulkOperations;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
@@ -60,7 +61,6 @@ import org.apache.iceberg.spark.SparkDeleteFile;
 import org.apache.iceberg.spark.source.SerializableTableWithSize;
 import org.apache.iceberg.types.Types;
 import org.apache.iceberg.util.PropertyUtil;
-import org.apache.iceberg.util.Tasks;
 import org.apache.iceberg.util.ThreadPools;
 import org.apache.spark.api.java.function.MapPartitionsFunction;
 import org.apache.spark.broadcast.Broadcast;
@@ -362,12 +362,14 @@ public class RewriteManifestsSparkAction
   }
 
   private void deleteFiles(Iterable<String> locations) {
-    Tasks.foreach(locations)
-        .executeWith(ThreadPools.getWorkerPool())
-        .noRetry()
-        .suppressFailureWhenFinished()
-        .onFailure((location, exc) -> LOG.warn("Failed to delete: {}", location, exc))
-        .run(location -> table.io().deleteFile(location));
+    Iterable<FileInfo> files =
+        Iterables.transform(locations, location -> new FileInfo(location, MANIFEST));
+    if (table.io() instanceof SupportsBulkOperations) {
+      deleteFiles((SupportsBulkOperations) table.io(), files.iterator());
+    } else {
+      deleteFiles(
+          ThreadPools.getWorkerPool(), file -> table.io().deleteFile(file), files.iterator());
+    }
   }
 
   private ManifestWriterFactory manifestWriters() {

Reply via email to