This is an automated email from the ASF dual-hosted git repository.
aokolnychyi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new c171c57033 Spark 3.2: Don't cache or reuse manifest entries while
rewriting metadata by default (#8956)
c171c57033 is described below
commit c171c5703389a551065f6aa86aa4a6648c9ebcee
Author: Anton Okolnychyi <[email protected]>
AuthorDate: Mon Oct 30 17:34:48 2023 -0700
Spark 3.2: Don't cache or reuse manifest entries while rewriting metadata
by default (#8956)
This change cherry-picks PR #8935 to Spark 3.2.
---
.../spark/actions/RewriteManifestsSparkAction.java | 13 ++------
.../spark/actions/TestRewriteManifestsAction.java | 38 +++++++++++++++++-----
2 files changed, 32 insertions(+), 19 deletions(-)
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
index 2b525ccc0b..c5fdc11e34 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
@@ -56,7 +56,6 @@ import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.PropertyUtil;
import org.apache.iceberg.util.Tasks;
import org.apache.iceberg.util.ThreadPools;
-import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.MapPartitionsFunction;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.sql.Column;
@@ -65,7 +64,6 @@ import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
-import org.apache.spark.sql.internal.SQLConf;
import org.apache.spark.sql.types.StructType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -82,7 +80,7 @@ public class RewriteManifestsSparkAction
extends BaseSnapshotUpdateSparkAction<RewriteManifestsSparkAction>
implements RewriteManifests {
public static final String USE_CACHING = "use-caching";
- public static final boolean USE_CACHING_DEFAULT = true;
+ public static final boolean USE_CACHING_DEFAULT = false;
private static final Logger LOG =
LoggerFactory.getLogger(RewriteManifestsSparkAction.class);
@@ -277,16 +275,9 @@ public class RewriteManifestsSparkAction
}
private <T, U> U withReusableDS(Dataset<T> ds, Function<Dataset<T>, U> func)
{
- Dataset<T> reusableDS;
boolean useCaching =
PropertyUtil.propertyAsBoolean(options(), USE_CACHING,
USE_CACHING_DEFAULT);
- if (useCaching) {
- reusableDS = ds.cache();
- } else {
- int parallelism = SQLConf.get().numShufflePartitions();
- reusableDS =
- ds.repartition(parallelism).map((MapFunction<T, T>) value -> value, ds.exprEnc());
- }
+ Dataset<T> reusableDS = useCaching ? ds.cache() : ds;
try {
return func.apply(reusableDS);
diff --git a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java
index 4aafb72ace..5cc7941236 100644
--- a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java
+++ b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java
@@ -75,18 +75,25 @@ public class TestRewriteManifestsAction extends
SparkTestBase {
optional(2, "c2", Types.StringType.get()),
optional(3, "c3", Types.StringType.get()));
- @Parameterized.Parameters(name = "snapshotIdInheritanceEnabled = {0}")
+ @Parameterized.Parameters(name = "snapshotIdInheritanceEnabled = {0}, useCaching = {1}")
public static Object[] parameters() {
- return new Object[] {"true", "false"};
+ return new Object[][] {
+ new Object[] {"true", "true"},
+ new Object[] {"false", "true"},
+ new Object[] {"true", "false"},
+ new Object[] {"false", "false"}
+ };
}
@Rule public TemporaryFolder temp = new TemporaryFolder();
private final String snapshotIdInheritanceEnabled;
+ private final String useCaching;
private String tableLocation = null;
- public TestRewriteManifestsAction(String snapshotIdInheritanceEnabled) {
+ public TestRewriteManifestsAction(String snapshotIdInheritanceEnabled,
String useCaching) {
this.snapshotIdInheritanceEnabled = snapshotIdInheritanceEnabled;
+ this.useCaching = useCaching;
}
@Before
@@ -109,6 +116,7 @@ public class TestRewriteManifestsAction extends
SparkTestBase {
actions
.rewriteManifests(table)
.rewriteIf(manifest -> true)
+ .option(RewriteManifestsSparkAction.USE_CACHING, useCaching)
.stagingLocation(temp.newFolder().toString())
.execute();
@@ -141,7 +149,11 @@ public class TestRewriteManifestsAction extends
SparkTestBase {
SparkActions actions = SparkActions.get();
RewriteManifests.Result result =
- actions.rewriteManifests(table).rewriteIf(manifest -> true).execute();
+ actions
+ .rewriteManifests(table)
+ .rewriteIf(manifest -> true)
+ .option(RewriteManifestsSparkAction.USE_CACHING, useCaching)
+ .execute();
Assert.assertEquals(
"Action should rewrite 2 manifests", 2,
Iterables.size(result.rewrittenManifests()));
@@ -281,7 +293,11 @@ public class TestRewriteManifestsAction extends
SparkTestBase {
.commit();
RewriteManifests.Result result =
- actions.rewriteManifests(table).rewriteIf(manifest -> true).execute();
+ actions
+ .rewriteManifests(table)
+ .rewriteIf(manifest -> true)
+ .option(RewriteManifestsSparkAction.USE_CACHING, useCaching)
+ .execute();
Assert.assertEquals(
"Action should rewrite 4 manifests", 4,
Iterables.size(result.rewrittenManifests()));
@@ -354,6 +370,7 @@ public class TestRewriteManifestsAction extends
SparkTestBase {
actions
.rewriteManifests(table)
.rewriteIf(manifest -> true)
+ .option(RewriteManifestsSparkAction.USE_CACHING, useCaching)
.stagingLocation(temp.newFolder().toString())
.execute();
@@ -404,6 +421,7 @@ public class TestRewriteManifestsAction extends
SparkTestBase {
actions
.rewriteManifests(table)
.rewriteIf(manifest -> true)
+ .option(RewriteManifestsSparkAction.USE_CACHING, useCaching)
.stagingLocation(temp.newFolder().toString())
.execute();
@@ -451,7 +469,7 @@ public class TestRewriteManifestsAction extends
SparkTestBase {
SparkActions actions = SparkActions.get();
- // rewrite only the first manifest without caching
+ // rewrite only the first manifest
RewriteManifests.Result result =
actions
.rewriteManifests(table)
@@ -460,7 +478,7 @@ public class TestRewriteManifestsAction extends
SparkTestBase {
(manifest.path().equals(manifests.get(0).path())
|| (manifest.path().equals(manifests.get(1).path()))))
.stagingLocation(temp.newFolder().toString())
- .option("use-caching", "false")
+ .option(RewriteManifestsSparkAction.USE_CACHING, useCaching)
.execute();
Assert.assertEquals(
@@ -519,7 +537,11 @@ public class TestRewriteManifestsAction extends
SparkTestBase {
Assert.assertEquals("Should have 2 manifests before rewrite", 2,
manifests.size());
SparkActions actions = SparkActions.get();
- RewriteManifests.Result result = actions.rewriteManifests(table).execute();
+ RewriteManifests.Result result =
+ actions
+ .rewriteManifests(table)
+ .option(RewriteManifestsSparkAction.USE_CACHING, useCaching)
+ .execute();
Assert.assertEquals(
"Action should rewrite 2 manifests", 2,
Iterables.size(result.rewrittenManifests()));
Assert.assertEquals(