This is an automated email from the ASF dual-hosted git repository.
RussellSpitzer pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new f23da211bb Core, Parquet: Allow for Writing Parquet/Avro Manifests in V4 (#15634)
f23da211bb is described below
commit f23da211bbe108a060df30c1f65684858aea0e8d
Author: Russell Spitzer <[email protected]>
AuthorDate: Fri May 22 15:52:38 2026 -0500
Core, Parquet: Allow for Writing Parquet/Avro Manifests in V4 (#15634)
---
.../java/org/apache/iceberg/ManifestBenchmark.java | 171 ++++++--------------
.../org/apache/iceberg/ManifestBenchmarkUtil.java | 120 ++++++++++++++
...mark.java => ManifestCompressionBenchmark.java} | 111 ++-----------
.../org/apache/iceberg/ManifestReadBenchmark.java | 173 ---------------------
.../org/apache/iceberg/ManifestWriteBenchmark.java | 173 ---------------------
.../src/main/java/org/apache/iceberg/BaseFile.java | 41 ++++-
.../java/org/apache/iceberg/ManifestReader.java | 29 +++-
.../java/org/apache/iceberg/ManifestWriter.java | 62 ++++++--
.../java/org/apache/iceberg/SnapshotProducer.java | 7 +-
.../java/org/apache/iceberg/TableMetadata.java | 1 +
.../main/java/org/apache/iceberg/V4Metadata.java | 74 +++++----
.../src/test/java/org/apache/iceberg/TestBase.java | 23 ++-
.../java/org/apache/iceberg/TestFastAppend.java | 8 +-
.../org/apache/iceberg/TestManifestReader.java | 4 +
.../org/apache/iceberg/TestManifestWriter.java | 11 +-
.../apache/iceberg/TestManifestWriterVersions.java | 125 ++++++++++++++-
.../java/org/apache/iceberg/TestMergeAppend.java | 18 ++-
.../org/apache/iceberg/TestRewriteManifests.java | 70 +++++----
.../org/apache/iceberg/TestSnapshotProducer.java | 13 +-
.../java/org/apache/iceberg/TestTransaction.java | 6 +-
.../org/apache/iceberg/jdbc/TestJdbcCatalog.java | 2 +-
.../apache/iceberg/util/TestManifestFileUtil.java | 55 +++++--
.../iceberg/parquet/ParquetValueReaders.java | 18 ++-
.../spark/actions/TestRewriteTablePathsAction.java | 5 +-
.../spark/actions/TestRewriteTablePathsAction.java | 5 +-
.../spark/actions/TestRewriteTablePathsAction.java | 5 +-
26 files changed, 627 insertions(+), 703 deletions(-)
diff --git a/core/src/jmh/java/org/apache/iceberg/ManifestBenchmark.java b/core/src/jmh/java/org/apache/iceberg/ManifestBenchmark.java
index cbd372b7a4..b1b3847c5a 100644
--- a/core/src/jmh/java/org/apache/iceberg/ManifestBenchmark.java
+++ b/core/src/jmh/java/org/apache/iceberg/ManifestBenchmark.java
@@ -18,23 +18,15 @@
*/
package org.apache.iceberg;
-import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
-import java.nio.ByteBuffer;
-import java.nio.file.Files;
import java.util.List;
import java.util.Locale;
import java.util.Map;
-import java.util.Random;
import java.util.concurrent.TimeUnit;
-import org.apache.commons.io.FileUtils;
import org.apache.iceberg.io.CloseableIterator;
import org.apache.iceberg.io.OutputFile;
-import org.apache.iceberg.relocated.com.google.common.collect.Lists;
-import org.apache.iceberg.relocated.com.google.common.collect.Maps;
-import org.apache.iceberg.types.Types;
-import org.openjdk.jmh.annotations.AuxCounters;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
@@ -52,7 +44,12 @@ import org.openjdk.jmh.annotations.Warmup;
import org.openjdk.jmh.infra.Blackhole;
/**
- * A benchmark that measures manifest read/write performance across
compression codecs.
+ * A benchmark that measures manifest read/write performance across format
versions and file
+ * formats.
+ *
+ * <p>V1-V3 only support Avro manifests. V4 supports both Avro and Parquet.
The {@code
+ * versionFormat} parameter encodes valid combinations as {@code
"<version>_<format>"} (e.g. {@code
+ * "4_PARQUET"}) so that only meaningful pairings are benchmarked.
*
* <p>Entry counts are calibrated per column count via {@link #ENTRY_BASE}.
Set to 300_000 for ~8 MB
* manifests (matching the default {@code commit.manifest.target-size-bytes})
or 15_000 for ~400 KB.
@@ -63,13 +60,25 @@ import org.openjdk.jmh.infra.Blackhole;
* # all combinations
* ./gradlew :iceberg-core:jmh -PjmhIncludeRegex=ManifestBenchmark
*
- * # single codec
+ * # V4-only (Avro vs Parquet)
+ * ./gradlew :iceberg-core:jmh -PjmhIncludeRegex=ManifestBenchmark \
+ * -PjmhParams="versionFormat=4_AVRO|4_PARQUET"
+ *
+ * # all versions, single column count
* ./gradlew :iceberg-core:jmh -PjmhIncludeRegex=ManifestBenchmark \
- * -PjmhParams="codec=gzip"
+ * -PjmhParams="numCols=50"
+ *
+ * # single version
+ * ./gradlew :iceberg-core:jmh -PjmhIncludeRegex=ManifestBenchmark \
+ * -PjmhParams="versionFormat=3_AVRO"
* }</pre>
*/
@Fork(1)
@State(Scope.Benchmark)
+// Parquet's columnar write path has a deep call graph (per-column encoders,
page assembly,
+// dictionary management) that requires more warmup iterations than Avro for
the JIT compiler to
+// fully optimize. Profiling shows ~650ms of JIT compilation spread across the
first 3-4
+// iterations, so 6 warmups ensure measurement begins after JIT has stabilized.
@Warmup(iterations = 6)
@Measurement(iterations = 10)
@BenchmarkMode(Mode.SingleShotTime)
@@ -78,19 +87,8 @@ public class ManifestBenchmark {
static final int ENTRY_BASE = 300_000;
- private static final int FORMAT_VERSION = 4;
-
- private static final Schema SCHEMA =
- new Schema(
- Types.NestedField.required(1, "id", Types.IntegerType.get()),
- Types.NestedField.required(2, "data", Types.StringType.get()),
- Types.NestedField.required(3, "customer", Types.StringType.get()));
-
- private static final PartitionSpec SPEC =
-
PartitionSpec.builderFor(SCHEMA).identity("id").identity("data").identity("customer").build();
-
- @Param({"gzip", "snappy", "zstd", "uncompressed"})
- private String codec;
+ @Param({"1_AVRO", "2_AVRO", "3_AVRO", "4_AVRO", "4_PARQUET"})
+ private String versionFormat;
@Param({"true", "false"})
private String partitioned;
@@ -98,11 +96,11 @@ public class ManifestBenchmark {
@Param({"10", "50", "100"})
private int numCols;
+ private int formatVersion;
+ private FileFormat fileFormat;
private PartitionSpec spec;
private Map<Integer, PartitionSpec> specsById;
- private Map<String, String> writerProperties;
private List<DataFile> dataFiles;
- private int numEntries;
private String writeBaseDir;
private OutputFile writeOutputFile;
@@ -112,21 +110,26 @@ public class ManifestBenchmark {
@Setup(Level.Trial)
public void setupTrial() {
- this.spec = Boolean.parseBoolean(partitioned) ? SPEC :
PartitionSpec.unpartitioned();
- this.specsById = Map.of(spec.specId(), spec);
- this.writerProperties = Map.of(TableProperties.AVRO_COMPRESSION, codec);
- // ENTRY_BASE / cols: empirically calibrated — 300_000 → ~8 MB, 15_000 →
~400 KB manifests
- this.numEntries = ENTRY_BASE / numCols;
- this.dataFiles = generateDataFiles();
+ String[] parts = versionFormat.split("_", 2);
+ this.formatVersion = Integer.parseInt(parts[0]);
+ this.fileFormat = FileFormat.fromString(parts[1]);
+ this.spec =
+ Boolean.parseBoolean(partitioned)
+ ? ManifestBenchmarkUtil.SPEC
+ : PartitionSpec.unpartitioned();
+ this.specsById = ImmutableMap.of(spec.specId(), spec);
+ int numEntries = ManifestBenchmarkUtil.entriesForColumnCount(ENTRY_BASE,
numCols);
+ this.dataFiles = ManifestBenchmarkUtil.generateDataFiles(spec, numEntries,
numCols);
setupReadManifest();
}
@Setup(Level.Invocation)
public void setupWriteInvocation() throws IOException {
- this.writeBaseDir =
Files.createTempDirectory("bench-write-").toAbsolutePath().toString();
+ this.writeBaseDir =
+
java.nio.file.Files.createTempDirectory("bench-write-").toAbsolutePath().toString();
this.writeOutputFile =
- org.apache.iceberg.Files.localOutput(
- String.format(Locale.ROOT, "%s/manifest.avro", writeBaseDir));
+ Files.localOutput(
+ String.format(Locale.ROOT, "%s/%s", writeBaseDir,
fileFormat.addExtension("manifest")));
for (DataFile file : dataFiles) {
file.path();
@@ -137,7 +140,7 @@ public class ManifestBenchmark {
@TearDown(Level.Trial)
public void tearDownTrial() {
- cleanDir(readBaseDir);
+ ManifestBenchmarkUtil.cleanDir(readBaseDir);
readBaseDir = null;
readManifest = null;
dataFiles = null;
@@ -145,28 +148,15 @@ public class ManifestBenchmark {
@TearDown(Level.Invocation)
public void tearDownInvocation() {
- cleanDir(writeBaseDir);
+ ManifestBenchmarkUtil.cleanDir(writeBaseDir);
writeBaseDir = null;
writeOutputFile = null;
}
- @AuxCounters(AuxCounters.Type.EVENTS)
- @State(Scope.Thread)
- @SuppressWarnings("checkstyle:VisibilityModifier")
- public static class FileSizeCounters {
- public double manifestSizeMB;
-
- @Setup(Level.Invocation)
- public void reset() {
- manifestSizeMB = 0;
- }
- }
-
@Benchmark
@Threads(1)
- public ManifestFile writeManifest(FileSizeCounters counters) throws
IOException {
- ManifestWriter<DataFile> writer =
- ManifestFiles.write(FORMAT_VERSION, spec, writeOutputFile, 1L,
writerProperties);
+ public ManifestFile writeManifest() throws IOException {
+ ManifestWriter<DataFile> writer = ManifestFiles.write(formatVersion, spec,
writeOutputFile, 1L);
try (ManifestWriter<DataFile> w = writer) {
for (DataFile file : dataFiles) {
@@ -174,9 +164,7 @@ public class ManifestBenchmark {
}
}
- ManifestFile manifest = writer.toManifestFile();
- counters.manifestSizeMB = manifest.length() / (1024.0 * 1024.0);
- return manifest;
+ return writer.toManifestFile();
}
@Benchmark
@@ -193,17 +181,17 @@ public class ManifestBenchmark {
private void setupReadManifest() {
try {
- this.readBaseDir =
Files.createTempDirectory("bench-read-").toAbsolutePath().toString();
+ this.readBaseDir =
+
java.nio.file.Files.createTempDirectory("bench-read-").toAbsolutePath().toString();
} catch (IOException e) {
throw new UncheckedIOException(e);
}
OutputFile manifestFile =
- org.apache.iceberg.Files.localOutput(
- String.format(Locale.ROOT, "%s/manifest.avro", readBaseDir));
+ Files.localOutput(
+ String.format(Locale.ROOT, "%s/%s", readBaseDir,
fileFormat.addExtension("manifest")));
- ManifestWriter<DataFile> writer =
- ManifestFiles.write(FORMAT_VERSION, spec, manifestFile, 1L,
writerProperties);
+ ManifestWriter<DataFile> writer = ManifestFiles.write(formatVersion, spec,
manifestFile, 1L);
try (ManifestWriter<DataFile> w = writer) {
for (DataFile file : dataFiles) {
@@ -215,65 +203,4 @@ public class ManifestBenchmark {
this.readManifest = writer.toManifestFile();
}
-
- private List<DataFile> generateDataFiles() {
- Random random = new Random(42);
- List<DataFile> files = Lists.newArrayListWithCapacity(numEntries);
- for (int i = 0; i < numEntries; i++) {
- DataFiles.Builder builder =
- DataFiles.builder(spec)
- .withFormat(FileFormat.PARQUET)
- .withPath(String.format(Locale.ROOT, "/path/to/data-%d.parquet",
i))
- .withFileSizeInBytes(1024 + i)
- .withRecordCount(1000 + i)
- .withMetrics(randomMetrics(random, numCols));
-
- if (!spec.isUnpartitioned()) {
- builder.withPartitionPath(
- String.format(
- Locale.ROOT, "id=%d/data=val-%d/customer=cust-%d", i % 100, i
% 50, i % 200));
- }
-
- files.add(builder.build());
- }
-
- return files;
- }
-
- static Metrics randomMetrics(Random random, int cols) {
- long rowCount = 100_000L + random.nextInt(1000);
- Map<Integer, Long> columnSizes = Maps.newHashMap();
- Map<Integer, Long> valueCounts = Maps.newHashMap();
- Map<Integer, Long> nullValueCounts = Maps.newHashMap();
- Map<Integer, Long> nanValueCounts = Maps.newHashMap();
- Map<Integer, ByteBuffer> lowerBounds = Maps.newHashMap();
- Map<Integer, ByteBuffer> upperBounds = Maps.newHashMap();
- for (int i = 0; i < cols; i++) {
- columnSizes.put(i, 1_000_000L + random.nextInt(100_000));
- valueCounts.put(i, 100_000L + random.nextInt(100));
- nullValueCounts.put(i, (long) random.nextInt(5));
- nanValueCounts.put(i, (long) random.nextInt(5));
- byte[] lower = new byte[8];
- random.nextBytes(lower);
- lowerBounds.put(i, ByteBuffer.wrap(lower));
- byte[] upper = new byte[8];
- random.nextBytes(upper);
- upperBounds.put(i, ByteBuffer.wrap(upper));
- }
-
- return new Metrics(
- rowCount,
- columnSizes,
- valueCounts,
- nullValueCounts,
- nanValueCounts,
- lowerBounds,
- upperBounds);
- }
-
- private static void cleanDir(String dir) {
- if (dir != null) {
- FileUtils.deleteQuietly(new File(dir));
- }
- }
}
diff --git a/core/src/jmh/java/org/apache/iceberg/ManifestBenchmarkUtil.java b/core/src/jmh/java/org/apache/iceberg/ManifestBenchmarkUtil.java
new file mode 100644
index 0000000000..d37c48daab
--- /dev/null
+++ b/core/src/jmh/java/org/apache/iceberg/ManifestBenchmarkUtil.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Random;
+import org.apache.commons.io.FileUtils;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.types.Types;
+
+/**
+ * Shared constants and stateless helpers for {@link ManifestBenchmark} and
{@link
+ * ManifestCompressionBenchmark}.
+ */
+final class ManifestBenchmarkUtil {
+
+ static final Schema SCHEMA =
+ new Schema(
+ Types.NestedField.required(1, "id", Types.IntegerType.get()),
+ Types.NestedField.required(2, "data", Types.StringType.get()),
+ Types.NestedField.required(3, "customer", Types.StringType.get()));
+
+ static final PartitionSpec SPEC =
+
PartitionSpec.builderFor(SCHEMA).identity("id").identity("data").identity("customer").build();
+
+ private ManifestBenchmarkUtil() {}
+
+ /**
+ * Returns the number of manifest entries for the given column count. The
result is {@code
+ * entryBase / cols}.
+ *
+ * <p>The linear ratio was determined empirically by writing manifests at
various column counts
+ * and measuring the resulting file sizes. An {@code entryBase} of 300,000
produces ~8 MB
+ * manifests (matching the default {@code
commit.manifest.target-size-bytes}); 15,000 produces
+ * ~400 KB.
+ */
+ static int entriesForColumnCount(int entryBase, int cols) {
+ return entryBase / cols;
+ }
+
+ static List<DataFile> generateDataFiles(PartitionSpec spec, int numEntries,
int numCols) {
+ Random random = new Random(42);
+ List<DataFile> files = Lists.newArrayListWithCapacity(numEntries);
+ for (int i = 0; i < numEntries; i++) {
+ DataFiles.Builder builder =
+ DataFiles.builder(spec)
+ .withFormat(FileFormat.PARQUET)
+ .withPath(String.format(Locale.ROOT, "/path/to/data-%d.parquet",
i))
+ .withFileSizeInBytes(1024 + i)
+ .withRecordCount(1000 + i)
+ .withMetrics(randomMetrics(random, numCols));
+
+ if (spec.isPartitioned()) {
+ builder.withPartitionPath(
+ String.format(
+ Locale.ROOT, "id=%d/data=val-%d/customer=cust-%d", i % 100, i
% 50, i % 200));
+ }
+
+ files.add(builder.build());
+ }
+ return files;
+ }
+
+ static Metrics randomMetrics(Random random, int cols) {
+ long rowCount = 100_000L + random.nextInt(1000);
+ Map<Integer, Long> columnSizes = Maps.newHashMap();
+ Map<Integer, Long> valueCounts = Maps.newHashMap();
+ Map<Integer, Long> nullValueCounts = Maps.newHashMap();
+ Map<Integer, Long> nanValueCounts = Maps.newHashMap();
+ Map<Integer, ByteBuffer> lowerBounds = Maps.newHashMap();
+ Map<Integer, ByteBuffer> upperBounds = Maps.newHashMap();
+ for (int i = 0; i < cols; i++) {
+ columnSizes.put(i, 1_000_000L + random.nextInt(100_000));
+ valueCounts.put(i, 100_000L + random.nextInt(100));
+ nullValueCounts.put(i, (long) random.nextInt(5));
+ nanValueCounts.put(i, (long) random.nextInt(5));
+ byte[] lower = new byte[8];
+ random.nextBytes(lower);
+ lowerBounds.put(i, ByteBuffer.wrap(lower));
+ byte[] upper = new byte[8];
+ random.nextBytes(upper);
+ upperBounds.put(i, ByteBuffer.wrap(upper));
+ }
+
+ return new Metrics(
+ rowCount,
+ columnSizes,
+ valueCounts,
+ nullValueCounts,
+ nanValueCounts,
+ lowerBounds,
+ upperBounds);
+ }
+
+ static void cleanDir(String dir) {
+ if (dir != null) {
+ FileUtils.deleteQuietly(new java.io.File(dir));
+ }
+ }
+}
diff --git a/core/src/jmh/java/org/apache/iceberg/ManifestBenchmark.java b/core/src/jmh/java/org/apache/iceberg/ManifestCompressionBenchmark.java
similarity index 58%
copy from core/src/jmh/java/org/apache/iceberg/ManifestBenchmark.java
copy to core/src/jmh/java/org/apache/iceberg/ManifestCompressionBenchmark.java
index cbd372b7a4..bf09ae18f9 100644
--- a/core/src/jmh/java/org/apache/iceberg/ManifestBenchmark.java
+++ b/core/src/jmh/java/org/apache/iceberg/ManifestCompressionBenchmark.java
@@ -18,22 +18,14 @@
*/
package org.apache.iceberg;
-import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
-import java.nio.ByteBuffer;
-import java.nio.file.Files;
import java.util.List;
import java.util.Locale;
import java.util.Map;
-import java.util.Random;
import java.util.concurrent.TimeUnit;
-import org.apache.commons.io.FileUtils;
import org.apache.iceberg.io.CloseableIterator;
import org.apache.iceberg.io.OutputFile;
-import org.apache.iceberg.relocated.com.google.common.collect.Lists;
-import org.apache.iceberg.relocated.com.google.common.collect.Maps;
-import org.apache.iceberg.types.Types;
import org.openjdk.jmh.annotations.AuxCounters;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
@@ -61,10 +53,10 @@ import org.openjdk.jmh.infra.Blackhole;
*
* <pre>{@code
* # all combinations
- * ./gradlew :iceberg-core:jmh -PjmhIncludeRegex=ManifestBenchmark
+ * ./gradlew :iceberg-core:jmh -PjmhIncludeRegex=ManifestCompressionBenchmark
*
* # single codec
- * ./gradlew :iceberg-core:jmh -PjmhIncludeRegex=ManifestBenchmark \
+ * ./gradlew :iceberg-core:jmh -PjmhIncludeRegex=ManifestCompressionBenchmark \
* -PjmhParams="codec=gzip"
* }</pre>
*/
@@ -74,21 +66,12 @@ import org.openjdk.jmh.infra.Blackhole;
@Measurement(iterations = 10)
@BenchmarkMode(Mode.SingleShotTime)
@Timeout(time = 10, timeUnit = TimeUnit.MINUTES)
-public class ManifestBenchmark {
+public class ManifestCompressionBenchmark {
static final int ENTRY_BASE = 300_000;
private static final int FORMAT_VERSION = 4;
- private static final Schema SCHEMA =
- new Schema(
- Types.NestedField.required(1, "id", Types.IntegerType.get()),
- Types.NestedField.required(2, "data", Types.StringType.get()),
- Types.NestedField.required(3, "customer", Types.StringType.get()));
-
- private static final PartitionSpec SPEC =
-
PartitionSpec.builderFor(SCHEMA).identity("id").identity("data").identity("customer").build();
-
@Param({"gzip", "snappy", "zstd", "uncompressed"})
private String codec;
@@ -102,7 +85,6 @@ public class ManifestBenchmark {
private Map<Integer, PartitionSpec> specsById;
private Map<String, String> writerProperties;
private List<DataFile> dataFiles;
- private int numEntries;
private String writeBaseDir;
private OutputFile writeOutputFile;
@@ -112,21 +94,23 @@ public class ManifestBenchmark {
@Setup(Level.Trial)
public void setupTrial() {
- this.spec = Boolean.parseBoolean(partitioned) ? SPEC :
PartitionSpec.unpartitioned();
+ this.spec =
+ Boolean.parseBoolean(partitioned)
+ ? ManifestBenchmarkUtil.SPEC
+ : PartitionSpec.unpartitioned();
this.specsById = Map.of(spec.specId(), spec);
this.writerProperties = Map.of(TableProperties.AVRO_COMPRESSION, codec);
- // ENTRY_BASE / cols: empirically calibrated — 300_000 → ~8 MB, 15_000 →
~400 KB manifests
- this.numEntries = ENTRY_BASE / numCols;
- this.dataFiles = generateDataFiles();
+ int numEntries = ManifestBenchmarkUtil.entriesForColumnCount(ENTRY_BASE,
numCols);
+ this.dataFiles = ManifestBenchmarkUtil.generateDataFiles(spec, numEntries,
numCols);
setupReadManifest();
}
@Setup(Level.Invocation)
public void setupWriteInvocation() throws IOException {
- this.writeBaseDir =
Files.createTempDirectory("bench-write-").toAbsolutePath().toString();
+ this.writeBaseDir =
+
java.nio.file.Files.createTempDirectory("bench-write-").toAbsolutePath().toString();
this.writeOutputFile =
- org.apache.iceberg.Files.localOutput(
- String.format(Locale.ROOT, "%s/manifest.avro", writeBaseDir));
+ Files.localOutput(String.format(Locale.ROOT, "%s/manifest.avro",
writeBaseDir));
for (DataFile file : dataFiles) {
file.path();
@@ -137,7 +121,7 @@ public class ManifestBenchmark {
@TearDown(Level.Trial)
public void tearDownTrial() {
- cleanDir(readBaseDir);
+ ManifestBenchmarkUtil.cleanDir(readBaseDir);
readBaseDir = null;
readManifest = null;
dataFiles = null;
@@ -145,7 +129,7 @@ public class ManifestBenchmark {
@TearDown(Level.Invocation)
public void tearDownInvocation() {
- cleanDir(writeBaseDir);
+ ManifestBenchmarkUtil.cleanDir(writeBaseDir);
writeBaseDir = null;
writeOutputFile = null;
}
@@ -193,14 +177,14 @@ public class ManifestBenchmark {
private void setupReadManifest() {
try {
- this.readBaseDir =
Files.createTempDirectory("bench-read-").toAbsolutePath().toString();
+ this.readBaseDir =
+
java.nio.file.Files.createTempDirectory("bench-read-").toAbsolutePath().toString();
} catch (IOException e) {
throw new UncheckedIOException(e);
}
OutputFile manifestFile =
- org.apache.iceberg.Files.localOutput(
- String.format(Locale.ROOT, "%s/manifest.avro", readBaseDir));
+ Files.localOutput(String.format(Locale.ROOT, "%s/manifest.avro",
readBaseDir));
ManifestWriter<DataFile> writer =
ManifestFiles.write(FORMAT_VERSION, spec, manifestFile, 1L,
writerProperties);
@@ -215,65 +199,4 @@ public class ManifestBenchmark {
this.readManifest = writer.toManifestFile();
}
-
- private List<DataFile> generateDataFiles() {
- Random random = new Random(42);
- List<DataFile> files = Lists.newArrayListWithCapacity(numEntries);
- for (int i = 0; i < numEntries; i++) {
- DataFiles.Builder builder =
- DataFiles.builder(spec)
- .withFormat(FileFormat.PARQUET)
- .withPath(String.format(Locale.ROOT, "/path/to/data-%d.parquet",
i))
- .withFileSizeInBytes(1024 + i)
- .withRecordCount(1000 + i)
- .withMetrics(randomMetrics(random, numCols));
-
- if (!spec.isUnpartitioned()) {
- builder.withPartitionPath(
- String.format(
- Locale.ROOT, "id=%d/data=val-%d/customer=cust-%d", i % 100, i
% 50, i % 200));
- }
-
- files.add(builder.build());
- }
-
- return files;
- }
-
- static Metrics randomMetrics(Random random, int cols) {
- long rowCount = 100_000L + random.nextInt(1000);
- Map<Integer, Long> columnSizes = Maps.newHashMap();
- Map<Integer, Long> valueCounts = Maps.newHashMap();
- Map<Integer, Long> nullValueCounts = Maps.newHashMap();
- Map<Integer, Long> nanValueCounts = Maps.newHashMap();
- Map<Integer, ByteBuffer> lowerBounds = Maps.newHashMap();
- Map<Integer, ByteBuffer> upperBounds = Maps.newHashMap();
- for (int i = 0; i < cols; i++) {
- columnSizes.put(i, 1_000_000L + random.nextInt(100_000));
- valueCounts.put(i, 100_000L + random.nextInt(100));
- nullValueCounts.put(i, (long) random.nextInt(5));
- nanValueCounts.put(i, (long) random.nextInt(5));
- byte[] lower = new byte[8];
- random.nextBytes(lower);
- lowerBounds.put(i, ByteBuffer.wrap(lower));
- byte[] upper = new byte[8];
- random.nextBytes(upper);
- upperBounds.put(i, ByteBuffer.wrap(upper));
- }
-
- return new Metrics(
- rowCount,
- columnSizes,
- valueCounts,
- nullValueCounts,
- nanValueCounts,
- lowerBounds,
- upperBounds);
- }
-
- private static void cleanDir(String dir) {
- if (dir != null) {
- FileUtils.deleteQuietly(new File(dir));
- }
- }
}
diff --git a/core/src/jmh/java/org/apache/iceberg/ManifestReadBenchmark.java b/core/src/jmh/java/org/apache/iceberg/ManifestReadBenchmark.java
deleted file mode 100644
index 588b5df1ba..0000000000
--- a/core/src/jmh/java/org/apache/iceberg/ManifestReadBenchmark.java
+++ /dev/null
@@ -1,173 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iceberg;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.UncheckedIOException;
-import java.nio.ByteBuffer;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.util.Comparator;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-import java.util.stream.Stream;
-import org.apache.iceberg.encryption.PlaintextEncryptionManager;
-import org.apache.iceberg.io.CloseableIterator;
-import org.apache.iceberg.io.OutputFile;
-import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
-import org.apache.iceberg.relocated.com.google.common.collect.Maps;
-import org.openjdk.jmh.annotations.Benchmark;
-import org.openjdk.jmh.annotations.BenchmarkMode;
-import org.openjdk.jmh.annotations.Fork;
-import org.openjdk.jmh.annotations.Measurement;
-import org.openjdk.jmh.annotations.Mode;
-import org.openjdk.jmh.annotations.Scope;
-import org.openjdk.jmh.annotations.Setup;
-import org.openjdk.jmh.annotations.State;
-import org.openjdk.jmh.annotations.TearDown;
-import org.openjdk.jmh.annotations.Threads;
-import org.openjdk.jmh.annotations.Timeout;
-
-@Fork(1)
-@State(Scope.Benchmark)
-@Measurement(iterations = 5)
-@BenchmarkMode(Mode.SingleShotTime)
-@Timeout(time = 1000, timeUnit = TimeUnit.HOURS)
-public class ManifestReadBenchmark {
-
- private static final int NUM_FILES = 10;
- private static final int NUM_ROWS = 100000;
- private static final int NUM_COLS = 10;
-
- private String baseDir;
- private String manifestListFile;
-
- @Setup
- public void before() {
- baseDir =
- Paths.get(new
File(System.getProperty("java.io.tmpdir")).getAbsolutePath()).toString();
- manifestListFile = String.format("%s/%s.avro", baseDir, UUID.randomUUID());
-
- Random random = new Random(System.currentTimeMillis());
-
- try (ManifestListWriter listWriter =
- ManifestLists.write(
- 1,
- org.apache.iceberg.Files.localOutput(manifestListFile),
- PlaintextEncryptionManager.instance(),
- 0,
- 1L,
- 0,
- 0L)) {
- for (int i = 0; i < NUM_FILES; i++) {
- OutputFile manifestFile =
- org.apache.iceberg.Files.localOutput(
- String.format("%s/%s.avro", baseDir, UUID.randomUUID()));
-
- ManifestWriter<DataFile> writer =
- ManifestFiles.write(1, PartitionSpec.unpartitioned(),
manifestFile, 1L);
- try (ManifestWriter<DataFile> finalWriter = writer) {
- for (int j = 0; j < NUM_ROWS; j++) {
- DataFile dataFile =
- DataFiles.builder(PartitionSpec.unpartitioned())
- .withFormat(FileFormat.PARQUET)
- .withPath(String.format("/path/to/data-%s-%s.parquet", i,
j))
- .withFileSizeInBytes(j)
- .withRecordCount(j)
- .withMetrics(randomMetrics(random))
- .build();
- finalWriter.add(dataFile);
- }
- } catch (IOException e) {
- throw new UncheckedIOException(e);
- }
-
- listWriter.add(writer.toManifestFile());
- }
- } catch (IOException e) {
- throw new UncheckedIOException(e);
- }
- }
-
- @TearDown
- public void after() throws IOException {
- if (baseDir != null) {
- try (Stream<Path> walk = Files.walk(Paths.get(baseDir))) {
-
walk.sorted(Comparator.reverseOrder()).map(Path::toFile).forEach(File::delete);
- }
- baseDir = null;
- }
-
- manifestListFile = null;
- }
-
- @Benchmark
- @Threads(1)
- public void readManifestFile() throws IOException {
- List<ManifestFile> manifests =
-
ManifestLists.read(org.apache.iceberg.Files.localInput(manifestListFile));
- TestTables.LocalFileIO fileIO = new TestTables.LocalFileIO();
- Map<Integer, PartitionSpec> specs =
- ImmutableMap.of(PartitionSpec.unpartitioned().specId(),
PartitionSpec.unpartitioned());
- for (ManifestFile manifestFile : manifests) {
- ManifestReader<DataFile> reader = ManifestFiles.read(manifestFile,
fileIO, specs);
- try (CloseableIterator<DataFile> it = reader.iterator()) {
- while (it.hasNext()) {
- it.next().recordCount();
- }
- }
- }
- }
-
- private Metrics randomMetrics(Random random) {
- long rowCount = 100000L + random.nextInt(1000);
- Map<Integer, Long> columnSizes = Maps.newHashMap();
- Map<Integer, Long> valueCounts = Maps.newHashMap();
- Map<Integer, Long> nullValueCounts = Maps.newHashMap();
- Map<Integer, Long> nanValueCounts = Maps.newHashMap();
- Map<Integer, ByteBuffer> lowerBounds = Maps.newHashMap();
- Map<Integer, ByteBuffer> upperBounds = Maps.newHashMap();
- for (int i = 0; i < NUM_COLS; i++) {
- columnSizes.put(i, 1000000L + random.nextInt(100000));
- valueCounts.put(i, 100000L + random.nextInt(100));
- nullValueCounts.put(i, (long) random.nextInt(5));
- nanValueCounts.put(i, (long) random.nextInt(5));
- byte[] lower = new byte[8];
- random.nextBytes(lower);
- lowerBounds.put(i, ByteBuffer.wrap(lower));
- byte[] upper = new byte[8];
- random.nextBytes(upper);
- upperBounds.put(i, ByteBuffer.wrap(upper));
- }
-
- return new Metrics(
- rowCount,
- columnSizes,
- valueCounts,
- nullValueCounts,
- nanValueCounts,
- lowerBounds,
- upperBounds);
- }
-}
diff --git a/core/src/jmh/java/org/apache/iceberg/ManifestWriteBenchmark.java b/core/src/jmh/java/org/apache/iceberg/ManifestWriteBenchmark.java
deleted file mode 100644
index b0dab63dea..0000000000
--- a/core/src/jmh/java/org/apache/iceberg/ManifestWriteBenchmark.java
+++ /dev/null
@@ -1,173 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iceberg;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.UncheckedIOException;
-import java.nio.ByteBuffer;
-import java.util.Map;
-import java.util.Random;
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-import org.apache.commons.io.FileUtils;
-import org.apache.iceberg.encryption.PlaintextEncryptionManager;
-import org.apache.iceberg.io.OutputFile;
-import org.apache.iceberg.relocated.com.google.common.collect.Maps;
-import org.openjdk.jmh.annotations.Benchmark;
-import org.openjdk.jmh.annotations.BenchmarkMode;
-import org.openjdk.jmh.annotations.Fork;
-import org.openjdk.jmh.annotations.Measurement;
-import org.openjdk.jmh.annotations.Mode;
-import org.openjdk.jmh.annotations.Param;
-import org.openjdk.jmh.annotations.Scope;
-import org.openjdk.jmh.annotations.Setup;
-import org.openjdk.jmh.annotations.State;
-import org.openjdk.jmh.annotations.TearDown;
-import org.openjdk.jmh.annotations.Threads;
-import org.openjdk.jmh.annotations.Timeout;
-
-/**
- * A benchmark that evaluates the performance of writing manifest files
- *
- * <p>To run this benchmark: <code>
- * ./gradlew :iceberg-core:jmh -PjmhIncludeRegex=ManifestWriteBenchmark
- * </code>
- */
-@Fork(1)
-@State(Scope.Benchmark)
-@Measurement(iterations = 5)
-@BenchmarkMode(Mode.SingleShotTime)
-@Timeout(time = 5, timeUnit = TimeUnit.MINUTES)
-public class ManifestWriteBenchmark {
-
- private static final int NUM_FILES = 10;
- private static final int NUM_ROWS = 100000;
- private static final int NUM_COLS = 100;
-
- private String baseDir;
- private String manifestListFile;
-
- private Metrics metrics;
-
- @Setup
- public void before() {
- Random random = new Random(System.currentTimeMillis());
- // Pre-create the metrics to avoid doing this in the benchmark itself
- metrics = randomMetrics(random);
- }
-
- @TearDown
- public void after() {
- if (baseDir != null) {
- FileUtils.deleteQuietly(new File(baseDir));
- baseDir = null;
- }
-
- manifestListFile = null;
- }
-
- @State(Scope.Benchmark)
- public static class BenchmarkState {
- @Param({"1", "2"})
- private int formatVersion;
-
- public int getFormatVersion() {
- return formatVersion;
- }
- }
-
- @Benchmark
- @Threads(1)
- public void writeManifestFile(BenchmarkState state) throws IOException {
- this.baseDir =
-
java.nio.file.Files.createTempDirectory("benchmark-").toAbsolutePath().toString();
- this.manifestListFile = String.format("%s/%s.avro", baseDir,
UUID.randomUUID());
-
- try (ManifestListWriter listWriter =
- ManifestLists.write(
- state.getFormatVersion(),
- org.apache.iceberg.Files.localOutput(manifestListFile),
- PlaintextEncryptionManager.instance(),
- 0,
- 1L,
- 0,
- 0L)) {
- for (int i = 0; i < NUM_FILES; i++) {
- OutputFile manifestFile =
- org.apache.iceberg.Files.localOutput(
- String.format("%s/%s.avro", baseDir, UUID.randomUUID()));
-
- ManifestWriter<DataFile> writer =
- ManifestFiles.write(
- state.formatVersion, PartitionSpec.unpartitioned(),
manifestFile, 1L);
- try (ManifestWriter<DataFile> finalWriter = writer) {
- for (int j = 0; j < NUM_ROWS; j++) {
- DataFile dataFile =
- DataFiles.builder(PartitionSpec.unpartitioned())
- .withFormat(FileFormat.PARQUET)
- .withPath(String.format("/path/to/data-%s-%s.parquet", i,
j))
- .withFileSizeInBytes(j)
- .withRecordCount(j)
- .withMetrics(metrics)
- .build();
- finalWriter.add(dataFile);
- }
- } catch (IOException e) {
- throw new UncheckedIOException(e);
- }
-
- listWriter.add(writer.toManifestFile());
- }
- } catch (IOException e) {
- throw new UncheckedIOException(e);
- }
- }
-
- private Metrics randomMetrics(Random random) {
- long rowCount = 100000L + random.nextInt(1000);
- Map<Integer, Long> columnSizes = Maps.newHashMap();
- Map<Integer, Long> valueCounts = Maps.newHashMap();
- Map<Integer, Long> nullValueCounts = Maps.newHashMap();
- Map<Integer, Long> nanValueCounts = Maps.newHashMap();
- Map<Integer, ByteBuffer> lowerBounds = Maps.newHashMap();
- Map<Integer, ByteBuffer> upperBounds = Maps.newHashMap();
- for (int i = 0; i < NUM_COLS; i++) {
- columnSizes.put(i, 1000000L + random.nextInt(100000));
- valueCounts.put(i, 100000L + random.nextInt(100));
- nullValueCounts.put(i, (long) random.nextInt(5));
- nanValueCounts.put(i, (long) random.nextInt(5));
- byte[] lower = new byte[8];
- random.nextBytes(lower);
- lowerBounds.put(i, ByteBuffer.wrap(lower));
- byte[] upper = new byte[8];
- random.nextBytes(upper);
- upperBounds.put(i, ByteBuffer.wrap(upper));
- }
-
- return new Metrics(
- rowCount,
- columnSizes,
- valueCounts,
- nullValueCounts,
- nanValueCounts,
- lowerBounds,
- upperBounds);
- }
-}
diff --git a/core/src/main/java/org/apache/iceberg/BaseFile.java
b/core/src/main/java/org/apache/iceberg/BaseFile.java
index 3c31c50f09..7147ba5878 100644
--- a/core/src/main/java/org/apache/iceberg/BaseFile.java
+++ b/core/src/main/java/org/apache/iceberg/BaseFile.java
@@ -32,6 +32,7 @@ import org.apache.avro.specific.SpecificData;
import org.apache.iceberg.avro.AvroSchemaUtil;
import org.apache.iceberg.avro.SupportsIndexProjection;
import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.ArrayUtil;
@@ -329,7 +330,11 @@ abstract class BaseFile<F> extends SupportsIndexProjection
this.partitionSpecId = (value != null) ? (Integer) value : -1;
return;
case 4:
- this.partitionData = (PartitionData) value;
+ // Preserve the constructor-initialized partitionData when the reader
returns null
+ // (e.g., v4 Parquet manifests for unpartitioned tables omit the
partition field).
+ if (value != null) {
+ this.partitionData = (PartitionData) value;
+ }
return;
case 5:
this.recordCount = (Long) value;
@@ -581,9 +586,37 @@ abstract class BaseFile<F> extends SupportsIndexProjection
private static Map<Integer, ByteBuffer> copyByteBufferMap(
Map<Integer, ByteBuffer> map, Set<Integer> keys) {
- return SerializableByteBufferMap.wrap(copyMap(map, keys));
+ if (map == null) {
+ return null;
+ }
+
+ return SerializableByteBufferMap.wrap(deepCopyByteBufferMap(map, keys));
+ }
+
+ // Required as long as we have Map<Integer, ByteBuffer> in the API since
Parquet reuses buffers.
+ private static Map<Integer, ByteBuffer> deepCopyByteBufferMap(
+ Map<Integer, ByteBuffer> map, Set<Integer> keys) {
+ Map<Integer, ByteBuffer> deepCopy =
Maps.newHashMapWithExpectedSize(map.size());
+ for (Map.Entry<Integer, ByteBuffer> entry : map.entrySet()) {
+ if (keys == null || keys.contains(entry.getKey())) {
+ ByteBuffer buf = entry.getValue();
+ if (buf != null) {
+ ByteBuffer copy = ByteBuffer.allocate(buf.remaining());
+ copy.put(buf.duplicate());
+ copy.flip();
+ deepCopy.put(entry.getKey(), copy);
+ } else {
+ deepCopy.put(entry.getKey(), null);
+ }
+ }
+ }
+
+ return deepCopy;
}
+ // Returns an unmodifiable view of the map. The SerializableMap check is
needed because
+ // internal maps may be wrapped for serialization after being populated by a
format reader
+ // with container reuse enabled, and immutableMap() provides a stable
snapshot.
private static <K, V> Map<K, V> toReadableMap(Map<K, V> map) {
if (map == null) {
return null;
@@ -594,6 +627,10 @@ abstract class BaseFile<F> extends SupportsIndexProjection
}
}
+ // Separate from toReadableMap because SerializableByteBufferMap is its own
wrapper type
+ // (not a SerializableMap subclass) to handle ByteBuffer-specific
serialization. ByteBuffer
+ // values are mutable and can be overwritten by Parquet container reuse, so
callers that
+ // retain references must use copyByteBufferMap to get independent copies.
private static Map<Integer, ByteBuffer> toReadableByteBufferMap(Map<Integer,
ByteBuffer> map) {
if (map == null) {
return null;
diff --git a/core/src/main/java/org/apache/iceberg/ManifestReader.java
b/core/src/main/java/org/apache/iceberg/ManifestReader.java
index e3c2325ab7..dc34836b6c 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestReader.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestReader.java
@@ -60,6 +60,13 @@ public class ManifestReader<F extends ContentFile<F>>
extends CloseableGroup
static final ImmutableList<String> ALL_COLUMNS = ImmutableList.of("*");
+ private static final Types.NestedField UNPARTITIONED_PARTITION_FIELD =
+ Types.NestedField.optional(
+ DataFile.PARTITION_ID,
+ DataFile.PARTITION_NAME,
+ Types.StructType.of(),
+ DataFile.PARTITION_DOC);
+
private static final Set<String> STATS_COLUMNS =
ImmutableSet.of(
"value_counts",
@@ -173,6 +180,12 @@ public class ManifestReader<F extends ContentFile<F>>
extends CloseableGroup
}
private static <T extends ContentFile<T>> Map<String, String>
readMetadata(InputFile inputFile) {
+ FileFormat manifestFormat = FileFormat.fromFileName(inputFile.location());
+ Preconditions.checkArgument(
+ manifestFormat == FileFormat.AVRO,
+ "Reading manifest metadata is only supported for Avro manifests: %s",
+ inputFile.location());
+
Map<String, String> metadata;
try {
try (CloseableIterable<ManifestEntry<T>> headerReader =
@@ -298,8 +311,22 @@ public class ManifestReader<F extends ContentFile<F>>
extends CloseableGroup
Preconditions.checkArgument(
format != null, "Unable to determine format of manifest: %s",
file.location());
+ boolean unpartitioned = spec.rawPartitionType().fields().isEmpty();
+
+ // V4+ manifests omit the partition field when unpartitioned (Parquet
cannot represent
+ // empty structs, and the field is meaningless regardless of format). Mark
it optional so
+ // the reader returns null for the missing field instead of throwing. The
field must stay
+ // in the projection to preserve positional access for callers like
StructProjection.
+ // For older versions where the empty struct is present, making it
optional is harmless.
List<Types.NestedField> fields = Lists.newArrayList();
- fields.addAll(projection.asStruct().fields());
+ for (Types.NestedField field : projection.asStruct().fields()) {
+ if (unpartitioned && field.fieldId() == DataFile.PARTITION_ID) {
+ fields.add(UNPARTITIONED_PARTITION_FIELD);
+ } else {
+ fields.add(field);
+ }
+ }
+
if (projection.findField(DataFile.RECORD_COUNT.fieldId()) == null) {
fields.add(DataFile.RECORD_COUNT);
}
diff --git a/core/src/main/java/org/apache/iceberg/ManifestWriter.java
b/core/src/main/java/org/apache/iceberg/ManifestWriter.java
index 7d85f991b0..321bcd89d8 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestWriter.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestWriter.java
@@ -24,6 +24,7 @@ import java.util.Map;
import org.apache.iceberg.encryption.EncryptedOutputFile;
import org.apache.iceberg.encryption.EncryptionKeyMetadata;
import org.apache.iceberg.encryption.NativeEncryptionKeyMetadata;
+import org.apache.iceberg.encryption.NativeEncryptionOutputFile;
import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.io.OutputFile;
@@ -40,6 +41,7 @@ public abstract class ManifestWriter<F extends
ContentFile<F>> implements FileAp
// this is replaced when writing a manifest list by the ManifestFile wrapper
static final long UNASSIGNED_SEQ = -1L;
+ private final FileFormat format;
private final OutputFile file;
private final EncryptionKeyMetadata keyMetadata;
private final int specId;
@@ -65,7 +67,8 @@ public abstract class ManifestWriter<F extends
ContentFile<F>> implements FileAp
Long snapshotId,
Long firstRowId,
Map<String, String> writerProperties) {
- this.file = file.encryptingOutputFile();
+ this.format =
FileFormat.fromFileName(file.encryptingOutputFile().location());
+ this.file = outputFile(file);
this.specId = spec.specId();
this.writerProperties = writerProperties;
this.writer = newAppender(spec, this.file);
@@ -82,6 +85,21 @@ public abstract class ManifestWriter<F extends
ContentFile<F>> implements FileAp
protected abstract FileAppender<ManifestEntry<F>> newAppender(
PartitionSpec spec, OutputFile outputFile);
+ private OutputFile outputFile(EncryptedOutputFile encryptedFile) {
+ // Casting to NativeEncryptionOutputFile actually makes the file rely on
native encryption
+ // rather than whole-file encryption.
+ if (format == FileFormat.PARQUET
+ && encryptedFile instanceof NativeEncryptionOutputFile nativeFile) {
+ return nativeFile;
+ }
+
+ return encryptedFile.encryptingOutputFile();
+ }
+
+ protected FileFormat format() {
+ return format;
+ }
+
protected Map<String, String> writerProperties() {
return writerProperties;
}
@@ -206,16 +224,7 @@ public abstract class ManifestWriter<F extends
ContentFile<F>> implements FileAp
public ManifestFile toManifestFile() {
Preconditions.checkState(closed, "Cannot build ManifestFile, writer is not
closed");
- ByteBuffer keyMetadataBuffer;
- if (keyMetadata instanceof NativeEncryptionKeyMetadata) {
- // File length is required by AES GCM Stream encryption, to prevent file
truncation attacks
- keyMetadataBuffer =
- ((NativeEncryptionKeyMetadata)
keyMetadata).copyWithLength(length()).buffer();
- } else if (keyMetadata != null) {
- keyMetadataBuffer = keyMetadata.buffer();
- } else {
- keyMetadataBuffer = null;
- }
+ ByteBuffer keyMetadataBuffer = keyMetadataBuffer();
// if the minSequenceNumber is null, then no manifests with a sequence
number have been written,
// so the min data sequence number is the one that will be assigned when
this is committed.
@@ -240,6 +249,19 @@ public abstract class ManifestWriter<F extends
ContentFile<F>> implements FileAp
firstRowId);
}
+ private ByteBuffer keyMetadataBuffer() {
+ if (keyMetadata instanceof NativeEncryptionKeyMetadata nativeKeyMetadata
+ && format == FileFormat.AVRO) {
+ // Whole-file encryption needs the file length embedded for GCM
truncation protection.
+ // Formats with native encryption (like Parquet) handle this directly
and don't need it.
+ return nativeKeyMetadata.copyWithLength(length()).buffer();
+ } else if (keyMetadata != null) {
+ return keyMetadata.buffer();
+ }
+
+ return null;
+ }
+
@Override
public void close() throws IOException {
this.closed = true;
@@ -256,7 +278,7 @@ public abstract class ManifestWriter<F extends
ContentFile<F>> implements FileAp
Long firstRowId,
Map<String, String> writerProperties) {
super(spec, file, snapshotId, firstRowId, writerProperties);
- this.entryWrapper = new V4Metadata.ManifestEntryWrapper<>(snapshotId);
+ this.entryWrapper = new V4Metadata.ManifestEntryWrapper<>(snapshotId,
spec.partitionType());
}
@Override
@@ -269,7 +291,7 @@ public abstract class ManifestWriter<F extends
ContentFile<F>> implements FileAp
PartitionSpec spec, OutputFile file) {
Schema manifestSchema = V4Metadata.entrySchema(spec.partitionType());
try {
- return InternalData.write(FileFormat.AVRO, file)
+ return InternalData.write(format(), file)
.schema(manifestSchema)
.named("manifest_entry")
.meta("schema", SchemaParser.toJson(spec.schema()))
@@ -296,7 +318,7 @@ public abstract class ManifestWriter<F extends
ContentFile<F>> implements FileAp
Long snapshotId,
Map<String, String> writerProperties) {
super(spec, file, snapshotId, null, writerProperties);
- this.entryWrapper = new V4Metadata.ManifestEntryWrapper<>(snapshotId);
+ this.entryWrapper = new V4Metadata.ManifestEntryWrapper<>(snapshotId,
spec.partitionType());
}
@Override
@@ -309,7 +331,7 @@ public abstract class ManifestWriter<F extends
ContentFile<F>> implements FileAp
PartitionSpec spec, OutputFile file) {
Schema manifestSchema = V4Metadata.entrySchema(spec.partitionType());
try {
- return InternalData.write(FileFormat.AVRO, file)
+ return InternalData.write(format(), file)
.schema(manifestSchema)
.named("manifest_entry")
.meta("schema", SchemaParser.toJson(spec.schema()))
@@ -342,6 +364,8 @@ public abstract class ManifestWriter<F extends
ContentFile<F>> implements FileAp
Long firstRowId,
Map<String, String> writerProperties) {
super(spec, file, snapshotId, firstRowId, writerProperties);
+ Preconditions.checkArgument(
+ format() == FileFormat.AVRO, "V3 manifests must use Avro, but got:
%s", format());
this.entryWrapper = new V3Metadata.ManifestEntryWrapper<>(snapshotId);
}
@@ -382,6 +406,8 @@ public abstract class ManifestWriter<F extends
ContentFile<F>> implements FileAp
Long snapshotId,
Map<String, String> writerProperties) {
super(spec, file, snapshotId, null, writerProperties);
+ Preconditions.checkArgument(
+ format() == FileFormat.AVRO, "V3 manifests must use Avro, but got:
%s", format());
this.entryWrapper = new V3Metadata.ManifestEntryWrapper<>(snapshotId);
}
@@ -427,6 +453,8 @@ public abstract class ManifestWriter<F extends
ContentFile<F>> implements FileAp
Long snapshotId,
Map<String, String> writerProperties) {
super(spec, file, snapshotId, null, writerProperties);
+ Preconditions.checkArgument(
+ format() == FileFormat.AVRO, "V2 manifests must use Avro, but got:
%s", format());
this.entryWrapper = new V2Metadata.ManifestEntryWrapper<>(snapshotId);
}
@@ -467,6 +495,8 @@ public abstract class ManifestWriter<F extends
ContentFile<F>> implements FileAp
Long snapshotId,
Map<String, String> writerProperties) {
super(spec, file, snapshotId, null, writerProperties);
+ Preconditions.checkArgument(
+ format() == FileFormat.AVRO, "V2 manifests must use Avro, but got:
%s", format());
this.entryWrapper = new V2Metadata.ManifestEntryWrapper<>(snapshotId);
}
@@ -512,6 +542,8 @@ public abstract class ManifestWriter<F extends
ContentFile<F>> implements FileAp
Long snapshotId,
Map<String, String> writerProperties) {
super(spec, file, snapshotId, null, writerProperties);
+ Preconditions.checkArgument(
+ format() == FileFormat.AVRO, "V1 manifests must use Avro, but got:
%s", format());
this.entryWrapper = new V1Metadata.ManifestEntryWrapper();
}
diff --git a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java
b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java
index 6ba10e8049..e351009a9e 100644
--- a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java
+++ b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java
@@ -113,6 +113,7 @@ abstract class SnapshotProducer<ThisT> implements
SnapshotUpdate<ThisT> {
private final AtomicInteger attempt = new AtomicInteger(0);
private final List<String> manifestLists = Lists.newArrayList();
private final long targetManifestSizeBytes;
+ private final FileFormat manifestFormat;
private final Map<String, String> manifestWriterProps;
private MetricsReporter reporter = LoggingMetricsReporter.instance();
private volatile Long snapshotId = null;
@@ -142,6 +143,10 @@ abstract class SnapshotProducer<ThisT> implements
SnapshotUpdate<ThisT> {
this.targetManifestSizeBytes =
ops.current()
.propertyAsLong(MANIFEST_TARGET_SIZE_BYTES,
MANIFEST_TARGET_SIZE_BYTES_DEFAULT);
+ this.manifestFormat =
+ ops.current().formatVersion() >=
TableMetadata.MIN_FORMAT_VERSION_PARQUET_MANIFESTS
+ ? FileFormat.PARQUET
+ : FileFormat.AVRO;
this.manifestWriterProps = manifestWriterProperties(ops.current());
boolean snapshotIdInheritanceEnabled =
ops.current()
@@ -603,7 +608,7 @@ abstract class SnapshotProducer<ThisT> implements
SnapshotUpdate<ThisT> {
protected EncryptedOutputFile newManifestOutputFile() {
String manifestFileLocation =
ops.metadataFileLocation(
- FileFormat.AVRO.addExtension(commitUUID + "-m" +
manifestCount.getAndIncrement()));
+ manifestFormat.addExtension(commitUUID + "-m" +
manifestCount.getAndIncrement()));
return EncryptingFileIO.combine(ops.io(), ops.encryption())
.newEncryptingOutputFile(manifestFileLocation);
}
diff --git a/core/src/main/java/org/apache/iceberg/TableMetadata.java
b/core/src/main/java/org/apache/iceberg/TableMetadata.java
index 43a67dd2be..c4a7bfc5c8 100644
--- a/core/src/main/java/org/apache/iceberg/TableMetadata.java
+++ b/core/src/main/java/org/apache/iceberg/TableMetadata.java
@@ -57,6 +57,7 @@ public class TableMetadata implements Serializable {
static final int DEFAULT_TABLE_FORMAT_VERSION = 2;
static final int SUPPORTED_TABLE_FORMAT_VERSION = 4;
static final int MIN_FORMAT_VERSION_ROW_LINEAGE = 3;
+ static final int MIN_FORMAT_VERSION_PARQUET_MANIFESTS = 4;
static final int INITIAL_SPEC_ID = 0;
static final int INITIAL_SORT_ORDER_ID = 1;
static final int INITIAL_SCHEMA_ID = 0;
diff --git a/core/src/main/java/org/apache/iceberg/V4Metadata.java
b/core/src/main/java/org/apache/iceberg/V4Metadata.java
index 67478290aa..06fc75213d 100644
--- a/core/src/main/java/org/apache/iceberg/V4Metadata.java
+++ b/core/src/main/java/org/apache/iceberg/V4Metadata.java
@@ -23,6 +23,7 @@ import static
org.apache.iceberg.types.Types.NestedField.required;
import java.nio.ByteBuffer;
import java.util.List;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.types.Types;
class V4Metadata {
@@ -278,28 +279,38 @@ class V4Metadata {
}
static Types.StructType fileType(Types.StructType partitionType) {
- return Types.StructType.of(
- DataFile.CONTENT.asRequired(),
- DataFile.FILE_PATH,
- DataFile.FILE_FORMAT,
- required(
- DataFile.PARTITION_ID, DataFile.PARTITION_NAME, partitionType,
DataFile.PARTITION_DOC),
- DataFile.RECORD_COUNT,
- DataFile.FILE_SIZE,
- DataFile.COLUMN_SIZES,
- DataFile.VALUE_COUNTS,
- DataFile.NULL_VALUE_COUNTS,
- DataFile.NAN_VALUE_COUNTS,
- DataFile.LOWER_BOUNDS,
- DataFile.UPPER_BOUNDS,
- DataFile.KEY_METADATA,
- DataFile.SPLIT_OFFSETS,
- DataFile.EQUALITY_IDS,
- DataFile.SORT_ORDER_ID,
- DataFile.FIRST_ROW_ID,
- DataFile.REFERENCED_DATA_FILE,
- DataFile.CONTENT_OFFSET,
- DataFile.CONTENT_SIZE);
+ // Parquet cannot represent empty groups, so the partition field is
omitted entirely from
+ // the file schema for unpartitioned tables. DataFileWrapper adjusts
positions to match.
+ ImmutableList.Builder<Types.NestedField> fields =
+ ImmutableList.builderWithExpectedSize(partitionType.fields().isEmpty()
? 18 : 19);
+ fields.add(DataFile.CONTENT.asRequired());
+ fields.add(DataFile.FILE_PATH);
+ fields.add(DataFile.FILE_FORMAT);
+ if (!partitionType.fields().isEmpty()) {
+ fields.add(
+ required(
+ DataFile.PARTITION_ID,
+ DataFile.PARTITION_NAME,
+ partitionType,
+ DataFile.PARTITION_DOC));
+ }
+ fields.add(DataFile.RECORD_COUNT);
+ fields.add(DataFile.FILE_SIZE);
+ fields.add(DataFile.COLUMN_SIZES);
+ fields.add(DataFile.VALUE_COUNTS);
+ fields.add(DataFile.NULL_VALUE_COUNTS);
+ fields.add(DataFile.NAN_VALUE_COUNTS);
+ fields.add(DataFile.LOWER_BOUNDS);
+ fields.add(DataFile.UPPER_BOUNDS);
+ fields.add(DataFile.KEY_METADATA);
+ fields.add(DataFile.SPLIT_OFFSETS);
+ fields.add(DataFile.EQUALITY_IDS);
+ fields.add(DataFile.SORT_ORDER_ID);
+ fields.add(DataFile.FIRST_ROW_ID);
+ fields.add(DataFile.REFERENCED_DATA_FILE);
+ fields.add(DataFile.CONTENT_OFFSET);
+ fields.add(DataFile.CONTENT_SIZE);
+ return Types.StructType.of(fields.build());
}
static class ManifestEntryWrapper<F extends ContentFile<F>>
@@ -309,10 +320,10 @@ class V4Metadata {
private final DataFileWrapper<?> fileWrapper;
private ManifestEntry<F> wrapped = null;
- ManifestEntryWrapper(Long commitSnapshotId) {
- this.size = entrySchema(Types.StructType.of()).columns().size();
+ ManifestEntryWrapper(Long commitSnapshotId, Types.StructType
partitionType) {
+ this.size = entrySchema(partitionType).columns().size();
this.commitSnapshotId = commitSnapshotId;
- this.fileWrapper = new DataFileWrapper<>();
+ this.fileWrapper = new DataFileWrapper<>(partitionType);
}
public ManifestEntryWrapper<F> wrap(ManifestEntry<F> entry) {
@@ -423,11 +434,15 @@ class V4Metadata {
/** Wrapper used to write DataFile or DeleteFile to v4 metadata. */
static class DataFileWrapper<F extends ContentFile<F>> extends
Delegates.DelegatingContentFile<F>
implements ContentFile<F>, StructLike {
+ private static final int PARTITION_POSITION = 3;
+
private final int size;
+ private final boolean hasPartition;
- DataFileWrapper() {
+ DataFileWrapper(Types.StructType partitionType) {
super(null);
- this.size = fileType(Types.StructType.of()).fields().size();
+ this.hasPartition = !partitionType.fields().isEmpty();
+ this.size = fileType(partitionType).fields().size();
}
@SuppressWarnings("unchecked")
@@ -452,7 +467,10 @@ class V4Metadata {
}
private Object get(int pos) {
- switch (pos) {
+ // when the partition field is omitted, positions at or after where it
would appear
+ // shift down by 1, so adjust back to the canonical field ordering
+ int adjusted = hasPartition ? pos : (pos >= PARTITION_POSITION ? pos + 1
: pos);
+ switch (adjusted) {
case 0:
return wrapped.content().id();
case 1:
diff --git a/core/src/test/java/org/apache/iceberg/TestBase.java
b/core/src/test/java/org/apache/iceberg/TestBase.java
index 27b8a49d04..0f649cabeb 100644
--- a/core/src/test/java/org/apache/iceberg/TestBase.java
+++ b/core/src/test/java/org/apache/iceberg/TestBase.java
@@ -263,7 +263,8 @@ public class TestBase {
.listFiles(
(dir, name) ->
!name.startsWith("snap")
- &&
Files.getFileExtension(name).equalsIgnoreCase("avro")));
+ &&
(Files.getFileExtension(name).equalsIgnoreCase("avro")
+ ||
Files.getFileExtension(name).equalsIgnoreCase("parquet"))));
}
List<File> listManifestLists(File tableDirToList) {
@@ -297,12 +298,22 @@ public class TestBase {
return TestTables.readMetadata("test");
}
+ static FileFormat manifestFormat(int version) {
+ return version >= TableMetadata.MIN_FORMAT_VERSION_PARQUET_MANIFESTS
+ ? FileFormat.PARQUET
+ : FileFormat.AVRO;
+ }
+
+ FileFormat manifestFormat() {
+ return manifestFormat(formatVersion);
+ }
+
ManifestFile writeManifest(DataFile... files) throws IOException {
return writeManifest(null, files);
}
ManifestFile writeManifest(Long snapshotId, DataFile... files) throws
IOException {
- File manifestFile = temp.resolve("input.m0.avro").toFile();
+ File manifestFile =
temp.resolve(manifestFormat().addExtension("input.m0")).toFile();
assertThat(manifestFile).doesNotExist();
OutputFile outputFile =
table.ops().io().newOutputFile(manifestFile.getCanonicalPath());
@@ -324,7 +335,7 @@ public class TestBase {
}
ManifestFile writeManifest(Long snapshotId, ManifestEntry<?>... entries)
throws IOException {
- return writeManifest(snapshotId, "input.m0.avro", entries);
+ return writeManifest(snapshotId,
manifestFormat().addExtension("input.m0"), entries);
}
@SuppressWarnings("unchecked")
@@ -360,8 +371,8 @@ public class TestBase {
throws IOException {
OutputFile manifestFile =
org.apache.iceberg.Files.localOutput(
- FileFormat.AVRO.addExtension(
- temp.resolve("junit" +
System.nanoTime()).toFile().toString()));
+ manifestFormat(newFormatVersion)
+ .addExtension(temp.resolve("junit" +
System.nanoTime()).toFile().toString()));
ManifestWriter<DeleteFile> writer =
ManifestFiles.writeDeleteManifest(newFormatVersion, SPEC,
manifestFile, snapshotId);
try {
@@ -375,7 +386,7 @@ public class TestBase {
}
ManifestFile writeManifestWithName(String name, DataFile... files) throws
IOException {
- File manifestFile = temp.resolve(name + ".avro").toFile();
+ File manifestFile =
temp.resolve(manifestFormat().addExtension(name)).toFile();
assertThat(manifestFile).doesNotExist();
OutputFile outputFile =
table.ops().io().newOutputFile(manifestFile.getCanonicalPath());
diff --git a/core/src/test/java/org/apache/iceberg/TestFastAppend.java
b/core/src/test/java/org/apache/iceberg/TestFastAppend.java
index 8f427525e2..bc28ecd880 100644
--- a/core/src/test/java/org/apache/iceberg/TestFastAppend.java
+++ b/core/src/test/java/org/apache/iceberg/TestFastAppend.java
@@ -509,14 +509,18 @@ public class TestFastAppend extends TestBase {
assertThat(base.currentSnapshot()).isNull();
ManifestFile manifestWithExistingFiles =
- writeManifest("manifest-file-1.avro", manifestEntry(Status.EXISTING,
null, FILE_A));
+ writeManifest(
+ manifestFormat().addExtension("manifest-file-1"),
+ manifestEntry(Status.EXISTING, null, FILE_A));
assertThatThrownBy(
() ->
table.newFastAppend().appendManifest(manifestWithExistingFiles).commit())
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("Cannot append manifest with existing files");
ManifestFile manifestWithDeletedFiles =
- writeManifest("manifest-file-2.avro", manifestEntry(Status.DELETED,
null, FILE_A));
+ writeManifest(
+ manifestFormat().addExtension("manifest-file-2"),
+ manifestEntry(Status.DELETED, null, FILE_A));
assertThatThrownBy(
() ->
table.newFastAppend().appendManifest(manifestWithDeletedFiles).commit())
.isInstanceOf(IllegalArgumentException.class)
diff --git a/core/src/test/java/org/apache/iceberg/TestManifestReader.java
b/core/src/test/java/org/apache/iceberg/TestManifestReader.java
index 6690a1483e..de2b7fd859 100644
--- a/core/src/test/java/org/apache/iceberg/TestManifestReader.java
+++ b/core/src/test/java/org/apache/iceberg/TestManifestReader.java
@@ -273,6 +273,10 @@ public class TestManifestReader extends TestBase {
@SuppressWarnings("deprecation")
@TestTemplate
public void testDeprecatedReadWithoutSpecsById() throws IOException {
+ assumeThat(formatVersion)
+ .as("Deprecated read without specsById requires Avro metadata; V4 uses
Parquet")
+ .isLessThan(TableMetadata.MIN_FORMAT_VERSION_PARQUET_MANIFESTS);
+
ManifestFile manifest = writeManifest(1000L,
manifestEntry(Status.EXISTING, 1000L, FILE_A));
try (ManifestReader<DataFile> reader = ManifestFiles.read(manifest,
FILE_IO)) {
ManifestEntry<DataFile> entry =
Iterables.getOnlyElement(reader.entries());
diff --git a/core/src/test/java/org/apache/iceberg/TestManifestWriter.java
b/core/src/test/java/org/apache/iceberg/TestManifestWriter.java
index 00e66bdd7d..d710d949c5 100644
--- a/core/src/test/java/org/apache/iceberg/TestManifestWriter.java
+++ b/core/src/test/java/org/apache/iceberg/TestManifestWriter.java
@@ -41,7 +41,7 @@ public class TestManifestWriter extends TestBase {
public void testManifestStats() throws IOException {
ManifestFile manifest =
writeManifest(
- "manifest.avro",
+ manifestFormat().addExtension("manifest"),
manifestEntry(Status.ADDED, null, newFile(10)),
manifestEntry(Status.ADDED, null, newFile(20)),
manifestEntry(Status.ADDED, null, newFile(5)),
@@ -67,7 +67,7 @@ public class TestManifestWriter extends TestBase {
public void testManifestPartitionStats() throws IOException {
ManifestFile manifest =
writeManifest(
- "manifest.avro",
+ manifestFormat().addExtension("manifest"),
manifestEntry(Status.ADDED, null, newFile(10,
TestHelpers.Row.of(1))),
manifestEntry(Status.EXISTING, null, newFile(15,
TestHelpers.Row.of(2))),
manifestEntry(Status.DELETED, null, newFile(2,
TestHelpers.Row.of(3))));
@@ -92,7 +92,8 @@ public class TestManifestWriter extends TestBase {
@TestTemplate
public void testWriteManifestWithSequenceNumber() throws IOException {
assumeThat(formatVersion).isGreaterThan(1);
- File manifestFile = temp.resolve("manifest" + System.nanoTime() +
".avro").toFile();
+ File manifestFile =
+ temp.resolve(manifestFormat().addExtension("manifest" +
System.nanoTime())).toFile();
OutputFile outputFile =
table.ops().io().newOutputFile(manifestFile.getCanonicalPath());
ManifestWriter<DataFile> writer =
ManifestFiles.write(formatVersion, table.spec(), outputFile, 1L);
@@ -119,7 +120,7 @@ public class TestManifestWriter extends TestBase {
ManifestFile manifest =
writeManifest(
- "manifest.avro",
+ manifestFormat().addExtension("manifest"),
manifestEntry(Status.ADDED, null, dataSequenceNumber, null, file1),
manifestEntry(Status.ADDED, null, dataSequenceNumber, null,
file2));
@@ -161,7 +162,7 @@ public class TestManifestWriter extends TestBase {
ManifestFile newManifest =
writeManifest(
- "manifest.avro",
+ manifestFormat().addExtension("manifest"),
manifestEntry(Status.EXISTING, appendSnapshotId,
appendSequenceNumber, null, file1),
manifestEntry(Status.EXISTING, appendSnapshotId,
appendSequenceNumber, null, file2));
diff --git
a/core/src/test/java/org/apache/iceberg/TestManifestWriterVersions.java
b/core/src/test/java/org/apache/iceberg/TestManifestWriterVersions.java
index 5e83827f0c..966b573bd9 100644
--- a/core/src/test/java/org/apache/iceberg/TestManifestWriterVersions.java
+++ b/core/src/test/java/org/apache/iceberg/TestManifestWriterVersions.java
@@ -115,6 +115,8 @@ public class TestManifestWriterVersions {
null,
null);
+ static final List<FileFormat> V4_FORMATS = ImmutableList.of(FileFormat.AVRO,
FileFormat.PARQUET);
+
@TempDir private Path temp;
@Test
@@ -344,6 +346,100 @@ public class TestManifestWriterVersions {
assertThat(readAvroCodec(manifestFile)).isEqualTo("snappy");
}
+ @ParameterizedTest
+ @FieldSource("V4_FORMATS")
+ public void testV4WritePartitioned(FileFormat fileFormat) throws IOException
{
+ ManifestFile manifest = writeManifest(4, fileFormat, SPEC, DATA_FILE);
+ checkManifest(manifest, ManifestWriter.UNASSIGNED_SEQ);
+ checkEntry(
+ readManifest(manifest),
+ ManifestWriter.UNASSIGNED_SEQ,
+ ManifestWriter.UNASSIGNED_SEQ,
+ FileContent.DATA,
+ FIRST_ROW_ID);
+ }
+
+ @ParameterizedTest
+ @FieldSource("V4_FORMATS")
+ public void testV4WriteUnpartitioned(FileFormat fileFormat) throws
IOException {
+ DataFile unpartitionedFile =
+ DataFiles.builder(PartitionSpec.unpartitioned())
+ .withPath(PATH)
+ .withFormat(FORMAT)
+ .withFileSizeInBytes(150972L)
+ .withMetrics(METRICS)
+ .withSplitOffsets(OFFSETS)
+ .withSortOrderId(SORT_ORDER_ID)
+ .withFirstRowId(FIRST_ROW_ID)
+ .build();
+
+ ManifestFile manifest =
+ writeManifest(4, fileFormat, PartitionSpec.unpartitioned(),
unpartitionedFile);
+ checkManifest(manifest, ManifestWriter.UNASSIGNED_SEQ);
+
+ Map<Integer, PartitionSpec> unpartitionedSpecs =
+ ImmutableMap.of(PartitionSpec.unpartitioned().specId(),
PartitionSpec.unpartitioned());
+ try (CloseableIterable<ManifestEntry<DataFile>> reader =
+ ManifestFiles.read(manifest, io, unpartitionedSpecs).entries()) {
+ ManifestEntry<DataFile> entry = Iterables.getOnlyElement(reader);
+ assertThat(entry.status()).isEqualTo(ManifestEntry.Status.ADDED);
+ assertThat(entry.file().location()).isEqualTo(PATH);
+ assertThat(entry.file().recordCount()).isEqualTo(METRICS.recordCount());
+ assertThat(entry.file().firstRowId()).isEqualTo(FIRST_ROW_ID);
+ }
+ }
+
+ @ParameterizedTest
+ @FieldSource("V4_FORMATS")
+ public void testV4WriteDeletePartitioned(FileFormat fileFormat) throws
IOException {
+ ManifestFile manifest = writeDeleteManifest(4, fileFormat, SPEC);
+ checkManifest(manifest, ManifestWriter.UNASSIGNED_SEQ);
+ assertThat(manifest.content()).isEqualTo(ManifestContent.DELETES);
+ checkEntry(
+ readDeleteManifest(manifest),
+ ManifestWriter.UNASSIGNED_SEQ,
+ ManifestWriter.UNASSIGNED_SEQ,
+ FileContent.EQUALITY_DELETES);
+ }
+
+ @ParameterizedTest
+ @FieldSource("V4_FORMATS")
+ public void testV4WriteDeleteUnpartitioned(FileFormat fileFormat) throws
IOException {
+ DeleteFile unpartitionedDelete =
+ new GenericDeleteFile(
+ 0,
+ FileContent.EQUALITY_DELETES,
+ PATH,
+ FORMAT,
+ new PartitionData(PartitionSpec.unpartitioned().partitionType()),
+ 22905L,
+ METRICS,
+ EQUALITY_ID_ARR,
+ SORT_ORDER_ID,
+ null,
+ null,
+ null,
+ null,
+ null);
+
+ ManifestFile manifest =
+ writeDeleteManifest(4, fileFormat, PartitionSpec.unpartitioned(),
unpartitionedDelete);
+ checkManifest(manifest, ManifestWriter.UNASSIGNED_SEQ);
+ assertThat(manifest.content()).isEqualTo(ManifestContent.DELETES);
+
+ Map<Integer, PartitionSpec> unpartitionedSpecs =
+ ImmutableMap.of(PartitionSpec.unpartitioned().specId(),
PartitionSpec.unpartitioned());
+ try (CloseableIterable<ManifestEntry<DeleteFile>> reader =
+ ManifestFiles.readDeleteManifest(manifest, io,
unpartitionedSpecs).entries()) {
+ ManifestEntry<DeleteFile> entry = Iterables.getOnlyElement(reader);
+ assertThat(entry.status()).isEqualTo(ManifestEntry.Status.ADDED);
+
assertThat(entry.file().content()).isEqualTo(FileContent.EQUALITY_DELETES);
+ assertThat(entry.file().location()).isEqualTo(PATH);
+ assertThat(entry.file().recordCount()).isEqualTo(METRICS.recordCount());
+ assertThat(entry.file().equalityFieldIds()).isEqualTo(EQUALITY_IDS);
+ }
+ }
+
void checkEntry(
ManifestEntry<?> entry,
Long expectedDataSequenceNumber,
@@ -466,7 +562,7 @@ public class TestManifestWriterVersions {
private ManifestFile rewriteManifest(ManifestFile manifest, int
formatVersion)
throws IOException {
- String filename = FileFormat.AVRO.addExtension("rewrite-manifest");
+ String filename =
TestBase.manifestFormat(formatVersion).addExtension("rewrite-manifest");
EncryptedOutputFile manifestFile =
encryptionManager().encrypt(io.newOutputFile(filename));
ManifestWriter<DataFile> writer =
ManifestFiles.write(formatVersion, SPEC, manifestFile, SNAPSHOT_ID);
@@ -483,10 +579,16 @@ public class TestManifestWriterVersions {
}
private ManifestFile writeManifest(int formatVersion, DataFile... files)
throws IOException {
- String filename = FileFormat.AVRO.addExtension("manifest");
+ return writeManifest(formatVersion,
TestBase.manifestFormat(formatVersion), SPEC, files);
+ }
+
+ private ManifestFile writeManifest(
+ int formatVersion, FileFormat fileFormat, PartitionSpec spec,
DataFile... files)
+ throws IOException {
+ String filename = fileFormat.addExtension("manifest");
EncryptedOutputFile manifestFile =
encryptionManager().encrypt(io.newOutputFile(filename));
ManifestWriter<DataFile> writer =
- ManifestFiles.newWriter(formatVersion, SPEC, manifestFile,
SNAPSHOT_ID, FIRST_ROW_ID);
+ ManifestFiles.newWriter(formatVersion, spec, manifestFile,
SNAPSHOT_ID, FIRST_ROW_ID);
try {
for (DataFile file : files) {
writer.add(file);
@@ -512,12 +614,23 @@ public class TestManifestWriterVersions {
}
private ManifestFile writeDeleteManifest(int formatVersion) throws
IOException {
- String filename = FileFormat.AVRO.addExtension("manifest");
+ return writeDeleteManifest(formatVersion,
TestBase.manifestFormat(formatVersion), SPEC);
+ }
+
+ private ManifestFile writeDeleteManifest(
+ int formatVersion, FileFormat fileFormat, PartitionSpec spec) throws
IOException {
+ return writeDeleteManifest(formatVersion, fileFormat, spec, DELETE_FILE);
+ }
+
+ private ManifestFile writeDeleteManifest(
+ int formatVersion, FileFormat fileFormat, PartitionSpec spec, DeleteFile
deleteFile)
+ throws IOException {
+ String filename = fileFormat.addExtension("manifest");
EncryptedOutputFile manifestFile =
encryptionManager().encrypt(io.newOutputFile(filename));
ManifestWriter<DeleteFile> writer =
- ManifestFiles.writeDeleteManifest(formatVersion, SPEC, manifestFile,
SNAPSHOT_ID);
+ ManifestFiles.writeDeleteManifest(formatVersion, spec, manifestFile,
SNAPSHOT_ID);
try {
- writer.add(DELETE_FILE);
+ writer.add(deleteFile);
} finally {
writer.close();
}
diff --git a/core/src/test/java/org/apache/iceberg/TestMergeAppend.java
b/core/src/test/java/org/apache/iceberg/TestMergeAppend.java
index 3947f16fe1..b7700d7ce7 100644
--- a/core/src/test/java/org/apache/iceberg/TestMergeAppend.java
+++ b/core/src/test/java/org/apache/iceberg/TestMergeAppend.java
@@ -629,7 +629,8 @@ public class TestMergeAppend extends TestBase {
.newAppend()
.appendManifest(
writeManifest(
- "input-m0.avro",
manifestEntry(ManifestEntry.Status.ADDED, null, FILE_C))),
+ manifestFormat().addExtension("input-m0"),
+ manifestEntry(ManifestEntry.Status.ADDED, null,
FILE_C))),
branch);
base = readMetadata();
@@ -671,7 +672,8 @@ public class TestMergeAppend extends TestBase {
.newAppend()
.appendManifest(
writeManifest(
- "input-m1.avro",
manifestEntry(ManifestEntry.Status.ADDED, null, FILE_D))),
+ manifestFormat().addExtension("input-m1"),
+ manifestEntry(ManifestEntry.Status.ADDED, null,
FILE_D))),
branch);
base = readMetadata();
@@ -1274,7 +1276,7 @@ public class TestMergeAppend extends TestBase {
table.updateProperties().set(TableProperties.MANIFEST_MIN_MERGE_COUNT,
"1").commit();
- ManifestFile manifest1 = writeManifestWithName("manifest-file-1.avro",
FILE_A, FILE_B);
+ ManifestFile manifest1 = writeManifestWithName("manifest-file-1", FILE_A,
FILE_B);
Snapshot snap1 = commit(table,
table.newAppend().appendManifest(manifest1), branch);
long commitId1 = snap1.snapshotId();
@@ -1290,7 +1292,7 @@ public class TestMergeAppend extends TestBase {
statuses(Status.ADDED, Status.ADDED));
assertThat(new File(manifest1.path())).exists();
- ManifestFile manifest2 = writeManifestWithName("manifest-file-2.avro",
FILE_C, FILE_D);
+ ManifestFile manifest2 = writeManifestWithName("manifest-file-2", FILE_C,
FILE_D);
Snapshot snap2 = commit(table,
table.newAppend().appendManifest(manifest2), branch);
long commitId2 = snap2.snapshotId();
@@ -1347,7 +1349,9 @@ public class TestMergeAppend extends TestBase {
assertThat(base.currentSnapshot()).isNull();
ManifestFile manifestWithExistingFiles =
- writeManifest("manifest-file-1.avro", manifestEntry(Status.EXISTING,
null, FILE_A));
+ writeManifest(
+ manifestFormat().addExtension("manifest-file-1"),
+ manifestEntry(Status.EXISTING, null, FILE_A));
assertThatThrownBy(
() ->
commit(table,
table.newAppend().appendManifest(manifestWithExistingFiles), branch))
@@ -1356,7 +1360,9 @@ public class TestMergeAppend extends TestBase {
assertThat(readMetadata().lastSequenceNumber()).isEqualTo(0);
ManifestFile manifestWithDeletedFiles =
- writeManifest("manifest-file-2.avro", manifestEntry(Status.DELETED,
null, FILE_A));
+ writeManifest(
+ manifestFormat().addExtension("manifest-file-2"),
+ manifestEntry(Status.DELETED, null, FILE_A));
assertThatThrownBy(
() -> commit(table,
table.newAppend().appendManifest(manifestWithDeletedFiles), branch))
.isInstanceOf(IllegalArgumentException.class)
diff --git a/core/src/test/java/org/apache/iceberg/TestRewriteManifests.java
b/core/src/test/java/org/apache/iceberg/TestRewriteManifests.java
index 19ee156c9e..dab323743b 100644
--- a/core/src/test/java/org/apache/iceberg/TestRewriteManifests.java
+++ b/core/src/test/java/org/apache/iceberg/TestRewriteManifests.java
@@ -55,7 +55,8 @@ public class TestRewriteManifests extends TestBase {
ManifestFile newManifest =
writeManifest(
- "manifest-file-1.avro", manifestEntry(ManifestEntry.Status.ADDED,
null, FILE_A));
+ manifestFormat().addExtension("manifest-file-1"),
+ manifestEntry(ManifestEntry.Status.ADDED, null, FILE_A));
table.newFastAppend().appendManifest(newManifest).commit();
long appendId = table.currentSnapshot().snapshotId();
@@ -79,7 +80,8 @@ public class TestRewriteManifests extends TestBase {
ManifestFile newManifest =
writeManifest(
- "manifest-file-1.avro", manifestEntry(ManifestEntry.Status.ADDED,
null, FILE_A));
+ manifestFormat().addExtension("manifest-file-1"),
+ manifestEntry(ManifestEntry.Status.ADDED, null, FILE_A));
table.newFastAppend().appendManifest(newManifest).commit();
@@ -115,7 +117,8 @@ public class TestRewriteManifests extends TestBase {
ManifestFile newManifest =
writeManifest(
- "manifest-file-1.avro", manifestEntry(ManifestEntry.Status.ADDED,
null, FILE_A));
+ manifestFormat().addExtension("manifest-file-1"),
+ manifestEntry(ManifestEntry.Status.ADDED, null, FILE_A));
table.newFastAppend().appendManifest(newManifest).commit();
long manifestAppendId = table.currentSnapshot().snapshotId();
@@ -428,11 +431,11 @@ public class TestRewriteManifests extends TestBase {
ManifestFile firstNewManifest =
writeManifest(
- "manifest-file-1.avro",
+ manifestFormat().addExtension("manifest-file-1"),
manifestEntry(ManifestEntry.Status.EXISTING,
firstSnapshot.snapshotId(), FILE_A));
ManifestFile secondNewManifest =
writeManifest(
- "manifest-file-2.avro",
+ manifestFormat().addExtension("manifest-file-2"),
manifestEntry(ManifestEntry.Status.EXISTING,
firstSnapshot.snapshotId(), FILE_B));
RewriteManifests rewriteManifests = table.rewriteManifests();
@@ -492,11 +495,11 @@ public class TestRewriteManifests extends TestBase {
ManifestFile firstNewManifest =
writeManifest(
- "manifest-file-1.avro",
+ manifestFormat().addExtension("manifest-file-1"),
manifestEntry(ManifestEntry.Status.EXISTING,
firstSnapshot.snapshotId(), FILE_A));
ManifestFile secondNewManifest =
writeManifest(
- "manifest-file-2.avro",
+ manifestFormat().addExtension("manifest-file-2"),
manifestEntry(ManifestEntry.Status.EXISTING,
firstSnapshot.snapshotId(), FILE_B));
RewriteManifests rewriteManifests = table.rewriteManifests();
@@ -679,11 +682,11 @@ public class TestRewriteManifests extends TestBase {
ManifestFile firstNewManifest =
writeManifest(
- "manifest-file-1.avro",
+ manifestFormat().addExtension("manifest-file-1"),
manifestEntry(ManifestEntry.Status.EXISTING,
firstSnapshot.snapshotId(), FILE_A));
ManifestFile secondNewManifest =
writeManifest(
- "manifest-file-2.avro",
+ manifestFormat().addExtension("manifest-file-2"),
manifestEntry(ManifestEntry.Status.EXISTING,
firstSnapshot.snapshotId(), FILE_B));
RewriteManifests rewriteManifests = table.rewriteManifests();
@@ -741,11 +744,11 @@ public class TestRewriteManifests extends TestBase {
ManifestFile firstNewManifest =
writeManifest(
- "manifest-file-1.avro",
+ manifestFormat().addExtension("manifest-file-1"),
manifestEntry(ManifestEntry.Status.EXISTING,
firstSnapshot.snapshotId(), FILE_A));
ManifestFile secondNewManifest =
writeManifest(
- "manifest-file-2.avro",
+ manifestFormat().addExtension("manifest-file-2"),
manifestEntry(ManifestEntry.Status.EXISTING,
firstSnapshot.snapshotId(), FILE_B));
RewriteManifests rewriteManifests = table.rewriteManifests();
@@ -796,11 +799,11 @@ public class TestRewriteManifests extends TestBase {
ManifestFile firstNewManifest =
writeManifest(
- "manifest-file-1.avro",
+ manifestFormat().addExtension("manifest-file-1"),
manifestEntry(ManifestEntry.Status.EXISTING,
firstSnapshot.snapshotId(), FILE_A));
ManifestFile secondNewManifest =
writeManifest(
- "manifest-file-2.avro",
+ manifestFormat().addExtension("manifest-file-2"),
manifestEntry(ManifestEntry.Status.EXISTING,
firstSnapshot.snapshotId(), FILE_B));
RewriteManifests rewriteManifests = table.rewriteManifests();
@@ -841,7 +844,7 @@ public class TestRewriteManifests extends TestBase {
ManifestFile newManifest =
writeManifest(
- "manifest-file-1.avro",
+ manifestFormat().addExtension("manifest-file-1"),
manifestEntry(ManifestEntry.Status.EXISTING,
firstSnapshot.snapshotId(), FILE_A));
table
@@ -904,7 +907,8 @@ public class TestRewriteManifests extends TestBase {
manifestEntry(ManifestEntry.Status.EXISTING,
firstSnapshot.snapshotId(), FILE_A);
// update the entry's sequence number or else it will be rejected by the
writer
entry.setDataSequenceNumber(firstSnapshot.sequenceNumber());
- ManifestFile newManifest = writeManifest("manifest-file-1.avro", entry);
+ ManifestFile newManifest =
+ writeManifest(manifestFormat().addExtension("manifest-file-1"), entry);
RewriteManifests rewriteManifests =
table
@@ -954,7 +958,8 @@ public class TestRewriteManifests extends TestBase {
// update the entry's sequence number or else it will be rejected by the
writer
appendEntry.setDataSequenceNumber(snapshot.sequenceNumber());
- ManifestFile invalidAddedFileManifest =
writeManifest("manifest-file-2.avro", appendEntry);
+ ManifestFile invalidAddedFileManifest =
+ writeManifest(manifestFormat().addExtension("manifest-file-2"),
appendEntry);
assertThatThrownBy(
() ->
@@ -971,7 +976,8 @@ public class TestRewriteManifests extends TestBase {
// update the entry's sequence number or else it will be rejected by the
writer
deleteEntry.setDataSequenceNumber(snapshot.sequenceNumber());
- ManifestFile invalidDeletedFileManifest =
writeManifest("manifest-file-3.avro", deleteEntry);
+ ManifestFile invalidDeletedFileManifest =
+ writeManifest(manifestFormat().addExtension("manifest-file-3"),
deleteEntry);
assertThatThrownBy(
() ->
@@ -1009,7 +1015,7 @@ public class TestRewriteManifests extends TestBase {
ManifestFile newManifest =
writeManifest(
- "manifest-file.avro",
+ manifestFormat().addExtension("manifest-file"),
manifestEntry(ManifestEntry.Status.EXISTING,
firstSnapshot.snapshotId(), FILE_A),
manifestEntry(ManifestEntry.Status.EXISTING,
secondSnapshot.snapshotId(), FILE_B));
@@ -1051,7 +1057,7 @@ public class TestRewriteManifests extends TestBase {
ManifestFile newManifest =
writeManifest(
- "manifest-file.avro",
+ manifestFormat().addExtension("manifest-file"),
manifestEntry(ManifestEntry.Status.EXISTING,
firstSnapshot.snapshotId(), FILE_A),
manifestEntry(ManifestEntry.Status.EXISTING,
secondSnapshot.snapshotId(), FILE_B));
@@ -1176,7 +1182,7 @@ public class TestRewriteManifests extends TestBase {
Iterables.getOnlyElement(deleteSnapshot.deleteManifests(table.io()));
ManifestFile newDeleteManifest1 =
writeManifest(
- "delete-manifest-file-1.avro",
+ manifestFormat().addExtension("delete-manifest-file-1"),
manifestEntry(
ManifestEntry.Status.EXISTING,
deleteSnapshotId,
@@ -1185,7 +1191,7 @@ public class TestRewriteManifests extends TestBase {
fileADeletes()));
ManifestFile newDeleteManifest2 =
writeManifest(
- "delete-manifest-file-2.avro",
+ manifestFormat().addExtension("delete-manifest-file-2"),
manifestEntry(
ManifestEntry.Status.EXISTING,
deleteSnapshotId,
@@ -1262,7 +1268,7 @@ public class TestRewriteManifests extends TestBase {
Iterables.getOnlyElement(deleteSnapshot.dataManifests(table.io()));
ManifestFile newDataManifest1 =
writeManifest(
- "manifest-file-1.avro",
+ manifestFormat().addExtension("manifest-file-1"),
manifestEntry(
ManifestEntry.Status.EXISTING,
appendSnapshotId,
@@ -1271,7 +1277,7 @@ public class TestRewriteManifests extends TestBase {
FILE_A));
ManifestFile newDataManifest2 =
writeManifest(
- "manifest-file-2.avro",
+ manifestFormat().addExtension("manifest-file-2"),
manifestEntry(
ManifestEntry.Status.EXISTING,
appendSnapshotId,
@@ -1284,7 +1290,7 @@ public class TestRewriteManifests extends TestBase {
Iterables.getOnlyElement(deleteSnapshot.deleteManifests(table.io()));
ManifestFile newDeleteManifest1 =
writeManifest(
- "delete-manifest-file-1.avro",
+ manifestFormat().addExtension("delete-manifest-file-1"),
manifestEntry(
ManifestEntry.Status.EXISTING,
deleteSnapshotId,
@@ -1293,7 +1299,7 @@ public class TestRewriteManifests extends TestBase {
fileADeletes()));
ManifestFile newDeleteManifest2 =
writeManifest(
- "delete-manifest-file-2.avro",
+ manifestFormat().addExtension("delete-manifest-file-2"),
manifestEntry(
ManifestEntry.Status.EXISTING,
deleteSnapshotId,
@@ -1376,7 +1382,7 @@ public class TestRewriteManifests extends TestBase {
Iterables.getOnlyElement(deleteSnapshot.deleteManifests(table.io()));
ManifestFile newDeleteManifest1 =
writeManifest(
- "delete-manifest-file-1.avro",
+ manifestFormat().addExtension("delete-manifest-file-1"),
manifestEntry(
ManifestEntry.Status.EXISTING,
deleteSnapshotId,
@@ -1385,7 +1391,7 @@ public class TestRewriteManifests extends TestBase {
fileADeletes()));
ManifestFile newDeleteManifest2 =
writeManifest(
- "delete-manifest-file-2.avro",
+ manifestFormat().addExtension("delete-manifest-file-2"),
manifestEntry(
ManifestEntry.Status.EXISTING,
deleteSnapshotId,
@@ -1486,7 +1492,7 @@ public class TestRewriteManifests extends TestBase {
ManifestFile originalDeleteManifest =
deleteSnapshot1.deleteManifests(table.io()).get(0);
ManifestFile newDeleteManifest1 =
writeManifest(
- "delete-manifest-file-1.avro",
+ manifestFormat().addExtension("delete-manifest-file-1"),
manifestEntry(
ManifestEntry.Status.EXISTING,
deleteSnapshotId1,
@@ -1495,7 +1501,7 @@ public class TestRewriteManifests extends TestBase {
fileADeletes()));
ManifestFile newDeleteManifest2 =
writeManifest(
- "delete-manifest-file-2.avro",
+ manifestFormat().addExtension("delete-manifest-file-2"),
manifestEntry(
ManifestEntry.Status.EXISTING,
deleteSnapshotId1,
@@ -1581,7 +1587,7 @@ public class TestRewriteManifests extends TestBase {
ManifestFile originalDeleteManifest =
deleteSnapshot.deleteManifests(table.io()).get(0);
ManifestFile newDeleteManifest1 =
writeManifest(
- "delete-manifest-file-1.avro",
+ manifestFormat().addExtension("delete-manifest-file-1"),
manifestEntry(
ManifestEntry.Status.EXISTING,
deleteSnapshotId,
@@ -1590,7 +1596,7 @@ public class TestRewriteManifests extends TestBase {
fileADeletes()));
ManifestFile newDeleteManifest2 =
writeManifest(
- "delete-manifest-file-2.avro",
+ manifestFormat().addExtension("delete-manifest-file-2"),
manifestEntry(
ManifestEntry.Status.EXISTING,
deleteSnapshotId,
@@ -1645,7 +1651,7 @@ public class TestRewriteManifests extends TestBase {
// combine the original delete manifests into 1 new delete manifest
ManifestFile newDeleteManifest =
writeManifest(
- "delete-manifest-file.avro",
+ manifestFormat().addExtension("delete-manifest-file"),
manifestEntry(
ManifestEntry.Status.EXISTING,
deleteSnapshotId1,
diff --git a/core/src/test/java/org/apache/iceberg/TestSnapshotProducer.java
b/core/src/test/java/org/apache/iceberg/TestSnapshotProducer.java
index dd97738759..c6092f0238 100644
--- a/core/src/test/java/org/apache/iceberg/TestSnapshotProducer.java
+++ b/core/src/test/java/org/apache/iceberg/TestSnapshotProducer.java
@@ -22,6 +22,7 @@ import static
org.apache.iceberg.SnapshotSummary.PUBLISHED_WAP_ID_PROP;
import static org.apache.iceberg.avro.AvroTestHelpers.readAvroCodec;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.assertj.core.api.Assumptions.assumeThat;
import java.io.File;
import java.io.IOException;
@@ -227,7 +228,11 @@ public class TestSnapshotProducer extends TestBase {
}
@TestTemplate
- public void testDefaultManifestCompression() throws IOException {
+ public void testDefaultAvroManifestCompression() throws IOException {
+ assumeThat(formatVersion)
+ .as("V4 uses Parquet manifests by default; Avro codec checks do not
apply")
+ .isLessThan(TableMetadata.MIN_FORMAT_VERSION_PARQUET_MANIFESTS);
+
table.newFastAppend().appendFile(FILE_A).commit();
ManifestFile manifest =
table.currentSnapshot().dataManifests(table.io()).get(0);
@@ -235,7 +240,11 @@ public class TestSnapshotProducer extends TestBase {
}
@TestTemplate
- public void testManifestCompressionFromTableProperty() throws IOException {
+ public void testAvroManifestCompressionFromTableProperty() throws
IOException {
+ assumeThat(formatVersion)
+ .as("V4 uses Parquet manifests by default; Avro codec checks do not
apply")
+ .isLessThan(TableMetadata.MIN_FORMAT_VERSION_PARQUET_MANIFESTS);
+
table.updateProperties().set(TableProperties.MANIFEST_COMPRESSION,
"snappy").commit();
table.newFastAppend().appendFile(FILE_A).commit();
diff --git a/core/src/test/java/org/apache/iceberg/TestTransaction.java
b/core/src/test/java/org/apache/iceberg/TestTransaction.java
index 9ec8c47840..fe47ac6256 100644
--- a/core/src/test/java/org/apache/iceberg/TestTransaction.java
+++ b/core/src/test/java/org/apache/iceberg/TestTransaction.java
@@ -666,7 +666,7 @@ public class TestTransaction extends TestBase {
ManifestFile newManifest =
writeManifest(
- "manifest-file-1.avro",
+ manifestFormat().addExtension("manifest-file-1"),
manifestEntry(ManifestEntry.Status.EXISTING, firstSnapshotId,
FILE_A),
manifestEntry(ManifestEntry.Status.EXISTING, secondSnapshotId,
FILE_B));
@@ -811,7 +811,7 @@ public class TestTransaction extends TestBase {
.rewriteManifests()
.addManifest(
writeManifest(
- "new_delete_manifest.avro",
+ manifestFormat().addExtension("new_delete_manifest"),
// Specify data sequence number so that the delete files
don't get aged out
// first
manifestEntry(
@@ -880,7 +880,7 @@ public class TestTransaction extends TestBase {
.rewriteManifests()
.addManifest(
writeManifest(
- "new_manifest.avro",
+ manifestFormat().addExtension("new_manifest"),
manifestEntry(Status.EXISTING, first.snapshotId(), FILE_A),
manifestEntry(Status.EXISTING, first.snapshotId(),
FILE_A2),
manifestEntry(Status.EXISTING, second.snapshotId(),
FILE_B)))
diff --git a/core/src/test/java/org/apache/iceberg/jdbc/TestJdbcCatalog.java
b/core/src/test/java/org/apache/iceberg/jdbc/TestJdbcCatalog.java
index ff0af5c563..ce22c8089b 100644
--- a/core/src/test/java/org/apache/iceberg/jdbc/TestJdbcCatalog.java
+++ b/core/src/test/java/org/apache/iceberg/jdbc/TestJdbcCatalog.java
@@ -130,7 +130,7 @@ public class TestJdbcCatalog extends
CatalogTests<JdbcCatalog> {
return Stream.of(new File(location).listFiles())
.filter(file -> !file.isDirectory())
.map(File::getName)
- .filter(fileName -> fileName.endsWith(".avro"))
+ .filter(fileName -> !fileName.startsWith(".") &&
!fileName.endsWith("metadata.json"))
.collect(Collectors.toList());
}
diff --git
a/core/src/test/java/org/apache/iceberg/util/TestManifestFileUtil.java
b/core/src/test/java/org/apache/iceberg/util/TestManifestFileUtil.java
index 8d24160320..0c7e032bde 100644
--- a/core/src/test/java/org/apache/iceberg/util/TestManifestFileUtil.java
+++ b/core/src/test/java/org/apache/iceberg/util/TestManifestFileUtil.java
@@ -20,11 +20,14 @@ package org.apache.iceberg.util;
import static org.apache.iceberg.types.Types.NestedField.optional;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assumptions.assumeThat;
import java.io.IOException;
import java.nio.file.Path;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Files;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.ManifestFiles;
@@ -35,24 +38,32 @@ import org.apache.iceberg.Schema;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.types.Types;
-import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.FieldSource;
+
+class TestManifestFileUtil {
+ private static final int MIN_FORMAT_VERSION_PARQUET_MANIFESTS = 4;
-public class TestManifestFileUtil {
private static final Schema SCHEMA =
new Schema(
optional(1, "id", Types.IntegerType.get()),
optional(2, "unknown", Types.UnknownType.get()),
optional(3, "floats", Types.FloatType.get()));
+ private final AtomicInteger manifestCounter = new AtomicInteger(0);
+
@TempDir private Path temp;
- @Test
- public void canContainWithUnknownTypeOnly() throws IOException {
+ @ParameterizedTest
+ @FieldSource("org.apache.iceberg.TestHelpers#ALL_VERSIONS")
+ void canContainWithUnknownTypeOnly(int formatVersion) throws IOException {
+ // Parquet cannot represent the empty struct produced by an
UnknownType-only partition
+ assumeThat(formatVersion).isLessThan(MIN_FORMAT_VERSION_PARQUET_MANIFESTS);
PartitionSpec spec =
PartitionSpec.builderFor(SCHEMA).identity("unknown").build();
PartitionData partition = new PartitionData(spec.partitionType());
partition.set(0, "someValue");
- ManifestFile manifestFile = writeManifestWithDataFile(spec, partition);
+ ManifestFile manifestFile = writeManifestWithDataFile(formatVersion, spec,
partition);
assertThat(
ManifestFileUtil.canContainAny(
@@ -62,12 +73,13 @@ public class TestManifestFileUtil {
.isTrue();
}
- @Test
- public void canContainWithNaNValueOnly() throws IOException {
+ @ParameterizedTest
+ @FieldSource("org.apache.iceberg.TestHelpers#ALL_VERSIONS")
+ void canContainWithNaNValueOnly(int formatVersion) throws IOException {
PartitionSpec spec =
PartitionSpec.builderFor(SCHEMA).identity("floats").build();
PartitionData partition = new PartitionData(spec.partitionType());
partition.set(0, Float.NaN);
- ManifestFile manifestFile = writeManifestWithDataFile(spec, partition);
+ ManifestFile manifestFile = writeManifestWithDataFile(formatVersion, spec,
partition);
assertThat(
ManifestFileUtil.canContainAny(
@@ -77,12 +89,13 @@ public class TestManifestFileUtil {
.isTrue();
}
- @Test
- public void canContainWithNullValueOnly() throws IOException {
+ @ParameterizedTest
+ @FieldSource("org.apache.iceberg.TestHelpers#ALL_VERSIONS")
+ void canContainWithNullValueOnly(int formatVersion) throws IOException {
PartitionSpec spec =
PartitionSpec.builderFor(SCHEMA).identity("floats").build();
PartitionData partition = new PartitionData(spec.partitionType());
partition.set(0, null);
- ManifestFile manifestFile = writeManifestWithDataFile(spec, partition);
+ ManifestFile manifestFile = writeManifestWithDataFile(formatVersion, spec,
partition);
assertThat(
ManifestFileUtil.canContainAny(
@@ -92,14 +105,15 @@ public class TestManifestFileUtil {
.isTrue();
}
- @Test
- public void canContainWithUnknownType() throws IOException {
+ @ParameterizedTest
+ @FieldSource("org.apache.iceberg.TestHelpers#ALL_VERSIONS")
+ void canContainWithUnknownType(int formatVersion) throws IOException {
PartitionSpec spec =
PartitionSpec.builderFor(SCHEMA).identity("floats").identity("unknown").build();
PartitionData partition = new PartitionData(spec.partitionType());
partition.set(0, 1.0f);
partition.set(1, "someValue");
- ManifestFile manifestFile = writeManifestWithDataFile(spec, partition);
+ ManifestFile manifestFile = writeManifestWithDataFile(formatVersion, spec,
partition);
assertThat(
ManifestFileUtil.canContainAny(
@@ -109,9 +123,16 @@ public class TestManifestFileUtil {
.isTrue();
}
- private ManifestFile writeManifestWithDataFile(PartitionSpec spec,
PartitionData partition)
- throws IOException {
- ManifestWriter<DataFile> writer = ManifestFiles.write(spec,
Files.localOutput(temp.toFile()));
+ private ManifestFile writeManifestWithDataFile(
+ int formatVersion, PartitionSpec spec, PartitionData partition) throws
IOException {
+ FileFormat format =
+ formatVersion >= MIN_FORMAT_VERSION_PARQUET_MANIFESTS
+ ? FileFormat.PARQUET
+ : FileFormat.AVRO;
+ String filename = format.addExtension("manifest-" +
manifestCounter.getAndIncrement());
+ ManifestWriter<DataFile> writer =
+ ManifestFiles.write(
+ formatVersion, spec,
Files.localOutput(temp.resolve(filename).toFile()), null);
try (writer) {
writer.add(
DataFiles.builder(spec)
diff --git
a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueReaders.java
b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueReaders.java
index 8aa9aa4779..63d6d80d58 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueReaders.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueReaders.java
@@ -25,7 +25,11 @@ import java.math.BigDecimal;
import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
+import java.util.ArrayList;
+import java.util.HashMap;
import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
@@ -828,6 +832,16 @@ public class ParquetValueReaders {
protected abstract T buildList(I list);
}
+ // Only recycle known growable JDK collections as scratch buffers. Reuse may
be an unmodifiable
+ // view, Guava immutable type, List.of / Map.of, etc.; those are not these
concrete classes.
+ private static boolean canReuseListAsReadBuffer(List<?> list) {
+ return list instanceof ArrayList || list instanceof LinkedList;
+ }
+
+ private static boolean canReuseMapAsReadBuffer(Map<?, ?> map) {
+ return map instanceof LinkedHashMap || map instanceof HashMap;
+ }
+
public static class ListReader<E> extends RepeatedReader<List<E>, List<E>,
E> {
private List<E> lastList = null;
private Iterator<E> elements = null;
@@ -847,7 +861,7 @@ public class ParquetValueReaders {
}
if (reuse != null) {
- this.lastList = reuse;
+ this.lastList = canReuseListAsReadBuffer(reuse) ? reuse : null;
this.elements = reuse.iterator();
} else {
this.lastList = null;
@@ -973,7 +987,7 @@ public class ParquetValueReaders {
}
if (reuse != null) {
- this.lastMap = reuse;
+ this.lastMap = canReuseMapAsReadBuffer(reuse) ? reuse : null;
this.pairs = reuse.entrySet().iterator();
} else {
this.lastMap = null;
diff --git
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java
index dae721b1d7..c5db04762f 100644
---
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java
+++
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java
@@ -1469,10 +1469,7 @@ public class TestRewriteTablePathsAction extends
TestBase {
.as(Encoders.STRING())
.collectAsList();
Predicate<String> isManifest =
- f ->
- (f.contains("optimized-m-") && f.endsWith(".avro"))
- || f.endsWith("-m0.avro")
- || f.endsWith("-m1.avro");
+ f -> f.contains("optimized-m-") || f.contains("-m0.") ||
f.contains("-m1.");
Predicate<String> isManifestList = f -> f.contains("snap-") &&
f.endsWith(".avro");
Predicate<String> isMetadataJSON = f -> f.endsWith(".metadata.json");
diff --git
a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java
b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java
index dae721b1d7..c5db04762f 100644
---
a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java
+++
b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java
@@ -1469,10 +1469,7 @@ public class TestRewriteTablePathsAction extends
TestBase {
.as(Encoders.STRING())
.collectAsList();
Predicate<String> isManifest =
- f ->
- (f.contains("optimized-m-") && f.endsWith(".avro"))
- || f.endsWith("-m0.avro")
- || f.endsWith("-m1.avro");
+ f -> f.contains("optimized-m-") || f.contains("-m0.") ||
f.contains("-m1.");
Predicate<String> isManifestList = f -> f.contains("snap-") &&
f.endsWith(".avro");
Predicate<String> isMetadataJSON = f -> f.endsWith(".metadata.json");
diff --git
a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java
b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java
index dae721b1d7..c5db04762f 100644
---
a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java
+++
b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java
@@ -1469,10 +1469,7 @@ public class TestRewriteTablePathsAction extends
TestBase {
.as(Encoders.STRING())
.collectAsList();
Predicate<String> isManifest =
- f ->
- (f.contains("optimized-m-") && f.endsWith(".avro"))
- || f.endsWith("-m0.avro")
- || f.endsWith("-m1.avro");
+ f -> f.contains("optimized-m-") || f.contains("-m0.") ||
f.contains("-m1.");
Predicate<String> isManifestList = f -> f.contains("snap-") &&
f.endsWith(".avro");
Predicate<String> isMetadataJSON = f -> f.endsWith(".metadata.json");