This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch 0.14.x
in repository https://gitbox.apache.org/repos/asf/iceberg.git
commit 7feb9457bd095b1d5e3d6224db4780cc5b833a36
Author: Ryan Blue <[email protected]>
AuthorDate: Thu Sep 1 23:20:57 2022 -0700

    Spark: Fix stats in rewrite metadata action (#5691)

    * Core: Don't show dropped fields from the partition spec

    * Use projection instead

    * Use StructProjection in SparkDataFile.

    Co-authored-by: Fokko Driesprong <[email protected]>
---
 .../extensions/TestRewriteManifestsProcedure.java  |  29 ++++++
 .../org/apache/iceberg/spark/SparkDataFile.java    |  21 ++++-
 .../spark/actions/RewriteManifestsSparkAction.java | 105 ++++++++++++++++-----
 .../extensions/TestRewriteManifestsProcedure.java  |  29 ++++++
 .../org/apache/iceberg/spark/SparkDataFile.java    |  21 ++++-
 .../spark/actions/RewriteManifestsSparkAction.java | 105 ++++++++++++++++-----
 6 files changed, 262 insertions(+), 48 deletions(-)

diff --git a/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteManifestsProcedure.java b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteManifestsProcedure.java
index dcf0a2d91e..0d10cb0d7d 100644
--- a/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteManifestsProcedure.java
+++ b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteManifestsProcedure.java
@@ -19,6 +19,8 @@
 package org.apache.iceberg.spark.extensions;
 
+import java.sql.Date;
+import java.sql.Timestamp;
 import java.util.List;
 import java.util.Map;
 import org.apache.iceberg.AssertHelpers;
@@ -171,4 +173,31 @@ public class TestRewriteManifestsProcedure extends SparkExtensionsTestBase {
         IllegalArgumentException.class,
         "Cannot handle an empty identifier",
         () -> sql("CALL %s.system.rewrite_manifests('')", catalogName));
   }
+
+  @Test
+  public void testReplacePartitionField() {
+    sql(
+        "CREATE TABLE %s (id int, ts timestamp, day_of_ts date) USING iceberg PARTITIONED BY (day_of_ts)",
+        tableName);
+
+    sql("ALTER TABLE %s SET TBLPROPERTIES ('format-version' = '2')", tableName);
+    sql("ALTER TABLE %s REPLACE PARTITION FIELD day_of_ts WITH days(ts)\n", tableName);
+    sql(
+        "INSERT INTO %s VALUES (1, CAST('2022-01-01 10:00:00' AS TIMESTAMP), CAST('2022-01-01' AS DATE))",
+        tableName);
+
+    assertEquals(
+        "Should have expected rows",
+        ImmutableList.of(
+            row(1, Timestamp.valueOf("2022-01-01 10:00:00"), Date.valueOf("2022-01-01"))),
+        sql("SELECT * FROM %s WHERE ts < current_timestamp()", tableName));
+
+    sql("CALL %s.system.rewrite_manifests(table => '%s')", catalogName, tableName);
+
+    assertEquals(
+        "Should have expected rows",
+        ImmutableList.of(
+            row(1, Timestamp.valueOf("2022-01-01 10:00:00"), Date.valueOf("2022-01-01"))),
+        sql("SELECT * FROM %s WHERE ts < current_timestamp()", tableName));
+  }
 }
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkDataFile.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkDataFile.java
index a6390d39c5..5fe0cd86a4 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkDataFile.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkDataFile.java
@@ -29,6 +29,7 @@ import org.apache.iceberg.StructLike;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.types.Type;
 import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.StructProjection;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.types.StructType;
 
@@ -53,13 +54,29 @@ public class SparkDataFile implements DataFile {
   private final Type keyMetadataType;
   private final SparkStructLike wrappedPartition;
+  private final StructLike partitionProjection;
 
   private Row wrapped;
 
   public SparkDataFile(Types.StructType type, StructType sparkType) {
+    this(type, null, sparkType);
+  }
+
+  public SparkDataFile(
+      Types.StructType type, Types.StructType projectedType, StructType sparkType) {
     this.lowerBoundsType = type.fieldType("lower_bounds");
     this.upperBoundsType = type.fieldType("upper_bounds");
     this.keyMetadataType = type.fieldType("key_metadata");
-    this.wrappedPartition = new SparkStructLike(type.fieldType("partition").asStructType());
+
+    Types.StructType partitionType = type.fieldType("partition").asStructType();
+    this.wrappedPartition = new SparkStructLike(partitionType);
+
+    if (projectedType != null) {
+      Types.StructType projectedPartitionType = projectedType.fieldType("partition").asStructType();
+      this.partitionProjection =
+          StructProjection.create(partitionType, projectedPartitionType).wrap(wrappedPartition);
+    } else {
+      this.partitionProjection = wrappedPartition;
+    }
 
     Map<String, Integer> positions = Maps.newHashMap();
     type.fields().forEach(field -> {
@@ -114,7 +131,7 @@ public class SparkDataFile implements DataFile {
 
   @Override
   public StructLike partition() {
-    return wrappedPartition;
+    return partitionProjection;
   }
 
   @Override
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
index 99e51a37aa..030532fa94 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
@@ -34,6 +34,7 @@ import org.apache.iceberg.ManifestFile;
 import org.apache.iceberg.ManifestFiles;
 import org.apache.iceberg.ManifestWriter;
 import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Partitioning;
 import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableOperations;
@@ -200,6 +201,7 @@ public class RewriteManifestsSparkAction
   private List<ManifestFile> writeManifestsForUnpartitionedTable(Dataset<Row> manifestEntryDF, int numManifests) {
     Broadcast<FileIO> io = sparkContext().broadcast(fileIO);
     StructType sparkType = (StructType) manifestEntryDF.schema().apply("data_file").dataType();
+    Types.StructType combinedPartitionType = Partitioning.partitionType(table);
 
     // we rely only on the target number of manifests for unpartitioned tables
     // as we should not worry about having too much metadata per partition
@@ -208,9 +210,15 @@ public class RewriteManifestsSparkAction
     return manifestEntryDF
         .repartition(numManifests)
         .mapPartitions(
-            toManifests(io, maxNumManifestEntries, stagingLocation, formatVersion, spec, sparkType),
-            manifestEncoder
-        )
+            toManifests(
+                io,
+                maxNumManifestEntries,
+                stagingLocation,
+                formatVersion,
+                combinedPartitionType,
+                spec,
+                sparkType),
+            manifestEncoder)
         .collectAsList();
   }
 
@@ -220,20 +228,29 @@ public class RewriteManifestsSparkAction
     Broadcast<FileIO> io = sparkContext().broadcast(fileIO);
     StructType sparkType = (StructType) manifestEntryDF.schema().apply("data_file").dataType();
+    Types.StructType combinedPartitionType = Partitioning.partitionType(table);
 
     // we allow the actual size of manifests to be 10% higher if the estimation is not precise enough
     long maxNumManifestEntries = (long) (1.1 * targetNumManifestEntries);
 
-    return withReusableDS(manifestEntryDF, df -> {
-      Column partitionColumn = df.col("data_file.partition");
-      return df.repartitionByRange(numManifests, partitionColumn)
-          .sortWithinPartitions(partitionColumn)
-          .mapPartitions(
-              toManifests(io, maxNumManifestEntries, stagingLocation, formatVersion, spec, sparkType),
-              manifestEncoder
-          )
-          .collectAsList();
-    });
+    return withReusableDS(
+        manifestEntryDF,
+        df -> {
+          Column partitionColumn = df.col("data_file.partition");
+          return df.repartitionByRange(numManifests, partitionColumn)
+              .sortWithinPartitions(partitionColumn)
+              .mapPartitions(
+                  toManifests(
+                      io,
+                      maxNumManifestEntries,
+                      stagingLocation,
+                      formatVersion,
+                      combinedPartitionType,
+                      spec,
+                      sparkType),
+                  manifestEncoder)
+              .collectAsList();
+        });
   }
 
   private <T, U> U withReusableDS(Dataset<T> ds, Function<Dataset<T>, U> func) {
@@ -317,15 +334,24 @@ public class RewriteManifestsSparkAction
   }
 
   private static ManifestFile writeManifest(
-      List<Row> rows, int startIndex, int endIndex, Broadcast<FileIO> io,
-      String location, int format, PartitionSpec spec, StructType sparkType) throws IOException {
+      List<Row> rows,
+      int startIndex,
+      int endIndex,
+      Broadcast<FileIO> io,
+      String location,
+      int format,
+      Types.StructType combinedPartitionType,
+      PartitionSpec spec,
+      StructType sparkType)
+      throws IOException {
     String manifestName = "optimized-m-" + UUID.randomUUID();
     Path manifestPath = new Path(location, manifestName);
     OutputFile outputFile =
        io.value().newOutputFile(FileFormat.AVRO.addExtension(manifestPath.toString()));
 
-    Types.StructType dataFileType = DataFile.getType(spec.partitionType());
-    SparkDataFile wrapper = new SparkDataFile(dataFileType, sparkType);
+    Types.StructType combinedFileType = DataFile.getType(combinedPartitionType);
+    Types.StructType manifestFileType = DataFile.getType(spec.partitionType());
+    SparkDataFile wrapper = new SparkDataFile(combinedFileType, manifestFileType, sparkType);
 
     ManifestWriter<DataFile> writer = ManifestFiles.write(format, spec, outputFile, null);
 
@@ -345,8 +371,13 @@ public class RewriteManifestsSparkAction
   }
 
   private static MapPartitionsFunction<Row, ManifestFile> toManifests(
-      Broadcast<FileIO> io, long maxNumManifestEntries, String location,
-      int format, PartitionSpec spec, StructType sparkType) {
+      Broadcast<FileIO> io,
+      long maxNumManifestEntries,
+      String location,
+      int format,
+      Types.StructType combinedPartitionType,
+      PartitionSpec spec,
+      StructType sparkType) {
 
     return rows -> {
       List<Row> rowsAsList = Lists.newArrayList(rows);
@@ -357,11 +388,41 @@ public class RewriteManifestsSparkAction
 
       List<ManifestFile> manifests = Lists.newArrayList();
       if (rowsAsList.size() <= maxNumManifestEntries) {
-        manifests.add(writeManifest(rowsAsList, 0, rowsAsList.size(), io, location, format, spec, sparkType));
+        manifests.add(
+            writeManifest(
+                rowsAsList,
+                0,
+                rowsAsList.size(),
+                io,
+                location,
+                format,
+                combinedPartitionType,
+                spec,
+                sparkType));
       } else {
         int midIndex = rowsAsList.size() / 2;
-        manifests.add(writeManifest(rowsAsList, 0, midIndex, io, location, format, spec, sparkType));
-        manifests.add(writeManifest(rowsAsList, midIndex, rowsAsList.size(), io, location, format, spec, sparkType));
+        manifests.add(
+            writeManifest(
+                rowsAsList,
+                0,
+                midIndex,
+                io,
+                location,
+                format,
+                combinedPartitionType,
+                spec,
+                sparkType));
+        manifests.add(
+            writeManifest(
+                rowsAsList,
+                midIndex,
+                rowsAsList.size(),
+                io,
+                location,
+                format,
+                combinedPartitionType,
+                spec,
+                sparkType));
       }
 
       return manifests.iterator();
diff --git a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteManifestsProcedure.java b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteManifestsProcedure.java
index dcf0a2d91e..0d10cb0d7d 100644
--- a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteManifestsProcedure.java
+++ b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteManifestsProcedure.java
@@ -19,6 +19,8 @@
 package org.apache.iceberg.spark.extensions;
 
+import java.sql.Date;
+import java.sql.Timestamp;
 import java.util.List;
 import java.util.Map;
 import org.apache.iceberg.AssertHelpers;
@@ -171,4 +173,31 @@ public class TestRewriteManifestsProcedure extends SparkExtensionsTestBase {
         IllegalArgumentException.class,
         "Cannot handle an empty identifier",
         () -> sql("CALL %s.system.rewrite_manifests('')", catalogName));
   }
+
+  @Test
+  public void testReplacePartitionField() {
+    sql(
+        "CREATE TABLE %s (id int, ts timestamp, day_of_ts date) USING iceberg PARTITIONED BY (day_of_ts)",
+        tableName);
+
+    sql("ALTER TABLE %s SET TBLPROPERTIES ('format-version' = '2')", tableName);
+    sql("ALTER TABLE %s REPLACE PARTITION FIELD day_of_ts WITH days(ts)\n", tableName);
+    sql(
+        "INSERT INTO %s VALUES (1, CAST('2022-01-01 10:00:00' AS TIMESTAMP), CAST('2022-01-01' AS DATE))",
+        tableName);
+
+    assertEquals(
+        "Should have expected rows",
+        ImmutableList.of(
+            row(1, Timestamp.valueOf("2022-01-01 10:00:00"), Date.valueOf("2022-01-01"))),
+        sql("SELECT * FROM %s WHERE ts < current_timestamp()", tableName));
+
+    sql("CALL %s.system.rewrite_manifests(table => '%s')", catalogName, tableName);
+
+    assertEquals(
+        "Should have expected rows",
+        ImmutableList.of(
+            row(1, Timestamp.valueOf("2022-01-01 10:00:00"), Date.valueOf("2022-01-01"))),
+        sql("SELECT * FROM %s WHERE ts < current_timestamp()", tableName));
+  }
 }
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkDataFile.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkDataFile.java
index a6390d39c5..5fe0cd86a4 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkDataFile.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkDataFile.java
@@ -29,6 +29,7 @@ import org.apache.iceberg.StructLike;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.types.Type;
 import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.StructProjection;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.types.StructType;
 
@@ -53,13 +54,29 @@ public class SparkDataFile implements DataFile {
   private final Type keyMetadataType;
   private final SparkStructLike wrappedPartition;
+  private final StructLike partitionProjection;
 
   private Row wrapped;
 
   public SparkDataFile(Types.StructType type, StructType sparkType) {
+    this(type, null, sparkType);
+  }
+
+  public SparkDataFile(
+      Types.StructType type, Types.StructType projectedType, StructType sparkType) {
     this.lowerBoundsType = type.fieldType("lower_bounds");
     this.upperBoundsType = type.fieldType("upper_bounds");
     this.keyMetadataType = type.fieldType("key_metadata");
-    this.wrappedPartition = new SparkStructLike(type.fieldType("partition").asStructType());
+
+    Types.StructType partitionType = type.fieldType("partition").asStructType();
+    this.wrappedPartition = new SparkStructLike(partitionType);
+
+    if (projectedType != null) {
+      Types.StructType projectedPartitionType = projectedType.fieldType("partition").asStructType();
+      this.partitionProjection =
+          StructProjection.create(partitionType, projectedPartitionType).wrap(wrappedPartition);
+    } else {
+      this.partitionProjection = wrappedPartition;
+    }
 
     Map<String, Integer> positions = Maps.newHashMap();
     type.fields().forEach(field -> {
@@ -114,7 +131,7 @@ public class SparkDataFile implements DataFile {
 
   @Override
   public StructLike partition() {
-    return wrappedPartition;
+    return partitionProjection;
   }
 
   @Override
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
index 99e51a37aa..030532fa94 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
@@ -34,6 +34,7 @@ import org.apache.iceberg.ManifestFile;
 import org.apache.iceberg.ManifestFiles;
 import org.apache.iceberg.ManifestWriter;
 import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Partitioning;
 import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableOperations;
@@ -200,6 +201,7 @@ public class RewriteManifestsSparkAction
   private List<ManifestFile> writeManifestsForUnpartitionedTable(Dataset<Row> manifestEntryDF, int numManifests) {
     Broadcast<FileIO> io = sparkContext().broadcast(fileIO);
     StructType sparkType = (StructType) manifestEntryDF.schema().apply("data_file").dataType();
+    Types.StructType combinedPartitionType = Partitioning.partitionType(table);
 
     // we rely only on the target number of manifests for unpartitioned tables
     // as we should not worry about having too much metadata per partition
@@ -208,9 +210,15 @@ public class RewriteManifestsSparkAction
     return manifestEntryDF
         .repartition(numManifests)
         .mapPartitions(
-            toManifests(io, maxNumManifestEntries, stagingLocation, formatVersion, spec, sparkType),
-            manifestEncoder
-        )
+            toManifests(
+                io,
+                maxNumManifestEntries,
+                stagingLocation,
+                formatVersion,
+                combinedPartitionType,
+                spec,
+                sparkType),
+            manifestEncoder)
         .collectAsList();
   }
 
@@ -220,20 +228,29 @@ public class RewriteManifestsSparkAction
     Broadcast<FileIO> io = sparkContext().broadcast(fileIO);
     StructType sparkType = (StructType) manifestEntryDF.schema().apply("data_file").dataType();
+    Types.StructType combinedPartitionType = Partitioning.partitionType(table);
 
     // we allow the actual size of manifests to be 10% higher if the estimation is not precise enough
     long maxNumManifestEntries = (long) (1.1 * targetNumManifestEntries);
 
-    return withReusableDS(manifestEntryDF, df -> {
-      Column partitionColumn = df.col("data_file.partition");
-      return df.repartitionByRange(numManifests, partitionColumn)
-          .sortWithinPartitions(partitionColumn)
-          .mapPartitions(
-              toManifests(io, maxNumManifestEntries, stagingLocation, formatVersion, spec, sparkType),
-              manifestEncoder
-          )
-          .collectAsList();
-    });
+    return withReusableDS(
+        manifestEntryDF,
+        df -> {
+          Column partitionColumn = df.col("data_file.partition");
+          return df.repartitionByRange(numManifests, partitionColumn)
+              .sortWithinPartitions(partitionColumn)
+              .mapPartitions(
+                  toManifests(
+                      io,
+                      maxNumManifestEntries,
+                      stagingLocation,
+                      formatVersion,
+                      combinedPartitionType,
+                      spec,
+                      sparkType),
+                  manifestEncoder)
+              .collectAsList();
+        });
   }
 
   private <T, U> U withReusableDS(Dataset<T> ds, Function<Dataset<T>, U> func) {
@@ -317,15 +334,24 @@ public class RewriteManifestsSparkAction
   }
 
   private static ManifestFile writeManifest(
-      List<Row> rows, int startIndex, int endIndex, Broadcast<FileIO> io,
-      String location, int format, PartitionSpec spec, StructType sparkType) throws IOException {
+      List<Row> rows,
+      int startIndex,
+      int endIndex,
+      Broadcast<FileIO> io,
+      String location,
+      int format,
+      Types.StructType combinedPartitionType,
+      PartitionSpec spec,
+      StructType sparkType)
+      throws IOException {
     String manifestName = "optimized-m-" + UUID.randomUUID();
     Path manifestPath = new Path(location, manifestName);
     OutputFile outputFile =
        io.value().newOutputFile(FileFormat.AVRO.addExtension(manifestPath.toString()));
 
-    Types.StructType dataFileType = DataFile.getType(spec.partitionType());
-    SparkDataFile wrapper = new SparkDataFile(dataFileType, sparkType);
+    Types.StructType combinedFileType = DataFile.getType(combinedPartitionType);
+    Types.StructType manifestFileType = DataFile.getType(spec.partitionType());
+    SparkDataFile wrapper = new SparkDataFile(combinedFileType, manifestFileType, sparkType);
 
     ManifestWriter<DataFile> writer = ManifestFiles.write(format, spec, outputFile, null);
 
@@ -345,8 +371,13 @@ public class RewriteManifestsSparkAction
   }
 
   private static MapPartitionsFunction<Row, ManifestFile> toManifests(
-      Broadcast<FileIO> io, long maxNumManifestEntries, String location,
-      int format, PartitionSpec spec, StructType sparkType) {
+      Broadcast<FileIO> io,
+      long maxNumManifestEntries,
+      String location,
+      int format,
+      Types.StructType combinedPartitionType,
+      PartitionSpec spec,
+      StructType sparkType) {
 
     return rows -> {
       List<Row> rowsAsList = Lists.newArrayList(rows);
@@ -357,11 +388,41 @@ public class RewriteManifestsSparkAction
 
      List<ManifestFile> manifests = Lists.newArrayList();
      if (rowsAsList.size() <= maxNumManifestEntries) {
-        manifests.add(writeManifest(rowsAsList, 0, rowsAsList.size(), io, location, format, spec, sparkType));
+        manifests.add(
+            writeManifest(
+                rowsAsList,
+                0,
+                rowsAsList.size(),
+                io,
+                location,
+                format,
+                combinedPartitionType,
+                spec,
+                sparkType));
       } else {
         int midIndex = rowsAsList.size() / 2;
-        manifests.add(writeManifest(rowsAsList, 0, midIndex, io, location, format, spec, sparkType));
-        manifests.add(writeManifest(rowsAsList, midIndex, rowsAsList.size(), io, location, format, spec, sparkType));
+        manifests.add(
+            writeManifest(
+                rowsAsList,
+                0,
+                midIndex,
+                io,
+                location,
+                format,
+                combinedPartitionType,
+                spec,
+                sparkType));
+        manifests.add(
+            writeManifest(
+                rowsAsList,
+                midIndex,
+                rowsAsList.size(),
+                io,
+                location,
+                format,
+                combinedPartitionType,
+                spec,
+                sparkType));
      }
 
      return manifests.iterator();
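A note on the technique in this patch, with a hedged sketch (not part of the commit): rows in the manifest-entry DataFrame carry the table's combined partition type across all specs (Partitioning.partitionType), while each rewritten manifest is written against a single spec. SparkDataFile therefore projects the combined partition tuple down to that spec's partition type using org.apache.iceberg.util.StructProjection, which matches fields by id. The sketch below uses the real StructProjection API; the Tuple helper class and the field ids 1000/1001 are invented for illustration.

// Hedged illustration only, not committed code: project a combined partition
// tuple down to the current spec's partition type by field id.
import org.apache.iceberg.StructLike;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.StructProjection;

public class PartitionProjectionSketch {

  // Minimal array-backed StructLike stand-in for a partition tuple.
  static class Tuple implements StructLike {
    private final Object[] values;

    Tuple(Object... values) {
      this.values = values;
    }

    @Override
    public int size() {
      return values.length;
    }

    @Override
    public <T> T get(int pos, Class<T> javaClass) {
      return javaClass.cast(values[pos]);
    }

    @Override
    public <T> void set(int pos, T value) {
      values[pos] = value;
    }
  }

  public static void main(String[] args) {
    // Combined partition type across all specs: day_of_ts was dropped by
    // REPLACE PARTITION FIELD, ts_day belongs to the current spec.
    Types.StructType combinedType =
        Types.StructType.of(
            Types.NestedField.optional(1000, "day_of_ts", Types.DateType.get()),
            Types.NestedField.optional(1001, "ts_day", Types.DateType.get()));

    // Partition type of the current spec: only ts_day survives.
    Types.StructType currentType =
        Types.StructType.of(Types.NestedField.optional(1001, "ts_day", Types.DateType.get()));

    // A combined tuple; date partition values are stored as days from epoch.
    StructLike combined = new Tuple(18993, 18993);

    // The projection resolves fields by id, so the dropped day_of_ts field
    // is simply not visible through it.
    StructProjection projected = StructProjection.create(combinedType, currentType).wrap(combined);

    System.out.println(projected.size());                // 1
    System.out.println(projected.get(0, Integer.class)); // 18993 (ts_day)
  }
}

This mirrors what the new SparkDataFile constructor does with StructProjection.create(partitionType, projectedPartitionType): the manifest writer only ever sees the target spec's partition fields, which is what keeps dropped fields out of the rewritten manifest's partition stats.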
