This is an automated email from the ASF dual-hosted git repository.

aokolnychyi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new 86bb1c09f5 Spark 3.4: Don't cache or reuse manifest entries while rewriting metadata by default (#8954)
86bb1c09f5 is described below

commit 86bb1c09f5ffd2b6a7c72683cb86bb95f4c2b72f
Author: Anton Okolnychyi <[email protected]>
AuthorDate: Mon Oct 30 17:35:59 2023 -0700

    Spark 3.4: Don't cache or reuse manifest entries while rewriting metadata by default (#8954)
    
    This change cherry-picks PR #8935 to Spark 3.4.
---
 .../spark/actions/RewriteManifestsSparkAction.java | 13 ++------
 .../spark/actions/TestRewriteManifestsAction.java  | 38 +++++++++++++++++-----
 2 files changed, 32 insertions(+), 19 deletions(-)

diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
index 06a5c8c572..c0b6d3fe17 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
@@ -56,7 +56,6 @@ import org.apache.iceberg.types.Types;
 import org.apache.iceberg.util.PropertyUtil;
 import org.apache.iceberg.util.Tasks;
 import org.apache.iceberg.util.ThreadPools;
-import org.apache.spark.api.java.function.MapFunction;
 import org.apache.spark.api.java.function.MapPartitionsFunction;
 import org.apache.spark.broadcast.Broadcast;
 import org.apache.spark.sql.Column;
@@ -65,7 +64,6 @@ import org.apache.spark.sql.Encoder;
 import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
-import org.apache.spark.sql.internal.SQLConf;
 import org.apache.spark.sql.types.StructType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -82,7 +80,7 @@ public class RewriteManifestsSparkAction
     extends BaseSnapshotUpdateSparkAction<RewriteManifestsSparkAction> 
implements RewriteManifests {
 
   public static final String USE_CACHING = "use-caching";
-  public static final boolean USE_CACHING_DEFAULT = true;
+  public static final boolean USE_CACHING_DEFAULT = false;
 
   private static final Logger LOG = 
LoggerFactory.getLogger(RewriteManifestsSparkAction.class);
 
@@ -277,16 +275,9 @@ public class RewriteManifestsSparkAction
   }
 
   private <T, U> U withReusableDS(Dataset<T> ds, Function<Dataset<T>, U> func) 
{
-    Dataset<T> reusableDS;
     boolean useCaching =
         PropertyUtil.propertyAsBoolean(options(), USE_CACHING, 
USE_CACHING_DEFAULT);
-    if (useCaching) {
-      reusableDS = ds.cache();
-    } else {
-      int parallelism = SQLConf.get().numShufflePartitions();
-      reusableDS =
-          ds.repartition(parallelism).map((MapFunction<T, T>) value -> value, 
ds.exprEnc());
-    }
+    Dataset<T> reusableDS = useCaching ? ds.cache() : ds;
 
     try {
       return func.apply(reusableDS);
diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java
index 64dbf42d4c..a2a8ec8a9f 100644
--- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java
+++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java
@@ -75,18 +75,25 @@ public class TestRewriteManifestsAction extends SparkTestBase {
           optional(2, "c2", Types.StringType.get()),
           optional(3, "c3", Types.StringType.get()));
 
-  @Parameterized.Parameters(name = "snapshotIdInheritanceEnabled = {0}")
+  @Parameterized.Parameters(name = "snapshotIdInheritanceEnabled = {0}, 
useCaching = {1}")
   public static Object[] parameters() {
-    return new Object[] {"true", "false"};
+    return new Object[][] {
+      new Object[] {"true", "true"},
+      new Object[] {"false", "true"},
+      new Object[] {"true", "false"},
+      new Object[] {"false", "false"}
+    };
   }
 
   @Rule public TemporaryFolder temp = new TemporaryFolder();
 
   private final String snapshotIdInheritanceEnabled;
+  private final String useCaching;
   private String tableLocation = null;
 
-  public TestRewriteManifestsAction(String snapshotIdInheritanceEnabled) {
+  public TestRewriteManifestsAction(String snapshotIdInheritanceEnabled, 
String useCaching) {
     this.snapshotIdInheritanceEnabled = snapshotIdInheritanceEnabled;
+    this.useCaching = useCaching;
   }
 
   @Before
@@ -109,6 +116,7 @@ public class TestRewriteManifestsAction extends SparkTestBase {
     actions
         .rewriteManifests(table)
         .rewriteIf(manifest -> true)
+        .option(RewriteManifestsSparkAction.USE_CACHING, useCaching)
         .stagingLocation(temp.newFolder().toString())
         .execute();
 
@@ -141,7 +149,11 @@ public class TestRewriteManifestsAction extends SparkTestBase {
     SparkActions actions = SparkActions.get();
 
     RewriteManifests.Result result =
-        actions.rewriteManifests(table).rewriteIf(manifest -> true).execute();
+        actions
+            .rewriteManifests(table)
+            .rewriteIf(manifest -> true)
+            .option(RewriteManifestsSparkAction.USE_CACHING, useCaching)
+            .execute();
 
     Assert.assertEquals(
         "Action should rewrite 2 manifests", 2, 
Iterables.size(result.rewrittenManifests()));
@@ -281,7 +293,11 @@ public class TestRewriteManifestsAction extends SparkTestBase {
         .commit();
 
     RewriteManifests.Result result =
-        actions.rewriteManifests(table).rewriteIf(manifest -> true).execute();
+        actions
+            .rewriteManifests(table)
+            .rewriteIf(manifest -> true)
+            .option(RewriteManifestsSparkAction.USE_CACHING, useCaching)
+            .execute();
 
     Assert.assertEquals(
         "Action should rewrite 4 manifests", 4, 
Iterables.size(result.rewrittenManifests()));
@@ -354,6 +370,7 @@ public class TestRewriteManifestsAction extends SparkTestBase {
           actions
               .rewriteManifests(table)
               .rewriteIf(manifest -> true)
+              .option(RewriteManifestsSparkAction.USE_CACHING, useCaching)
               .stagingLocation(temp.newFolder().toString())
               .execute();
 
@@ -404,6 +421,7 @@ public class TestRewriteManifestsAction extends SparkTestBase {
         actions
             .rewriteManifests(table)
             .rewriteIf(manifest -> true)
+            .option(RewriteManifestsSparkAction.USE_CACHING, useCaching)
             .stagingLocation(temp.newFolder().toString())
             .execute();
 
@@ -451,7 +469,7 @@ public class TestRewriteManifestsAction extends SparkTestBase {
 
     SparkActions actions = SparkActions.get();
 
-    // rewrite only the first manifest without caching
+    // rewrite only the first manifest
     RewriteManifests.Result result =
         actions
             .rewriteManifests(table)
@@ -460,7 +478,7 @@ public class TestRewriteManifestsAction extends SparkTestBase {
                     (manifest.path().equals(manifests.get(0).path())
                         || (manifest.path().equals(manifests.get(1).path()))))
             .stagingLocation(temp.newFolder().toString())
-            .option("use-caching", "false")
+            .option(RewriteManifestsSparkAction.USE_CACHING, useCaching)
             .execute();
 
     Assert.assertEquals(
@@ -519,7 +537,11 @@ public class TestRewriteManifestsAction extends SparkTestBase {
     Assert.assertEquals("Should have 2 manifests before rewrite", 2, 
manifests.size());
 
     SparkActions actions = SparkActions.get();
-    RewriteManifests.Result result = actions.rewriteManifests(table).execute();
+    RewriteManifests.Result result =
+        actions
+            .rewriteManifests(table)
+            .option(RewriteManifestsSparkAction.USE_CACHING, useCaching)
+            .execute();
     Assert.assertEquals(
         "Action should rewrite 2 manifests", 2, 
Iterables.size(result.rewrittenManifests()));
     Assert.assertEquals(

Reply via email to