This is an automated email from the ASF dual-hosted git repository.
aokolnychyi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new fceea89cb4 Spark 3.5: Don't cache or reuse manifest entries while rewriting metadata by default (#8935)
fceea89cb4 is described below
commit fceea89cb4a8f781641aa65456801b1cd40d0d03
Author: Anton Okolnychyi <[email protected]>
AuthorDate: Mon Oct 30 10:53:23 2023 -0700
Spark 3.5: Don't cache or reuse manifest entries while rewriting metadata by default (#8935)
The action for rewriting manifests caches the manifest entry DF or does an
extra shuffle in order to skip reading the actual manifest files twice. We did
this assuming it would increase the performance. However, the caching seems to
perform poorly for larger tables as it requires substantial cluster resources.
In addition, doing a round-robin repartition is expensive as the entries must
be written to disk. The extra write is actually more expensive than the extra
read required for the [...]
Therefore, this change disables caching by default and removes the optional
round-robin repartition step. Instead, we will read the manifests twice (this
step is distributed and scales really well even for tables with huge metadata).
The new approach should be both faster and more robust.
---
.../spark/actions/RewriteManifestsSparkAction.java | 13 ++------
.../spark/actions/TestRewriteManifestsAction.java | 38 +++++++++++++++++-----
2 files changed, 32 insertions(+), 19 deletions(-)
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
index 06a5c8c572..c0b6d3fe17 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
@@ -56,7 +56,6 @@ import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.PropertyUtil;
import org.apache.iceberg.util.Tasks;
import org.apache.iceberg.util.ThreadPools;
-import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.MapPartitionsFunction;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.sql.Column;
@@ -65,7 +64,6 @@ import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
-import org.apache.spark.sql.internal.SQLConf;
import org.apache.spark.sql.types.StructType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -82,7 +80,7 @@ public class RewriteManifestsSparkAction
extends BaseSnapshotUpdateSparkAction<RewriteManifestsSparkAction>
implements RewriteManifests {
public static final String USE_CACHING = "use-caching";
- public static final boolean USE_CACHING_DEFAULT = true;
+ public static final boolean USE_CACHING_DEFAULT = false;
private static final Logger LOG =
LoggerFactory.getLogger(RewriteManifestsSparkAction.class);
@@ -277,16 +275,9 @@ public class RewriteManifestsSparkAction
}
private <T, U> U withReusableDS(Dataset<T> ds, Function<Dataset<T>, U> func)
{
- Dataset<T> reusableDS;
boolean useCaching =
PropertyUtil.propertyAsBoolean(options(), USE_CACHING, USE_CACHING_DEFAULT);
- if (useCaching) {
- reusableDS = ds.cache();
- } else {
- int parallelism = SQLConf.get().numShufflePartitions();
- reusableDS =
- ds.repartition(parallelism).map((MapFunction<T, T>) value -> value, ds.exprEnc());
- }
+ Dataset<T> reusableDS = useCaching ? ds.cache() : ds;
try {
return func.apply(reusableDS);
diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java
index 64dbf42d4c..a2a8ec8a9f 100644
--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java
+++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java
@@ -75,18 +75,25 @@ public class TestRewriteManifestsAction extends
SparkTestBase {
optional(2, "c2", Types.StringType.get()),
optional(3, "c3", Types.StringType.get()));
- @Parameterized.Parameters(name = "snapshotIdInheritanceEnabled = {0}")
+ @Parameterized.Parameters(name = "snapshotIdInheritanceEnabled = {0}, useCaching = {1}")
public static Object[] parameters() {
- return new Object[] {"true", "false"};
+ return new Object[][] {
+ new Object[] {"true", "true"},
+ new Object[] {"false", "true"},
+ new Object[] {"true", "false"},
+ new Object[] {"false", "false"}
+ };
}
@Rule public TemporaryFolder temp = new TemporaryFolder();
private final String snapshotIdInheritanceEnabled;
+ private final String useCaching;
private String tableLocation = null;
- public TestRewriteManifestsAction(String snapshotIdInheritanceEnabled) {
+ public TestRewriteManifestsAction(String snapshotIdInheritanceEnabled, String useCaching) {
this.snapshotIdInheritanceEnabled = snapshotIdInheritanceEnabled;
+ this.useCaching = useCaching;
}
@Before
@@ -109,6 +116,7 @@ public class TestRewriteManifestsAction extends
SparkTestBase {
actions
.rewriteManifests(table)
.rewriteIf(manifest -> true)
+ .option(RewriteManifestsSparkAction.USE_CACHING, useCaching)
.stagingLocation(temp.newFolder().toString())
.execute();
@@ -141,7 +149,11 @@ public class TestRewriteManifestsAction extends
SparkTestBase {
SparkActions actions = SparkActions.get();
RewriteManifests.Result result =
- actions.rewriteManifests(table).rewriteIf(manifest -> true).execute();
+ actions
+ .rewriteManifests(table)
+ .rewriteIf(manifest -> true)
+ .option(RewriteManifestsSparkAction.USE_CACHING, useCaching)
+ .execute();
Assert.assertEquals(
"Action should rewrite 2 manifests", 2,
Iterables.size(result.rewrittenManifests()));
@@ -281,7 +293,11 @@ public class TestRewriteManifestsAction extends
SparkTestBase {
.commit();
RewriteManifests.Result result =
- actions.rewriteManifests(table).rewriteIf(manifest -> true).execute();
+ actions
+ .rewriteManifests(table)
+ .rewriteIf(manifest -> true)
+ .option(RewriteManifestsSparkAction.USE_CACHING, useCaching)
+ .execute();
Assert.assertEquals(
"Action should rewrite 4 manifests", 4,
Iterables.size(result.rewrittenManifests()));
@@ -354,6 +370,7 @@ public class TestRewriteManifestsAction extends
SparkTestBase {
actions
.rewriteManifests(table)
.rewriteIf(manifest -> true)
+ .option(RewriteManifestsSparkAction.USE_CACHING, useCaching)
.stagingLocation(temp.newFolder().toString())
.execute();
@@ -404,6 +421,7 @@ public class TestRewriteManifestsAction extends
SparkTestBase {
actions
.rewriteManifests(table)
.rewriteIf(manifest -> true)
+ .option(RewriteManifestsSparkAction.USE_CACHING, useCaching)
.stagingLocation(temp.newFolder().toString())
.execute();
@@ -451,7 +469,7 @@ public class TestRewriteManifestsAction extends
SparkTestBase {
SparkActions actions = SparkActions.get();
- // rewrite only the first manifest without caching
+ // rewrite only the first manifest
RewriteManifests.Result result =
actions
.rewriteManifests(table)
@@ -460,7 +478,7 @@ public class TestRewriteManifestsAction extends
SparkTestBase {
(manifest.path().equals(manifests.get(0).path())
|| (manifest.path().equals(manifests.get(1).path()))))
.stagingLocation(temp.newFolder().toString())
- .option("use-caching", "false")
+ .option(RewriteManifestsSparkAction.USE_CACHING, useCaching)
.execute();
Assert.assertEquals(
@@ -519,7 +537,11 @@ public class TestRewriteManifestsAction extends
SparkTestBase {
Assert.assertEquals("Should have 2 manifests before rewrite", 2, manifests.size());
SparkActions actions = SparkActions.get();
- RewriteManifests.Result result = actions.rewriteManifests(table).execute();
+ RewriteManifests.Result result =
+ actions
+ .rewriteManifests(table)
+ .option(RewriteManifestsSparkAction.USE_CACHING, useCaching)
+ .execute();
Assert.assertEquals(
"Action should rewrite 2 manifests", 2,
Iterables.size(result.rewrittenManifests()));
Assert.assertEquals(