This is an automated email from the ASF dual-hosted git repository.

RussellSpitzer pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new f23da211bb Core, Parquet: Allow for Writing Parquet/Avro Manifests in 
V4 (#15634)
f23da211bb is described below

commit f23da211bbe108a060df30c1f65684858aea0e8d
Author: Russell Spitzer <[email protected]>
AuthorDate: Fri May 22 15:52:38 2026 -0500

    Core, Parquet: Allow for Writing Parquet/Avro Manifests in V4 (#15634)
---
 .../java/org/apache/iceberg/ManifestBenchmark.java | 171 ++++++--------------
 .../org/apache/iceberg/ManifestBenchmarkUtil.java  | 120 ++++++++++++++
 ...mark.java => ManifestCompressionBenchmark.java} | 111 ++-----------
 .../org/apache/iceberg/ManifestReadBenchmark.java  | 173 ---------------------
 .../org/apache/iceberg/ManifestWriteBenchmark.java | 173 ---------------------
 .../src/main/java/org/apache/iceberg/BaseFile.java |  41 ++++-
 .../java/org/apache/iceberg/ManifestReader.java    |  29 +++-
 .../java/org/apache/iceberg/ManifestWriter.java    |  62 ++++++--
 .../java/org/apache/iceberg/SnapshotProducer.java  |   7 +-
 .../java/org/apache/iceberg/TableMetadata.java     |   1 +
 .../main/java/org/apache/iceberg/V4Metadata.java   |  74 +++++----
 .../src/test/java/org/apache/iceberg/TestBase.java |  23 ++-
 .../java/org/apache/iceberg/TestFastAppend.java    |   8 +-
 .../org/apache/iceberg/TestManifestReader.java     |   4 +
 .../org/apache/iceberg/TestManifestWriter.java     |  11 +-
 .../apache/iceberg/TestManifestWriterVersions.java | 125 ++++++++++++++-
 .../java/org/apache/iceberg/TestMergeAppend.java   |  18 ++-
 .../org/apache/iceberg/TestRewriteManifests.java   |  70 +++++----
 .../org/apache/iceberg/TestSnapshotProducer.java   |  13 +-
 .../java/org/apache/iceberg/TestTransaction.java   |   6 +-
 .../org/apache/iceberg/jdbc/TestJdbcCatalog.java   |   2 +-
 .../apache/iceberg/util/TestManifestFileUtil.java  |  55 +++++--
 .../iceberg/parquet/ParquetValueReaders.java       |  18 ++-
 .../spark/actions/TestRewriteTablePathsAction.java |   5 +-
 .../spark/actions/TestRewriteTablePathsAction.java |   5 +-
 .../spark/actions/TestRewriteTablePathsAction.java |   5 +-
 26 files changed, 627 insertions(+), 703 deletions(-)

diff --git a/core/src/jmh/java/org/apache/iceberg/ManifestBenchmark.java 
b/core/src/jmh/java/org/apache/iceberg/ManifestBenchmark.java
index cbd372b7a4..b1b3847c5a 100644
--- a/core/src/jmh/java/org/apache/iceberg/ManifestBenchmark.java
+++ b/core/src/jmh/java/org/apache/iceberg/ManifestBenchmark.java
@@ -18,23 +18,15 @@
  */
 package org.apache.iceberg;
 
-import java.io.File;
 import java.io.IOException;
 import java.io.UncheckedIOException;
-import java.nio.ByteBuffer;
-import java.nio.file.Files;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
-import java.util.Random;
 import java.util.concurrent.TimeUnit;
-import org.apache.commons.io.FileUtils;
 import org.apache.iceberg.io.CloseableIterator;
 import org.apache.iceberg.io.OutputFile;
-import org.apache.iceberg.relocated.com.google.common.collect.Lists;
-import org.apache.iceberg.relocated.com.google.common.collect.Maps;
-import org.apache.iceberg.types.Types;
-import org.openjdk.jmh.annotations.AuxCounters;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.openjdk.jmh.annotations.Benchmark;
 import org.openjdk.jmh.annotations.BenchmarkMode;
 import org.openjdk.jmh.annotations.Fork;
@@ -52,7 +44,12 @@ import org.openjdk.jmh.annotations.Warmup;
 import org.openjdk.jmh.infra.Blackhole;
 
 /**
- * A benchmark that measures manifest read/write performance across 
compression codecs.
+ * A benchmark that measures manifest read/write performance across format 
versions and file
+ * formats.
+ *
+ * <p>V1-V3 only support Avro manifests. V4 supports both Avro and Parquet. 
The {@code
+ * versionFormat} parameter encodes valid combinations as {@code 
"<version>_<format>"} (e.g. {@code
+ * "4_PARQUET"}) so that only meaningful pairings are benchmarked.
  *
  * <p>Entry counts are calibrated per column count via {@link #ENTRY_BASE}. 
Set to 300_000 for ~8 MB
  * manifests (matching the default {@code commit.manifest.target-size-bytes}) 
or 15_000 for ~400 KB.
@@ -63,13 +60,25 @@ import org.openjdk.jmh.infra.Blackhole;
  * # all combinations
  * ./gradlew :iceberg-core:jmh -PjmhIncludeRegex=ManifestBenchmark
  *
- * # single codec
+ * # V4-only (Avro vs Parquet)
+ * ./gradlew :iceberg-core:jmh -PjmhIncludeRegex=ManifestBenchmark \
+ *     -PjmhParams="versionFormat=4_AVRO|4_PARQUET"
+ *
+ * # all versions, single column count
  * ./gradlew :iceberg-core:jmh -PjmhIncludeRegex=ManifestBenchmark \
- *     -PjmhParams="codec=gzip"
+ *     -PjmhParams="numCols=50"
+ *
+ * # single version
+ * ./gradlew :iceberg-core:jmh -PjmhIncludeRegex=ManifestBenchmark \
+ *     -PjmhParams="versionFormat=3_AVRO"
  * }</pre>
  */
 @Fork(1)
 @State(Scope.Benchmark)
+// Parquet's columnar write path has a deep call graph (per-column encoders, 
page assembly,
+// dictionary management) that requires more warmup iterations than Avro for 
the JIT compiler to
+// fully optimize. Profiling shows ~650ms of JIT compilation spread across the 
first 3-4
+// iterations, so 6 warmups ensure measurement begins after JIT has stabilized.
 @Warmup(iterations = 6)
 @Measurement(iterations = 10)
 @BenchmarkMode(Mode.SingleShotTime)
@@ -78,19 +87,8 @@ public class ManifestBenchmark {
 
   static final int ENTRY_BASE = 300_000;
 
-  private static final int FORMAT_VERSION = 4;
-
-  private static final Schema SCHEMA =
-      new Schema(
-          Types.NestedField.required(1, "id", Types.IntegerType.get()),
-          Types.NestedField.required(2, "data", Types.StringType.get()),
-          Types.NestedField.required(3, "customer", Types.StringType.get()));
-
-  private static final PartitionSpec SPEC =
-      
PartitionSpec.builderFor(SCHEMA).identity("id").identity("data").identity("customer").build();
-
-  @Param({"gzip", "snappy", "zstd", "uncompressed"})
-  private String codec;
+  @Param({"1_AVRO", "2_AVRO", "3_AVRO", "4_AVRO", "4_PARQUET"})
+  private String versionFormat;
 
   @Param({"true", "false"})
   private String partitioned;
@@ -98,11 +96,11 @@ public class ManifestBenchmark {
   @Param({"10", "50", "100"})
   private int numCols;
 
+  private int formatVersion;
+  private FileFormat fileFormat;
   private PartitionSpec spec;
   private Map<Integer, PartitionSpec> specsById;
-  private Map<String, String> writerProperties;
   private List<DataFile> dataFiles;
-  private int numEntries;
 
   private String writeBaseDir;
   private OutputFile writeOutputFile;
@@ -112,21 +110,26 @@ public class ManifestBenchmark {
 
   @Setup(Level.Trial)
   public void setupTrial() {
-    this.spec = Boolean.parseBoolean(partitioned) ? SPEC : 
PartitionSpec.unpartitioned();
-    this.specsById = Map.of(spec.specId(), spec);
-    this.writerProperties = Map.of(TableProperties.AVRO_COMPRESSION, codec);
-    // ENTRY_BASE / cols: empirically calibrated — 300_000 → ~8 MB, 15_000 → 
~400 KB manifests
-    this.numEntries = ENTRY_BASE / numCols;
-    this.dataFiles = generateDataFiles();
+    String[] parts = versionFormat.split("_", 2);
+    this.formatVersion = Integer.parseInt(parts[0]);
+    this.fileFormat = FileFormat.fromString(parts[1]);
+    this.spec =
+        Boolean.parseBoolean(partitioned)
+            ? ManifestBenchmarkUtil.SPEC
+            : PartitionSpec.unpartitioned();
+    this.specsById = ImmutableMap.of(spec.specId(), spec);
+    int numEntries = ManifestBenchmarkUtil.entriesForColumnCount(ENTRY_BASE, 
numCols);
+    this.dataFiles = ManifestBenchmarkUtil.generateDataFiles(spec, numEntries, 
numCols);
     setupReadManifest();
   }
 
   @Setup(Level.Invocation)
   public void setupWriteInvocation() throws IOException {
-    this.writeBaseDir = 
Files.createTempDirectory("bench-write-").toAbsolutePath().toString();
+    this.writeBaseDir =
+        
java.nio.file.Files.createTempDirectory("bench-write-").toAbsolutePath().toString();
     this.writeOutputFile =
-        org.apache.iceberg.Files.localOutput(
-            String.format(Locale.ROOT, "%s/manifest.avro", writeBaseDir));
+        Files.localOutput(
+            String.format(Locale.ROOT, "%s/%s", writeBaseDir, 
fileFormat.addExtension("manifest")));
 
     for (DataFile file : dataFiles) {
       file.path();
@@ -137,7 +140,7 @@ public class ManifestBenchmark {
 
   @TearDown(Level.Trial)
   public void tearDownTrial() {
-    cleanDir(readBaseDir);
+    ManifestBenchmarkUtil.cleanDir(readBaseDir);
     readBaseDir = null;
     readManifest = null;
     dataFiles = null;
@@ -145,28 +148,15 @@ public class ManifestBenchmark {
 
   @TearDown(Level.Invocation)
   public void tearDownInvocation() {
-    cleanDir(writeBaseDir);
+    ManifestBenchmarkUtil.cleanDir(writeBaseDir);
     writeBaseDir = null;
     writeOutputFile = null;
   }
 
-  @AuxCounters(AuxCounters.Type.EVENTS)
-  @State(Scope.Thread)
-  @SuppressWarnings("checkstyle:VisibilityModifier")
-  public static class FileSizeCounters {
-    public double manifestSizeMB;
-
-    @Setup(Level.Invocation)
-    public void reset() {
-      manifestSizeMB = 0;
-    }
-  }
-
   @Benchmark
   @Threads(1)
-  public ManifestFile writeManifest(FileSizeCounters counters) throws 
IOException {
-    ManifestWriter<DataFile> writer =
-        ManifestFiles.write(FORMAT_VERSION, spec, writeOutputFile, 1L, 
writerProperties);
+  public ManifestFile writeManifest() throws IOException {
+    ManifestWriter<DataFile> writer = ManifestFiles.write(formatVersion, spec, 
writeOutputFile, 1L);
 
     try (ManifestWriter<DataFile> w = writer) {
       for (DataFile file : dataFiles) {
@@ -174,9 +164,7 @@ public class ManifestBenchmark {
       }
     }
 
-    ManifestFile manifest = writer.toManifestFile();
-    counters.manifestSizeMB = manifest.length() / (1024.0 * 1024.0);
-    return manifest;
+    return writer.toManifestFile();
   }
 
   @Benchmark
@@ -193,17 +181,17 @@ public class ManifestBenchmark {
 
   private void setupReadManifest() {
     try {
-      this.readBaseDir = 
Files.createTempDirectory("bench-read-").toAbsolutePath().toString();
+      this.readBaseDir =
+          
java.nio.file.Files.createTempDirectory("bench-read-").toAbsolutePath().toString();
     } catch (IOException e) {
       throw new UncheckedIOException(e);
     }
 
     OutputFile manifestFile =
-        org.apache.iceberg.Files.localOutput(
-            String.format(Locale.ROOT, "%s/manifest.avro", readBaseDir));
+        Files.localOutput(
+            String.format(Locale.ROOT, "%s/%s", readBaseDir, 
fileFormat.addExtension("manifest")));
 
-    ManifestWriter<DataFile> writer =
-        ManifestFiles.write(FORMAT_VERSION, spec, manifestFile, 1L, 
writerProperties);
+    ManifestWriter<DataFile> writer = ManifestFiles.write(formatVersion, spec, 
manifestFile, 1L);
 
     try (ManifestWriter<DataFile> w = writer) {
       for (DataFile file : dataFiles) {
@@ -215,65 +203,4 @@ public class ManifestBenchmark {
 
     this.readManifest = writer.toManifestFile();
   }
-
-  private List<DataFile> generateDataFiles() {
-    Random random = new Random(42);
-    List<DataFile> files = Lists.newArrayListWithCapacity(numEntries);
-    for (int i = 0; i < numEntries; i++) {
-      DataFiles.Builder builder =
-          DataFiles.builder(spec)
-              .withFormat(FileFormat.PARQUET)
-              .withPath(String.format(Locale.ROOT, "/path/to/data-%d.parquet", 
i))
-              .withFileSizeInBytes(1024 + i)
-              .withRecordCount(1000 + i)
-              .withMetrics(randomMetrics(random, numCols));
-
-      if (!spec.isUnpartitioned()) {
-        builder.withPartitionPath(
-            String.format(
-                Locale.ROOT, "id=%d/data=val-%d/customer=cust-%d", i % 100, i 
% 50, i % 200));
-      }
-
-      files.add(builder.build());
-    }
-
-    return files;
-  }
-
-  static Metrics randomMetrics(Random random, int cols) {
-    long rowCount = 100_000L + random.nextInt(1000);
-    Map<Integer, Long> columnSizes = Maps.newHashMap();
-    Map<Integer, Long> valueCounts = Maps.newHashMap();
-    Map<Integer, Long> nullValueCounts = Maps.newHashMap();
-    Map<Integer, Long> nanValueCounts = Maps.newHashMap();
-    Map<Integer, ByteBuffer> lowerBounds = Maps.newHashMap();
-    Map<Integer, ByteBuffer> upperBounds = Maps.newHashMap();
-    for (int i = 0; i < cols; i++) {
-      columnSizes.put(i, 1_000_000L + random.nextInt(100_000));
-      valueCounts.put(i, 100_000L + random.nextInt(100));
-      nullValueCounts.put(i, (long) random.nextInt(5));
-      nanValueCounts.put(i, (long) random.nextInt(5));
-      byte[] lower = new byte[8];
-      random.nextBytes(lower);
-      lowerBounds.put(i, ByteBuffer.wrap(lower));
-      byte[] upper = new byte[8];
-      random.nextBytes(upper);
-      upperBounds.put(i, ByteBuffer.wrap(upper));
-    }
-
-    return new Metrics(
-        rowCount,
-        columnSizes,
-        valueCounts,
-        nullValueCounts,
-        nanValueCounts,
-        lowerBounds,
-        upperBounds);
-  }
-
-  private static void cleanDir(String dir) {
-    if (dir != null) {
-      FileUtils.deleteQuietly(new File(dir));
-    }
-  }
 }
diff --git a/core/src/jmh/java/org/apache/iceberg/ManifestBenchmarkUtil.java 
b/core/src/jmh/java/org/apache/iceberg/ManifestBenchmarkUtil.java
new file mode 100644
index 0000000000..d37c48daab
--- /dev/null
+++ b/core/src/jmh/java/org/apache/iceberg/ManifestBenchmarkUtil.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Random;
+import org.apache.commons.io.FileUtils;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.types.Types;
+
+/**
+ * Shared constants and stateless helpers for {@link ManifestBenchmark} and 
{@link
+ * ManifestCompressionBenchmark}.
+ */
+final class ManifestBenchmarkUtil {
+
+  static final Schema SCHEMA =
+      new Schema(
+          Types.NestedField.required(1, "id", Types.IntegerType.get()),
+          Types.NestedField.required(2, "data", Types.StringType.get()),
+          Types.NestedField.required(3, "customer", Types.StringType.get()));
+
+  static final PartitionSpec SPEC =
+      
PartitionSpec.builderFor(SCHEMA).identity("id").identity("data").identity("customer").build();
+
+  private ManifestBenchmarkUtil() {}
+
+  /**
+   * Returns the number of manifest entries for the given column count. The 
result is {@code
+   * entryBase / cols}.
+   *
+   * <p>The linear ratio was determined empirically by writing manifests at 
various column counts
+   * and measuring the resulting file sizes. An {@code entryBase} of 300,000 
produces ~8 MB
+   * manifests (matching the default {@code 
commit.manifest.target-size-bytes}); 15,000 produces
+   * ~400 KB.
+   */
+  static int entriesForColumnCount(int entryBase, int cols) {
+    return entryBase / cols;
+  }
+
+  static List<DataFile> generateDataFiles(PartitionSpec spec, int numEntries, 
int numCols) {
+    Random random = new Random(42);
+    List<DataFile> files = Lists.newArrayListWithCapacity(numEntries);
+    for (int i = 0; i < numEntries; i++) {
+      DataFiles.Builder builder =
+          DataFiles.builder(spec)
+              .withFormat(FileFormat.PARQUET)
+              .withPath(String.format(Locale.ROOT, "/path/to/data-%d.parquet", 
i))
+              .withFileSizeInBytes(1024 + i)
+              .withRecordCount(1000 + i)
+              .withMetrics(randomMetrics(random, numCols));
+
+      if (spec.isPartitioned()) {
+        builder.withPartitionPath(
+            String.format(
+                Locale.ROOT, "id=%d/data=val-%d/customer=cust-%d", i % 100, i 
% 50, i % 200));
+      }
+
+      files.add(builder.build());
+    }
+    return files;
+  }
+
+  static Metrics randomMetrics(Random random, int cols) {
+    long rowCount = 100_000L + random.nextInt(1000);
+    Map<Integer, Long> columnSizes = Maps.newHashMap();
+    Map<Integer, Long> valueCounts = Maps.newHashMap();
+    Map<Integer, Long> nullValueCounts = Maps.newHashMap();
+    Map<Integer, Long> nanValueCounts = Maps.newHashMap();
+    Map<Integer, ByteBuffer> lowerBounds = Maps.newHashMap();
+    Map<Integer, ByteBuffer> upperBounds = Maps.newHashMap();
+    for (int i = 0; i < cols; i++) {
+      columnSizes.put(i, 1_000_000L + random.nextInt(100_000));
+      valueCounts.put(i, 100_000L + random.nextInt(100));
+      nullValueCounts.put(i, (long) random.nextInt(5));
+      nanValueCounts.put(i, (long) random.nextInt(5));
+      byte[] lower = new byte[8];
+      random.nextBytes(lower);
+      lowerBounds.put(i, ByteBuffer.wrap(lower));
+      byte[] upper = new byte[8];
+      random.nextBytes(upper);
+      upperBounds.put(i, ByteBuffer.wrap(upper));
+    }
+
+    return new Metrics(
+        rowCount,
+        columnSizes,
+        valueCounts,
+        nullValueCounts,
+        nanValueCounts,
+        lowerBounds,
+        upperBounds);
+  }
+
+  static void cleanDir(String dir) {
+    if (dir != null) {
+      FileUtils.deleteQuietly(new java.io.File(dir));
+    }
+  }
+}
diff --git a/core/src/jmh/java/org/apache/iceberg/ManifestBenchmark.java 
b/core/src/jmh/java/org/apache/iceberg/ManifestCompressionBenchmark.java
similarity index 58%
copy from core/src/jmh/java/org/apache/iceberg/ManifestBenchmark.java
copy to core/src/jmh/java/org/apache/iceberg/ManifestCompressionBenchmark.java
index cbd372b7a4..bf09ae18f9 100644
--- a/core/src/jmh/java/org/apache/iceberg/ManifestBenchmark.java
+++ b/core/src/jmh/java/org/apache/iceberg/ManifestCompressionBenchmark.java
@@ -18,22 +18,14 @@
  */
 package org.apache.iceberg;
 
-import java.io.File;
 import java.io.IOException;
 import java.io.UncheckedIOException;
-import java.nio.ByteBuffer;
-import java.nio.file.Files;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
-import java.util.Random;
 import java.util.concurrent.TimeUnit;
-import org.apache.commons.io.FileUtils;
 import org.apache.iceberg.io.CloseableIterator;
 import org.apache.iceberg.io.OutputFile;
-import org.apache.iceberg.relocated.com.google.common.collect.Lists;
-import org.apache.iceberg.relocated.com.google.common.collect.Maps;
-import org.apache.iceberg.types.Types;
 import org.openjdk.jmh.annotations.AuxCounters;
 import org.openjdk.jmh.annotations.Benchmark;
 import org.openjdk.jmh.annotations.BenchmarkMode;
@@ -61,10 +53,10 @@ import org.openjdk.jmh.infra.Blackhole;
  *
  * <pre>{@code
  * # all combinations
- * ./gradlew :iceberg-core:jmh -PjmhIncludeRegex=ManifestBenchmark
+ * ./gradlew :iceberg-core:jmh -PjmhIncludeRegex=ManifestCompressionBenchmark
  *
  * # single codec
- * ./gradlew :iceberg-core:jmh -PjmhIncludeRegex=ManifestBenchmark \
+ * ./gradlew :iceberg-core:jmh -PjmhIncludeRegex=ManifestCompressionBenchmark \
  *     -PjmhParams="codec=gzip"
  * }</pre>
  */
@@ -74,21 +66,12 @@ import org.openjdk.jmh.infra.Blackhole;
 @Measurement(iterations = 10)
 @BenchmarkMode(Mode.SingleShotTime)
 @Timeout(time = 10, timeUnit = TimeUnit.MINUTES)
-public class ManifestBenchmark {
+public class ManifestCompressionBenchmark {
 
   static final int ENTRY_BASE = 300_000;
 
   private static final int FORMAT_VERSION = 4;
 
-  private static final Schema SCHEMA =
-      new Schema(
-          Types.NestedField.required(1, "id", Types.IntegerType.get()),
-          Types.NestedField.required(2, "data", Types.StringType.get()),
-          Types.NestedField.required(3, "customer", Types.StringType.get()));
-
-  private static final PartitionSpec SPEC =
-      
PartitionSpec.builderFor(SCHEMA).identity("id").identity("data").identity("customer").build();
-
   @Param({"gzip", "snappy", "zstd", "uncompressed"})
   private String codec;
 
@@ -102,7 +85,6 @@ public class ManifestBenchmark {
   private Map<Integer, PartitionSpec> specsById;
   private Map<String, String> writerProperties;
   private List<DataFile> dataFiles;
-  private int numEntries;
 
   private String writeBaseDir;
   private OutputFile writeOutputFile;
@@ -112,21 +94,23 @@ public class ManifestBenchmark {
 
   @Setup(Level.Trial)
   public void setupTrial() {
-    this.spec = Boolean.parseBoolean(partitioned) ? SPEC : 
PartitionSpec.unpartitioned();
+    this.spec =
+        Boolean.parseBoolean(partitioned)
+            ? ManifestBenchmarkUtil.SPEC
+            : PartitionSpec.unpartitioned();
     this.specsById = Map.of(spec.specId(), spec);
     this.writerProperties = Map.of(TableProperties.AVRO_COMPRESSION, codec);
-    // ENTRY_BASE / cols: empirically calibrated — 300_000 → ~8 MB, 15_000 → 
~400 KB manifests
-    this.numEntries = ENTRY_BASE / numCols;
-    this.dataFiles = generateDataFiles();
+    int numEntries = ManifestBenchmarkUtil.entriesForColumnCount(ENTRY_BASE, 
numCols);
+    this.dataFiles = ManifestBenchmarkUtil.generateDataFiles(spec, numEntries, 
numCols);
     setupReadManifest();
   }
 
   @Setup(Level.Invocation)
   public void setupWriteInvocation() throws IOException {
-    this.writeBaseDir = 
Files.createTempDirectory("bench-write-").toAbsolutePath().toString();
+    this.writeBaseDir =
+        
java.nio.file.Files.createTempDirectory("bench-write-").toAbsolutePath().toString();
     this.writeOutputFile =
-        org.apache.iceberg.Files.localOutput(
-            String.format(Locale.ROOT, "%s/manifest.avro", writeBaseDir));
+        Files.localOutput(String.format(Locale.ROOT, "%s/manifest.avro", 
writeBaseDir));
 
     for (DataFile file : dataFiles) {
       file.path();
@@ -137,7 +121,7 @@ public class ManifestBenchmark {
 
   @TearDown(Level.Trial)
   public void tearDownTrial() {
-    cleanDir(readBaseDir);
+    ManifestBenchmarkUtil.cleanDir(readBaseDir);
     readBaseDir = null;
     readManifest = null;
     dataFiles = null;
@@ -145,7 +129,7 @@ public class ManifestBenchmark {
 
   @TearDown(Level.Invocation)
   public void tearDownInvocation() {
-    cleanDir(writeBaseDir);
+    ManifestBenchmarkUtil.cleanDir(writeBaseDir);
     writeBaseDir = null;
     writeOutputFile = null;
   }
@@ -193,14 +177,14 @@ public class ManifestBenchmark {
 
   private void setupReadManifest() {
     try {
-      this.readBaseDir = 
Files.createTempDirectory("bench-read-").toAbsolutePath().toString();
+      this.readBaseDir =
+          
java.nio.file.Files.createTempDirectory("bench-read-").toAbsolutePath().toString();
     } catch (IOException e) {
       throw new UncheckedIOException(e);
     }
 
     OutputFile manifestFile =
-        org.apache.iceberg.Files.localOutput(
-            String.format(Locale.ROOT, "%s/manifest.avro", readBaseDir));
+        Files.localOutput(String.format(Locale.ROOT, "%s/manifest.avro", 
readBaseDir));
 
     ManifestWriter<DataFile> writer =
         ManifestFiles.write(FORMAT_VERSION, spec, manifestFile, 1L, 
writerProperties);
@@ -215,65 +199,4 @@ public class ManifestBenchmark {
 
     this.readManifest = writer.toManifestFile();
   }
-
-  private List<DataFile> generateDataFiles() {
-    Random random = new Random(42);
-    List<DataFile> files = Lists.newArrayListWithCapacity(numEntries);
-    for (int i = 0; i < numEntries; i++) {
-      DataFiles.Builder builder =
-          DataFiles.builder(spec)
-              .withFormat(FileFormat.PARQUET)
-              .withPath(String.format(Locale.ROOT, "/path/to/data-%d.parquet", 
i))
-              .withFileSizeInBytes(1024 + i)
-              .withRecordCount(1000 + i)
-              .withMetrics(randomMetrics(random, numCols));
-
-      if (!spec.isUnpartitioned()) {
-        builder.withPartitionPath(
-            String.format(
-                Locale.ROOT, "id=%d/data=val-%d/customer=cust-%d", i % 100, i 
% 50, i % 200));
-      }
-
-      files.add(builder.build());
-    }
-
-    return files;
-  }
-
-  static Metrics randomMetrics(Random random, int cols) {
-    long rowCount = 100_000L + random.nextInt(1000);
-    Map<Integer, Long> columnSizes = Maps.newHashMap();
-    Map<Integer, Long> valueCounts = Maps.newHashMap();
-    Map<Integer, Long> nullValueCounts = Maps.newHashMap();
-    Map<Integer, Long> nanValueCounts = Maps.newHashMap();
-    Map<Integer, ByteBuffer> lowerBounds = Maps.newHashMap();
-    Map<Integer, ByteBuffer> upperBounds = Maps.newHashMap();
-    for (int i = 0; i < cols; i++) {
-      columnSizes.put(i, 1_000_000L + random.nextInt(100_000));
-      valueCounts.put(i, 100_000L + random.nextInt(100));
-      nullValueCounts.put(i, (long) random.nextInt(5));
-      nanValueCounts.put(i, (long) random.nextInt(5));
-      byte[] lower = new byte[8];
-      random.nextBytes(lower);
-      lowerBounds.put(i, ByteBuffer.wrap(lower));
-      byte[] upper = new byte[8];
-      random.nextBytes(upper);
-      upperBounds.put(i, ByteBuffer.wrap(upper));
-    }
-
-    return new Metrics(
-        rowCount,
-        columnSizes,
-        valueCounts,
-        nullValueCounts,
-        nanValueCounts,
-        lowerBounds,
-        upperBounds);
-  }
-
-  private static void cleanDir(String dir) {
-    if (dir != null) {
-      FileUtils.deleteQuietly(new File(dir));
-    }
-  }
 }
diff --git a/core/src/jmh/java/org/apache/iceberg/ManifestReadBenchmark.java 
b/core/src/jmh/java/org/apache/iceberg/ManifestReadBenchmark.java
deleted file mode 100644
index 588b5df1ba..0000000000
--- a/core/src/jmh/java/org/apache/iceberg/ManifestReadBenchmark.java
+++ /dev/null
@@ -1,173 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iceberg;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.UncheckedIOException;
-import java.nio.ByteBuffer;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.util.Comparator;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-import java.util.stream.Stream;
-import org.apache.iceberg.encryption.PlaintextEncryptionManager;
-import org.apache.iceberg.io.CloseableIterator;
-import org.apache.iceberg.io.OutputFile;
-import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
-import org.apache.iceberg.relocated.com.google.common.collect.Maps;
-import org.openjdk.jmh.annotations.Benchmark;
-import org.openjdk.jmh.annotations.BenchmarkMode;
-import org.openjdk.jmh.annotations.Fork;
-import org.openjdk.jmh.annotations.Measurement;
-import org.openjdk.jmh.annotations.Mode;
-import org.openjdk.jmh.annotations.Scope;
-import org.openjdk.jmh.annotations.Setup;
-import org.openjdk.jmh.annotations.State;
-import org.openjdk.jmh.annotations.TearDown;
-import org.openjdk.jmh.annotations.Threads;
-import org.openjdk.jmh.annotations.Timeout;
-
-@Fork(1)
-@State(Scope.Benchmark)
-@Measurement(iterations = 5)
-@BenchmarkMode(Mode.SingleShotTime)
-@Timeout(time = 1000, timeUnit = TimeUnit.HOURS)
-public class ManifestReadBenchmark {
-
-  private static final int NUM_FILES = 10;
-  private static final int NUM_ROWS = 100000;
-  private static final int NUM_COLS = 10;
-
-  private String baseDir;
-  private String manifestListFile;
-
-  @Setup
-  public void before() {
-    baseDir =
-        Paths.get(new 
File(System.getProperty("java.io.tmpdir")).getAbsolutePath()).toString();
-    manifestListFile = String.format("%s/%s.avro", baseDir, UUID.randomUUID());
-
-    Random random = new Random(System.currentTimeMillis());
-
-    try (ManifestListWriter listWriter =
-        ManifestLists.write(
-            1,
-            org.apache.iceberg.Files.localOutput(manifestListFile),
-            PlaintextEncryptionManager.instance(),
-            0,
-            1L,
-            0,
-            0L)) {
-      for (int i = 0; i < NUM_FILES; i++) {
-        OutputFile manifestFile =
-            org.apache.iceberg.Files.localOutput(
-                String.format("%s/%s.avro", baseDir, UUID.randomUUID()));
-
-        ManifestWriter<DataFile> writer =
-            ManifestFiles.write(1, PartitionSpec.unpartitioned(), 
manifestFile, 1L);
-        try (ManifestWriter<DataFile> finalWriter = writer) {
-          for (int j = 0; j < NUM_ROWS; j++) {
-            DataFile dataFile =
-                DataFiles.builder(PartitionSpec.unpartitioned())
-                    .withFormat(FileFormat.PARQUET)
-                    .withPath(String.format("/path/to/data-%s-%s.parquet", i, 
j))
-                    .withFileSizeInBytes(j)
-                    .withRecordCount(j)
-                    .withMetrics(randomMetrics(random))
-                    .build();
-            finalWriter.add(dataFile);
-          }
-        } catch (IOException e) {
-          throw new UncheckedIOException(e);
-        }
-
-        listWriter.add(writer.toManifestFile());
-      }
-    } catch (IOException e) {
-      throw new UncheckedIOException(e);
-    }
-  }
-
-  @TearDown
-  public void after() throws IOException {
-    if (baseDir != null) {
-      try (Stream<Path> walk = Files.walk(Paths.get(baseDir))) {
-        
walk.sorted(Comparator.reverseOrder()).map(Path::toFile).forEach(File::delete);
-      }
-      baseDir = null;
-    }
-
-    manifestListFile = null;
-  }
-
-  @Benchmark
-  @Threads(1)
-  public void readManifestFile() throws IOException {
-    List<ManifestFile> manifests =
-        
ManifestLists.read(org.apache.iceberg.Files.localInput(manifestListFile));
-    TestTables.LocalFileIO fileIO = new TestTables.LocalFileIO();
-    Map<Integer, PartitionSpec> specs =
-        ImmutableMap.of(PartitionSpec.unpartitioned().specId(), 
PartitionSpec.unpartitioned());
-    for (ManifestFile manifestFile : manifests) {
-      ManifestReader<DataFile> reader = ManifestFiles.read(manifestFile, 
fileIO, specs);
-      try (CloseableIterator<DataFile> it = reader.iterator()) {
-        while (it.hasNext()) {
-          it.next().recordCount();
-        }
-      }
-    }
-  }
-
-  private Metrics randomMetrics(Random random) {
-    long rowCount = 100000L + random.nextInt(1000);
-    Map<Integer, Long> columnSizes = Maps.newHashMap();
-    Map<Integer, Long> valueCounts = Maps.newHashMap();
-    Map<Integer, Long> nullValueCounts = Maps.newHashMap();
-    Map<Integer, Long> nanValueCounts = Maps.newHashMap();
-    Map<Integer, ByteBuffer> lowerBounds = Maps.newHashMap();
-    Map<Integer, ByteBuffer> upperBounds = Maps.newHashMap();
-    for (int i = 0; i < NUM_COLS; i++) {
-      columnSizes.put(i, 1000000L + random.nextInt(100000));
-      valueCounts.put(i, 100000L + random.nextInt(100));
-      nullValueCounts.put(i, (long) random.nextInt(5));
-      nanValueCounts.put(i, (long) random.nextInt(5));
-      byte[] lower = new byte[8];
-      random.nextBytes(lower);
-      lowerBounds.put(i, ByteBuffer.wrap(lower));
-      byte[] upper = new byte[8];
-      random.nextBytes(upper);
-      upperBounds.put(i, ByteBuffer.wrap(upper));
-    }
-
-    return new Metrics(
-        rowCount,
-        columnSizes,
-        valueCounts,
-        nullValueCounts,
-        nanValueCounts,
-        lowerBounds,
-        upperBounds);
-  }
-}
diff --git a/core/src/jmh/java/org/apache/iceberg/ManifestWriteBenchmark.java 
b/core/src/jmh/java/org/apache/iceberg/ManifestWriteBenchmark.java
deleted file mode 100644
index b0dab63dea..0000000000
--- a/core/src/jmh/java/org/apache/iceberg/ManifestWriteBenchmark.java
+++ /dev/null
@@ -1,173 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iceberg;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.UncheckedIOException;
-import java.nio.ByteBuffer;
-import java.util.Map;
-import java.util.Random;
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-import org.apache.commons.io.FileUtils;
-import org.apache.iceberg.encryption.PlaintextEncryptionManager;
-import org.apache.iceberg.io.OutputFile;
-import org.apache.iceberg.relocated.com.google.common.collect.Maps;
-import org.openjdk.jmh.annotations.Benchmark;
-import org.openjdk.jmh.annotations.BenchmarkMode;
-import org.openjdk.jmh.annotations.Fork;
-import org.openjdk.jmh.annotations.Measurement;
-import org.openjdk.jmh.annotations.Mode;
-import org.openjdk.jmh.annotations.Param;
-import org.openjdk.jmh.annotations.Scope;
-import org.openjdk.jmh.annotations.Setup;
-import org.openjdk.jmh.annotations.State;
-import org.openjdk.jmh.annotations.TearDown;
-import org.openjdk.jmh.annotations.Threads;
-import org.openjdk.jmh.annotations.Timeout;
-
-/**
- * A benchmark that evaluates the performance of writing manifest files
- *
- * <p>To run this benchmark: <code>
- *   ./gradlew :iceberg-core:jmh -PjmhIncludeRegex=ManifestWriteBenchmark
- * </code>
- */
-@Fork(1)
-@State(Scope.Benchmark)
-@Measurement(iterations = 5)
-@BenchmarkMode(Mode.SingleShotTime)
-@Timeout(time = 5, timeUnit = TimeUnit.MINUTES)
-public class ManifestWriteBenchmark {
-
-  private static final int NUM_FILES = 10;
-  private static final int NUM_ROWS = 100000;
-  private static final int NUM_COLS = 100;
-
-  private String baseDir;
-  private String manifestListFile;
-
-  private Metrics metrics;
-
-  @Setup
-  public void before() {
-    Random random = new Random(System.currentTimeMillis());
-    // Pre-create the metrics to avoid doing this in the benchmark itself
-    metrics = randomMetrics(random);
-  }
-
-  @TearDown
-  public void after() {
-    if (baseDir != null) {
-      FileUtils.deleteQuietly(new File(baseDir));
-      baseDir = null;
-    }
-
-    manifestListFile = null;
-  }
-
-  @State(Scope.Benchmark)
-  public static class BenchmarkState {
-    @Param({"1", "2"})
-    private int formatVersion;
-
-    public int getFormatVersion() {
-      return formatVersion;
-    }
-  }
-
-  @Benchmark
-  @Threads(1)
-  public void writeManifestFile(BenchmarkState state) throws IOException {
-    this.baseDir =
-        
java.nio.file.Files.createTempDirectory("benchmark-").toAbsolutePath().toString();
-    this.manifestListFile = String.format("%s/%s.avro", baseDir, 
UUID.randomUUID());
-
-    try (ManifestListWriter listWriter =
-        ManifestLists.write(
-            state.getFormatVersion(),
-            org.apache.iceberg.Files.localOutput(manifestListFile),
-            PlaintextEncryptionManager.instance(),
-            0,
-            1L,
-            0,
-            0L)) {
-      for (int i = 0; i < NUM_FILES; i++) {
-        OutputFile manifestFile =
-            org.apache.iceberg.Files.localOutput(
-                String.format("%s/%s.avro", baseDir, UUID.randomUUID()));
-
-        ManifestWriter<DataFile> writer =
-            ManifestFiles.write(
-                state.formatVersion, PartitionSpec.unpartitioned(), 
manifestFile, 1L);
-        try (ManifestWriter<DataFile> finalWriter = writer) {
-          for (int j = 0; j < NUM_ROWS; j++) {
-            DataFile dataFile =
-                DataFiles.builder(PartitionSpec.unpartitioned())
-                    .withFormat(FileFormat.PARQUET)
-                    .withPath(String.format("/path/to/data-%s-%s.parquet", i, 
j))
-                    .withFileSizeInBytes(j)
-                    .withRecordCount(j)
-                    .withMetrics(metrics)
-                    .build();
-            finalWriter.add(dataFile);
-          }
-        } catch (IOException e) {
-          throw new UncheckedIOException(e);
-        }
-
-        listWriter.add(writer.toManifestFile());
-      }
-    } catch (IOException e) {
-      throw new UncheckedIOException(e);
-    }
-  }
-
-  private Metrics randomMetrics(Random random) {
-    long rowCount = 100000L + random.nextInt(1000);
-    Map<Integer, Long> columnSizes = Maps.newHashMap();
-    Map<Integer, Long> valueCounts = Maps.newHashMap();
-    Map<Integer, Long> nullValueCounts = Maps.newHashMap();
-    Map<Integer, Long> nanValueCounts = Maps.newHashMap();
-    Map<Integer, ByteBuffer> lowerBounds = Maps.newHashMap();
-    Map<Integer, ByteBuffer> upperBounds = Maps.newHashMap();
-    for (int i = 0; i < NUM_COLS; i++) {
-      columnSizes.put(i, 1000000L + random.nextInt(100000));
-      valueCounts.put(i, 100000L + random.nextInt(100));
-      nullValueCounts.put(i, (long) random.nextInt(5));
-      nanValueCounts.put(i, (long) random.nextInt(5));
-      byte[] lower = new byte[8];
-      random.nextBytes(lower);
-      lowerBounds.put(i, ByteBuffer.wrap(lower));
-      byte[] upper = new byte[8];
-      random.nextBytes(upper);
-      upperBounds.put(i, ByteBuffer.wrap(upper));
-    }
-
-    return new Metrics(
-        rowCount,
-        columnSizes,
-        valueCounts,
-        nullValueCounts,
-        nanValueCounts,
-        lowerBounds,
-        upperBounds);
-  }
-}
diff --git a/core/src/main/java/org/apache/iceberg/BaseFile.java 
b/core/src/main/java/org/apache/iceberg/BaseFile.java
index 3c31c50f09..7147ba5878 100644
--- a/core/src/main/java/org/apache/iceberg/BaseFile.java
+++ b/core/src/main/java/org/apache/iceberg/BaseFile.java
@@ -32,6 +32,7 @@ import org.apache.avro.specific.SpecificData;
 import org.apache.iceberg.avro.AvroSchemaUtil;
 import org.apache.iceberg.avro.SupportsIndexProjection;
 import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.types.Type;
 import org.apache.iceberg.types.Types;
 import org.apache.iceberg.util.ArrayUtil;
@@ -329,7 +330,11 @@ abstract class BaseFile<F> extends SupportsIndexProjection
         this.partitionSpecId = (value != null) ? (Integer) value : -1;
         return;
       case 4:
-        this.partitionData = (PartitionData) value;
+        // Preserve the constructor-initialized partitionData when the reader 
returns null
+        // (e.g., v4 Parquet manifests for unpartitioned tables omit the 
partition field).
+        if (value != null) {
+          this.partitionData = (PartitionData) value;
+        }
         return;
       case 5:
         this.recordCount = (Long) value;
@@ -581,9 +586,37 @@ abstract class BaseFile<F> extends SupportsIndexProjection
 
   private static Map<Integer, ByteBuffer> copyByteBufferMap(
       Map<Integer, ByteBuffer> map, Set<Integer> keys) {
-    return SerializableByteBufferMap.wrap(copyMap(map, keys));
+    if (map == null) {
+      return null;
+    }
+
+    return SerializableByteBufferMap.wrap(deepCopyByteBufferMap(map, keys));
+  }
+
+  // Required as long as we have Map<Integer, ByteBuffer> in the API since 
Parquet reuses buffers.
+  private static Map<Integer, ByteBuffer> deepCopyByteBufferMap(
+      Map<Integer, ByteBuffer> map, Set<Integer> keys) {
+    Map<Integer, ByteBuffer> deepCopy = 
Maps.newHashMapWithExpectedSize(map.size());
+    for (Map.Entry<Integer, ByteBuffer> entry : map.entrySet()) {
+      if (keys == null || keys.contains(entry.getKey())) {
+        ByteBuffer buf = entry.getValue();
+        if (buf != null) {
+          ByteBuffer copy = ByteBuffer.allocate(buf.remaining());
+          copy.put(buf.duplicate());
+          copy.flip();
+          deepCopy.put(entry.getKey(), copy);
+        } else {
+          deepCopy.put(entry.getKey(), null);
+        }
+      }
+    }
+
+    return deepCopy;
   }
 
+  // Returns an unmodifiable view of the map. The SerializableMap check is 
needed because
+  // internal maps may be wrapped for serialization after being populated by a 
format reader
+  // with container reuse enabled, and immutableMap() provides a stable 
snapshot.
   private static <K, V> Map<K, V> toReadableMap(Map<K, V> map) {
     if (map == null) {
       return null;
@@ -594,6 +627,10 @@ abstract class BaseFile<F> extends SupportsIndexProjection
     }
   }
 
+  // Separate from toReadableMap because SerializableByteBufferMap is its own 
wrapper type
+  // (not a SerializableMap subclass) to handle ByteBuffer-specific 
serialization. ByteBuffer
+  // values are mutable and can be overwritten by Parquet container reuse, so 
callers that
+  // retain references must use copyByteBufferMap to get independent copies.
   private static Map<Integer, ByteBuffer> toReadableByteBufferMap(Map<Integer, 
ByteBuffer> map) {
     if (map == null) {
       return null;
diff --git a/core/src/main/java/org/apache/iceberg/ManifestReader.java 
b/core/src/main/java/org/apache/iceberg/ManifestReader.java
index e3c2325ab7..dc34836b6c 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestReader.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestReader.java
@@ -60,6 +60,13 @@ public class ManifestReader<F extends ContentFile<F>> 
extends CloseableGroup
 
   static final ImmutableList<String> ALL_COLUMNS = ImmutableList.of("*");
 
+  private static final Types.NestedField UNPARTITIONED_PARTITION_FIELD =
+      Types.NestedField.optional(
+          DataFile.PARTITION_ID,
+          DataFile.PARTITION_NAME,
+          Types.StructType.of(),
+          DataFile.PARTITION_DOC);
+
   private static final Set<String> STATS_COLUMNS =
       ImmutableSet.of(
           "value_counts",
@@ -173,6 +180,12 @@ public class ManifestReader<F extends ContentFile<F>> 
extends CloseableGroup
   }
 
   private static <T extends ContentFile<T>> Map<String, String> 
readMetadata(InputFile inputFile) {
+    FileFormat manifestFormat = FileFormat.fromFileName(inputFile.location());
+    Preconditions.checkArgument(
+        manifestFormat == FileFormat.AVRO,
+        "Reading manifest metadata is only supported for Avro manifests: %s",
+        inputFile.location());
+
     Map<String, String> metadata;
     try {
       try (CloseableIterable<ManifestEntry<T>> headerReader =
@@ -298,8 +311,22 @@ public class ManifestReader<F extends ContentFile<F>> 
extends CloseableGroup
     Preconditions.checkArgument(
         format != null, "Unable to determine format of manifest: %s", 
file.location());
 
+    boolean unpartitioned = spec.rawPartitionType().fields().isEmpty();
+
+    // V4+ manifests omit the partition field when unpartitioned (Parquet 
cannot represent
+    // empty structs, and the field is meaningless regardless of format). Mark 
it optional so
+    // the reader returns null for the missing field instead of throwing. The 
field must stay
+    // in the projection to preserve positional access for callers like 
StructProjection.
+    // For older versions where the empty struct is present, making it 
optional is harmless.
     List<Types.NestedField> fields = Lists.newArrayList();
-    fields.addAll(projection.asStruct().fields());
+    for (Types.NestedField field : projection.asStruct().fields()) {
+      if (unpartitioned && field.fieldId() == DataFile.PARTITION_ID) {
+        fields.add(UNPARTITIONED_PARTITION_FIELD);
+      } else {
+        fields.add(field);
+      }
+    }
+
     if (projection.findField(DataFile.RECORD_COUNT.fieldId()) == null) {
       fields.add(DataFile.RECORD_COUNT);
     }
diff --git a/core/src/main/java/org/apache/iceberg/ManifestWriter.java 
b/core/src/main/java/org/apache/iceberg/ManifestWriter.java
index 7d85f991b0..321bcd89d8 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestWriter.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestWriter.java
@@ -24,6 +24,7 @@ import java.util.Map;
 import org.apache.iceberg.encryption.EncryptedOutputFile;
 import org.apache.iceberg.encryption.EncryptionKeyMetadata;
 import org.apache.iceberg.encryption.NativeEncryptionKeyMetadata;
+import org.apache.iceberg.encryption.NativeEncryptionOutputFile;
 import org.apache.iceberg.exceptions.RuntimeIOException;
 import org.apache.iceberg.io.FileAppender;
 import org.apache.iceberg.io.OutputFile;
@@ -40,6 +41,7 @@ public abstract class ManifestWriter<F extends 
ContentFile<F>> implements FileAp
   // this is replaced when writing a manifest list by the ManifestFile wrapper
   static final long UNASSIGNED_SEQ = -1L;
 
+  private final FileFormat format;
   private final OutputFile file;
   private final EncryptionKeyMetadata keyMetadata;
   private final int specId;
@@ -65,7 +67,8 @@ public abstract class ManifestWriter<F extends 
ContentFile<F>> implements FileAp
       Long snapshotId,
       Long firstRowId,
       Map<String, String> writerProperties) {
-    this.file = file.encryptingOutputFile();
+    this.format = 
FileFormat.fromFileName(file.encryptingOutputFile().location());
+    this.file = outputFile(file);
     this.specId = spec.specId();
     this.writerProperties = writerProperties;
     this.writer = newAppender(spec, this.file);
@@ -82,6 +85,21 @@ public abstract class ManifestWriter<F extends 
ContentFile<F>> implements FileAp
   protected abstract FileAppender<ManifestEntry<F>> newAppender(
       PartitionSpec spec, OutputFile outputFile);
 
+  private OutputFile outputFile(EncryptedOutputFile encryptedFile) {
+    // Casting to NativeEncryptionOutputFile actually makes the file rely on 
native encryption
+    // rather than whole-file encryption.
+    if (format == FileFormat.PARQUET
+        && encryptedFile instanceof NativeEncryptionOutputFile nativeFile) {
+      return nativeFile;
+    }
+
+    return encryptedFile.encryptingOutputFile();
+  }
+
+  protected FileFormat format() {
+    return format;
+  }
+
   protected Map<String, String> writerProperties() {
     return writerProperties;
   }
@@ -206,16 +224,7 @@ public abstract class ManifestWriter<F extends 
ContentFile<F>> implements FileAp
   public ManifestFile toManifestFile() {
     Preconditions.checkState(closed, "Cannot build ManifestFile, writer is not 
closed");
 
-    ByteBuffer keyMetadataBuffer;
-    if (keyMetadata instanceof NativeEncryptionKeyMetadata) {
-      // File length is required by AES GCM Stream encryption, to prevent file 
truncation attacks
-      keyMetadataBuffer =
-          ((NativeEncryptionKeyMetadata) 
keyMetadata).copyWithLength(length()).buffer();
-    } else if (keyMetadata != null) {
-      keyMetadataBuffer = keyMetadata.buffer();
-    } else {
-      keyMetadataBuffer = null;
-    }
+    ByteBuffer keyMetadataBuffer = keyMetadataBuffer();
 
     // if the minSequenceNumber is null, then no manifests with a sequence 
number have been written,
     // so the min data sequence number is the one that will be assigned when 
this is committed.
@@ -240,6 +249,19 @@ public abstract class ManifestWriter<F extends 
ContentFile<F>> implements FileAp
         firstRowId);
   }
 
+  private ByteBuffer keyMetadataBuffer() {
+    if (keyMetadata instanceof NativeEncryptionKeyMetadata nativeKeyMetadata
+        && format == FileFormat.AVRO) {
+      // Whole-file encryption needs the file length embedded for GCM 
truncation protection.
+      // Formats with native encryption (like Parquet) handle this directly 
and don't need it.
+      return nativeKeyMetadata.copyWithLength(length()).buffer();
+    } else if (keyMetadata != null) {
+      return keyMetadata.buffer();
+    }
+
+    return null;
+  }
+
   @Override
   public void close() throws IOException {
     this.closed = true;
@@ -256,7 +278,7 @@ public abstract class ManifestWriter<F extends 
ContentFile<F>> implements FileAp
         Long firstRowId,
         Map<String, String> writerProperties) {
       super(spec, file, snapshotId, firstRowId, writerProperties);
-      this.entryWrapper = new V4Metadata.ManifestEntryWrapper<>(snapshotId);
+      this.entryWrapper = new V4Metadata.ManifestEntryWrapper<>(snapshotId, 
spec.partitionType());
     }
 
     @Override
@@ -269,7 +291,7 @@ public abstract class ManifestWriter<F extends 
ContentFile<F>> implements FileAp
         PartitionSpec spec, OutputFile file) {
       Schema manifestSchema = V4Metadata.entrySchema(spec.partitionType());
       try {
-        return InternalData.write(FileFormat.AVRO, file)
+        return InternalData.write(format(), file)
             .schema(manifestSchema)
             .named("manifest_entry")
             .meta("schema", SchemaParser.toJson(spec.schema()))
@@ -296,7 +318,7 @@ public abstract class ManifestWriter<F extends 
ContentFile<F>> implements FileAp
         Long snapshotId,
         Map<String, String> writerProperties) {
       super(spec, file, snapshotId, null, writerProperties);
-      this.entryWrapper = new V4Metadata.ManifestEntryWrapper<>(snapshotId);
+      this.entryWrapper = new V4Metadata.ManifestEntryWrapper<>(snapshotId, 
spec.partitionType());
     }
 
     @Override
@@ -309,7 +331,7 @@ public abstract class ManifestWriter<F extends 
ContentFile<F>> implements FileAp
         PartitionSpec spec, OutputFile file) {
       Schema manifestSchema = V4Metadata.entrySchema(spec.partitionType());
       try {
-        return InternalData.write(FileFormat.AVRO, file)
+        return InternalData.write(format(), file)
             .schema(manifestSchema)
             .named("manifest_entry")
             .meta("schema", SchemaParser.toJson(spec.schema()))
@@ -342,6 +364,8 @@ public abstract class ManifestWriter<F extends 
ContentFile<F>> implements FileAp
         Long firstRowId,
         Map<String, String> writerProperties) {
       super(spec, file, snapshotId, firstRowId, writerProperties);
+      Preconditions.checkArgument(
+          format() == FileFormat.AVRO, "V3 manifests must use Avro, but got: 
%s", format());
       this.entryWrapper = new V3Metadata.ManifestEntryWrapper<>(snapshotId);
     }
 
@@ -382,6 +406,8 @@ public abstract class ManifestWriter<F extends 
ContentFile<F>> implements FileAp
         Long snapshotId,
         Map<String, String> writerProperties) {
       super(spec, file, snapshotId, null, writerProperties);
+      Preconditions.checkArgument(
+          format() == FileFormat.AVRO, "V3 manifests must use Avro, but got: 
%s", format());
       this.entryWrapper = new V3Metadata.ManifestEntryWrapper<>(snapshotId);
     }
 
@@ -427,6 +453,8 @@ public abstract class ManifestWriter<F extends 
ContentFile<F>> implements FileAp
         Long snapshotId,
         Map<String, String> writerProperties) {
       super(spec, file, snapshotId, null, writerProperties);
+      Preconditions.checkArgument(
+          format() == FileFormat.AVRO, "V2 manifests must use Avro, but got: 
%s", format());
       this.entryWrapper = new V2Metadata.ManifestEntryWrapper<>(snapshotId);
     }
 
@@ -467,6 +495,8 @@ public abstract class ManifestWriter<F extends 
ContentFile<F>> implements FileAp
         Long snapshotId,
         Map<String, String> writerProperties) {
       super(spec, file, snapshotId, null, writerProperties);
+      Preconditions.checkArgument(
+          format() == FileFormat.AVRO, "V2 manifests must use Avro, but got: 
%s", format());
       this.entryWrapper = new V2Metadata.ManifestEntryWrapper<>(snapshotId);
     }
 
@@ -512,6 +542,8 @@ public abstract class ManifestWriter<F extends 
ContentFile<F>> implements FileAp
         Long snapshotId,
         Map<String, String> writerProperties) {
       super(spec, file, snapshotId, null, writerProperties);
+      Preconditions.checkArgument(
+          format() == FileFormat.AVRO, "V1 manifests must use Avro, but got: 
%s", format());
       this.entryWrapper = new V1Metadata.ManifestEntryWrapper();
     }
 
diff --git a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java 
b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java
index 6ba10e8049..e351009a9e 100644
--- a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java
+++ b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java
@@ -113,6 +113,7 @@ abstract class SnapshotProducer<ThisT> implements 
SnapshotUpdate<ThisT> {
   private final AtomicInteger attempt = new AtomicInteger(0);
   private final List<String> manifestLists = Lists.newArrayList();
   private final long targetManifestSizeBytes;
+  private final FileFormat manifestFormat;
   private final Map<String, String> manifestWriterProps;
   private MetricsReporter reporter = LoggingMetricsReporter.instance();
   private volatile Long snapshotId = null;
@@ -142,6 +143,10 @@ abstract class SnapshotProducer<ThisT> implements 
SnapshotUpdate<ThisT> {
     this.targetManifestSizeBytes =
         ops.current()
             .propertyAsLong(MANIFEST_TARGET_SIZE_BYTES, 
MANIFEST_TARGET_SIZE_BYTES_DEFAULT);
+    this.manifestFormat =
+        ops.current().formatVersion() >= 
TableMetadata.MIN_FORMAT_VERSION_PARQUET_MANIFESTS
+            ? FileFormat.PARQUET
+            : FileFormat.AVRO;
     this.manifestWriterProps = manifestWriterProperties(ops.current());
     boolean snapshotIdInheritanceEnabled =
         ops.current()
@@ -603,7 +608,7 @@ abstract class SnapshotProducer<ThisT> implements 
SnapshotUpdate<ThisT> {
   protected EncryptedOutputFile newManifestOutputFile() {
     String manifestFileLocation =
         ops.metadataFileLocation(
-            FileFormat.AVRO.addExtension(commitUUID + "-m" + 
manifestCount.getAndIncrement()));
+            manifestFormat.addExtension(commitUUID + "-m" + 
manifestCount.getAndIncrement()));
     return EncryptingFileIO.combine(ops.io(), ops.encryption())
         .newEncryptingOutputFile(manifestFileLocation);
   }
diff --git a/core/src/main/java/org/apache/iceberg/TableMetadata.java 
b/core/src/main/java/org/apache/iceberg/TableMetadata.java
index 43a67dd2be..c4a7bfc5c8 100644
--- a/core/src/main/java/org/apache/iceberg/TableMetadata.java
+++ b/core/src/main/java/org/apache/iceberg/TableMetadata.java
@@ -57,6 +57,7 @@ public class TableMetadata implements Serializable {
   static final int DEFAULT_TABLE_FORMAT_VERSION = 2;
   static final int SUPPORTED_TABLE_FORMAT_VERSION = 4;
   static final int MIN_FORMAT_VERSION_ROW_LINEAGE = 3;
+  static final int MIN_FORMAT_VERSION_PARQUET_MANIFESTS = 4;
   static final int INITIAL_SPEC_ID = 0;
   static final int INITIAL_SORT_ORDER_ID = 1;
   static final int INITIAL_SCHEMA_ID = 0;
diff --git a/core/src/main/java/org/apache/iceberg/V4Metadata.java 
b/core/src/main/java/org/apache/iceberg/V4Metadata.java
index 67478290aa..06fc75213d 100644
--- a/core/src/main/java/org/apache/iceberg/V4Metadata.java
+++ b/core/src/main/java/org/apache/iceberg/V4Metadata.java
@@ -23,6 +23,7 @@ import static 
org.apache.iceberg.types.Types.NestedField.required;
 import java.nio.ByteBuffer;
 import java.util.List;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.types.Types;
 
 class V4Metadata {
@@ -278,28 +279,38 @@ class V4Metadata {
   }
 
   static Types.StructType fileType(Types.StructType partitionType) {
-    return Types.StructType.of(
-        DataFile.CONTENT.asRequired(),
-        DataFile.FILE_PATH,
-        DataFile.FILE_FORMAT,
-        required(
-            DataFile.PARTITION_ID, DataFile.PARTITION_NAME, partitionType, 
DataFile.PARTITION_DOC),
-        DataFile.RECORD_COUNT,
-        DataFile.FILE_SIZE,
-        DataFile.COLUMN_SIZES,
-        DataFile.VALUE_COUNTS,
-        DataFile.NULL_VALUE_COUNTS,
-        DataFile.NAN_VALUE_COUNTS,
-        DataFile.LOWER_BOUNDS,
-        DataFile.UPPER_BOUNDS,
-        DataFile.KEY_METADATA,
-        DataFile.SPLIT_OFFSETS,
-        DataFile.EQUALITY_IDS,
-        DataFile.SORT_ORDER_ID,
-        DataFile.FIRST_ROW_ID,
-        DataFile.REFERENCED_DATA_FILE,
-        DataFile.CONTENT_OFFSET,
-        DataFile.CONTENT_SIZE);
+    // Parquet cannot represent empty groups, so the partition field is 
omitted entirely from
+    // the file schema for unpartitioned tables. DataFileWrapper adjusts 
positions to match.
+    ImmutableList.Builder<Types.NestedField> fields =
+        ImmutableList.builderWithExpectedSize(partitionType.fields().isEmpty() 
? 18 : 19);
+    fields.add(DataFile.CONTENT.asRequired());
+    fields.add(DataFile.FILE_PATH);
+    fields.add(DataFile.FILE_FORMAT);
+    if (!partitionType.fields().isEmpty()) {
+      fields.add(
+          required(
+              DataFile.PARTITION_ID,
+              DataFile.PARTITION_NAME,
+              partitionType,
+              DataFile.PARTITION_DOC));
+    }
+    fields.add(DataFile.RECORD_COUNT);
+    fields.add(DataFile.FILE_SIZE);
+    fields.add(DataFile.COLUMN_SIZES);
+    fields.add(DataFile.VALUE_COUNTS);
+    fields.add(DataFile.NULL_VALUE_COUNTS);
+    fields.add(DataFile.NAN_VALUE_COUNTS);
+    fields.add(DataFile.LOWER_BOUNDS);
+    fields.add(DataFile.UPPER_BOUNDS);
+    fields.add(DataFile.KEY_METADATA);
+    fields.add(DataFile.SPLIT_OFFSETS);
+    fields.add(DataFile.EQUALITY_IDS);
+    fields.add(DataFile.SORT_ORDER_ID);
+    fields.add(DataFile.FIRST_ROW_ID);
+    fields.add(DataFile.REFERENCED_DATA_FILE);
+    fields.add(DataFile.CONTENT_OFFSET);
+    fields.add(DataFile.CONTENT_SIZE);
+    return Types.StructType.of(fields.build());
   }
 
   static class ManifestEntryWrapper<F extends ContentFile<F>>
@@ -309,10 +320,10 @@ class V4Metadata {
     private final DataFileWrapper<?> fileWrapper;
     private ManifestEntry<F> wrapped = null;
 
-    ManifestEntryWrapper(Long commitSnapshotId) {
-      this.size = entrySchema(Types.StructType.of()).columns().size();
+    ManifestEntryWrapper(Long commitSnapshotId, Types.StructType 
partitionType) {
+      this.size = entrySchema(partitionType).columns().size();
       this.commitSnapshotId = commitSnapshotId;
-      this.fileWrapper = new DataFileWrapper<>();
+      this.fileWrapper = new DataFileWrapper<>(partitionType);
     }
 
     public ManifestEntryWrapper<F> wrap(ManifestEntry<F> entry) {
@@ -423,11 +434,15 @@ class V4Metadata {
   /** Wrapper used to write DataFile or DeleteFile to v4 metadata. */
   static class DataFileWrapper<F extends ContentFile<F>> extends 
Delegates.DelegatingContentFile<F>
       implements ContentFile<F>, StructLike {
+    private static final int PARTITION_POSITION = 3;
+
     private final int size;
+    private final boolean hasPartition;
 
-    DataFileWrapper() {
+    DataFileWrapper(Types.StructType partitionType) {
       super(null);
-      this.size = fileType(Types.StructType.of()).fields().size();
+      this.hasPartition = !partitionType.fields().isEmpty();
+      this.size = fileType(partitionType).fields().size();
     }
 
     @SuppressWarnings("unchecked")
@@ -452,7 +467,10 @@ class V4Metadata {
     }
 
     private Object get(int pos) {
-      switch (pos) {
+      // when the partition field is omitted, positions at or after where it 
would appear
+      // shift down by 1, so adjust back to the canonical field ordering
+      int adjusted = hasPartition ? pos : (pos >= PARTITION_POSITION ? pos + 1 
: pos);
+      switch (adjusted) {
         case 0:
           return wrapped.content().id();
         case 1:
diff --git a/core/src/test/java/org/apache/iceberg/TestBase.java 
b/core/src/test/java/org/apache/iceberg/TestBase.java
index 27b8a49d04..0f649cabeb 100644
--- a/core/src/test/java/org/apache/iceberg/TestBase.java
+++ b/core/src/test/java/org/apache/iceberg/TestBase.java
@@ -263,7 +263,8 @@ public class TestBase {
             .listFiles(
                 (dir, name) ->
                     !name.startsWith("snap")
-                        && 
Files.getFileExtension(name).equalsIgnoreCase("avro")));
+                        && 
(Files.getFileExtension(name).equalsIgnoreCase("avro")
+                            || 
Files.getFileExtension(name).equalsIgnoreCase("parquet"))));
   }
 
   List<File> listManifestLists(File tableDirToList) {
@@ -297,12 +298,22 @@ public class TestBase {
     return TestTables.readMetadata("test");
   }
 
+  static FileFormat manifestFormat(int version) {
+    return version >= TableMetadata.MIN_FORMAT_VERSION_PARQUET_MANIFESTS
+        ? FileFormat.PARQUET
+        : FileFormat.AVRO;
+  }
+
+  FileFormat manifestFormat() {
+    return manifestFormat(formatVersion);
+  }
+
   ManifestFile writeManifest(DataFile... files) throws IOException {
     return writeManifest(null, files);
   }
 
   ManifestFile writeManifest(Long snapshotId, DataFile... files) throws 
IOException {
-    File manifestFile = temp.resolve("input.m0.avro").toFile();
+    File manifestFile = 
temp.resolve(manifestFormat().addExtension("input.m0")).toFile();
     assertThat(manifestFile).doesNotExist();
     OutputFile outputFile = 
table.ops().io().newOutputFile(manifestFile.getCanonicalPath());
 
@@ -324,7 +335,7 @@ public class TestBase {
   }
 
   ManifestFile writeManifest(Long snapshotId, ManifestEntry<?>... entries) 
throws IOException {
-    return writeManifest(snapshotId, "input.m0.avro", entries);
+    return writeManifest(snapshotId, 
manifestFormat().addExtension("input.m0"), entries);
   }
 
   @SuppressWarnings("unchecked")
@@ -360,8 +371,8 @@ public class TestBase {
       throws IOException {
     OutputFile manifestFile =
         org.apache.iceberg.Files.localOutput(
-            FileFormat.AVRO.addExtension(
-                temp.resolve("junit" + 
System.nanoTime()).toFile().toString()));
+            manifestFormat(newFormatVersion)
+                .addExtension(temp.resolve("junit" + 
System.nanoTime()).toFile().toString()));
     ManifestWriter<DeleteFile> writer =
         ManifestFiles.writeDeleteManifest(newFormatVersion, SPEC, 
manifestFile, snapshotId);
     try {
@@ -375,7 +386,7 @@ public class TestBase {
   }
 
   ManifestFile writeManifestWithName(String name, DataFile... files) throws 
IOException {
-    File manifestFile = temp.resolve(name + ".avro").toFile();
+    File manifestFile = 
temp.resolve(manifestFormat().addExtension(name)).toFile();
     assertThat(manifestFile).doesNotExist();
     OutputFile outputFile = 
table.ops().io().newOutputFile(manifestFile.getCanonicalPath());
 
diff --git a/core/src/test/java/org/apache/iceberg/TestFastAppend.java 
b/core/src/test/java/org/apache/iceberg/TestFastAppend.java
index 8f427525e2..bc28ecd880 100644
--- a/core/src/test/java/org/apache/iceberg/TestFastAppend.java
+++ b/core/src/test/java/org/apache/iceberg/TestFastAppend.java
@@ -509,14 +509,18 @@ public class TestFastAppend extends TestBase {
     assertThat(base.currentSnapshot()).isNull();
 
     ManifestFile manifestWithExistingFiles =
-        writeManifest("manifest-file-1.avro", manifestEntry(Status.EXISTING, 
null, FILE_A));
+        writeManifest(
+            manifestFormat().addExtension("manifest-file-1"),
+            manifestEntry(Status.EXISTING, null, FILE_A));
     assertThatThrownBy(
             () -> 
table.newFastAppend().appendManifest(manifestWithExistingFiles).commit())
         .isInstanceOf(IllegalArgumentException.class)
         .hasMessage("Cannot append manifest with existing files");
 
     ManifestFile manifestWithDeletedFiles =
-        writeManifest("manifest-file-2.avro", manifestEntry(Status.DELETED, 
null, FILE_A));
+        writeManifest(
+            manifestFormat().addExtension("manifest-file-2"),
+            manifestEntry(Status.DELETED, null, FILE_A));
     assertThatThrownBy(
             () -> 
table.newFastAppend().appendManifest(manifestWithDeletedFiles).commit())
         .isInstanceOf(IllegalArgumentException.class)
diff --git a/core/src/test/java/org/apache/iceberg/TestManifestReader.java 
b/core/src/test/java/org/apache/iceberg/TestManifestReader.java
index 6690a1483e..de2b7fd859 100644
--- a/core/src/test/java/org/apache/iceberg/TestManifestReader.java
+++ b/core/src/test/java/org/apache/iceberg/TestManifestReader.java
@@ -273,6 +273,10 @@ public class TestManifestReader extends TestBase {
   @SuppressWarnings("deprecation")
   @TestTemplate
   public void testDeprecatedReadWithoutSpecsById() throws IOException {
+    assumeThat(formatVersion)
+        .as("Deprecated read without specsById requires Avro metadata; V4 uses 
Parquet")
+        .isLessThan(TableMetadata.MIN_FORMAT_VERSION_PARQUET_MANIFESTS);
+
     ManifestFile manifest = writeManifest(1000L, 
manifestEntry(Status.EXISTING, 1000L, FILE_A));
     try (ManifestReader<DataFile> reader = ManifestFiles.read(manifest, 
FILE_IO)) {
       ManifestEntry<DataFile> entry = 
Iterables.getOnlyElement(reader.entries());
diff --git a/core/src/test/java/org/apache/iceberg/TestManifestWriter.java 
b/core/src/test/java/org/apache/iceberg/TestManifestWriter.java
index 00e66bdd7d..d710d949c5 100644
--- a/core/src/test/java/org/apache/iceberg/TestManifestWriter.java
+++ b/core/src/test/java/org/apache/iceberg/TestManifestWriter.java
@@ -41,7 +41,7 @@ public class TestManifestWriter extends TestBase {
   public void testManifestStats() throws IOException {
     ManifestFile manifest =
         writeManifest(
-            "manifest.avro",
+            manifestFormat().addExtension("manifest"),
             manifestEntry(Status.ADDED, null, newFile(10)),
             manifestEntry(Status.ADDED, null, newFile(20)),
             manifestEntry(Status.ADDED, null, newFile(5)),
@@ -67,7 +67,7 @@ public class TestManifestWriter extends TestBase {
   public void testManifestPartitionStats() throws IOException {
     ManifestFile manifest =
         writeManifest(
-            "manifest.avro",
+            manifestFormat().addExtension("manifest"),
             manifestEntry(Status.ADDED, null, newFile(10, 
TestHelpers.Row.of(1))),
             manifestEntry(Status.EXISTING, null, newFile(15, 
TestHelpers.Row.of(2))),
             manifestEntry(Status.DELETED, null, newFile(2, 
TestHelpers.Row.of(3))));
@@ -92,7 +92,8 @@ public class TestManifestWriter extends TestBase {
   @TestTemplate
   public void testWriteManifestWithSequenceNumber() throws IOException {
     assumeThat(formatVersion).isGreaterThan(1);
-    File manifestFile = temp.resolve("manifest" + System.nanoTime() + 
".avro").toFile();
+    File manifestFile =
+        temp.resolve(manifestFormat().addExtension("manifest" + 
System.nanoTime())).toFile();
     OutputFile outputFile = 
table.ops().io().newOutputFile(manifestFile.getCanonicalPath());
     ManifestWriter<DataFile> writer =
         ManifestFiles.write(formatVersion, table.spec(), outputFile, 1L);
@@ -119,7 +120,7 @@ public class TestManifestWriter extends TestBase {
 
     ManifestFile manifest =
         writeManifest(
-            "manifest.avro",
+            manifestFormat().addExtension("manifest"),
             manifestEntry(Status.ADDED, null, dataSequenceNumber, null, file1),
             manifestEntry(Status.ADDED, null, dataSequenceNumber, null, 
file2));
 
@@ -161,7 +162,7 @@ public class TestManifestWriter extends TestBase {
 
     ManifestFile newManifest =
         writeManifest(
-            "manifest.avro",
+            manifestFormat().addExtension("manifest"),
             manifestEntry(Status.EXISTING, appendSnapshotId, 
appendSequenceNumber, null, file1),
             manifestEntry(Status.EXISTING, appendSnapshotId, 
appendSequenceNumber, null, file2));
 
diff --git 
a/core/src/test/java/org/apache/iceberg/TestManifestWriterVersions.java 
b/core/src/test/java/org/apache/iceberg/TestManifestWriterVersions.java
index 5e83827f0c..966b573bd9 100644
--- a/core/src/test/java/org/apache/iceberg/TestManifestWriterVersions.java
+++ b/core/src/test/java/org/apache/iceberg/TestManifestWriterVersions.java
@@ -115,6 +115,8 @@ public class TestManifestWriterVersions {
           null,
           null);
 
+  static final List<FileFormat> V4_FORMATS = ImmutableList.of(FileFormat.AVRO, 
FileFormat.PARQUET);
+
   @TempDir private Path temp;
 
   @Test
@@ -344,6 +346,100 @@ public class TestManifestWriterVersions {
     assertThat(readAvroCodec(manifestFile)).isEqualTo("snappy");
   }
 
+  @ParameterizedTest
+  @FieldSource("V4_FORMATS")
+  public void testV4WritePartitioned(FileFormat fileFormat) throws IOException 
{
+    ManifestFile manifest = writeManifest(4, fileFormat, SPEC, DATA_FILE);
+    checkManifest(manifest, ManifestWriter.UNASSIGNED_SEQ);
+    checkEntry(
+        readManifest(manifest),
+        ManifestWriter.UNASSIGNED_SEQ,
+        ManifestWriter.UNASSIGNED_SEQ,
+        FileContent.DATA,
+        FIRST_ROW_ID);
+  }
+
+  @ParameterizedTest
+  @FieldSource("V4_FORMATS")
+  public void testV4WriteUnpartitioned(FileFormat fileFormat) throws 
IOException {
+    DataFile unpartitionedFile =
+        DataFiles.builder(PartitionSpec.unpartitioned())
+            .withPath(PATH)
+            .withFormat(FORMAT)
+            .withFileSizeInBytes(150972L)
+            .withMetrics(METRICS)
+            .withSplitOffsets(OFFSETS)
+            .withSortOrderId(SORT_ORDER_ID)
+            .withFirstRowId(FIRST_ROW_ID)
+            .build();
+
+    ManifestFile manifest =
+        writeManifest(4, fileFormat, PartitionSpec.unpartitioned(), 
unpartitionedFile);
+    checkManifest(manifest, ManifestWriter.UNASSIGNED_SEQ);
+
+    Map<Integer, PartitionSpec> unpartitionedSpecs =
+        ImmutableMap.of(PartitionSpec.unpartitioned().specId(), 
PartitionSpec.unpartitioned());
+    try (CloseableIterable<ManifestEntry<DataFile>> reader =
+        ManifestFiles.read(manifest, io, unpartitionedSpecs).entries()) {
+      ManifestEntry<DataFile> entry = Iterables.getOnlyElement(reader);
+      assertThat(entry.status()).isEqualTo(ManifestEntry.Status.ADDED);
+      assertThat(entry.file().location()).isEqualTo(PATH);
+      assertThat(entry.file().recordCount()).isEqualTo(METRICS.recordCount());
+      assertThat(entry.file().firstRowId()).isEqualTo(FIRST_ROW_ID);
+    }
+  }
+
+  @ParameterizedTest
+  @FieldSource("V4_FORMATS")
+  public void testV4WriteDeletePartitioned(FileFormat fileFormat) throws 
IOException {
+    ManifestFile manifest = writeDeleteManifest(4, fileFormat, SPEC);
+    checkManifest(manifest, ManifestWriter.UNASSIGNED_SEQ);
+    assertThat(manifest.content()).isEqualTo(ManifestContent.DELETES);
+    checkEntry(
+        readDeleteManifest(manifest),
+        ManifestWriter.UNASSIGNED_SEQ,
+        ManifestWriter.UNASSIGNED_SEQ,
+        FileContent.EQUALITY_DELETES);
+  }
+
+  @ParameterizedTest
+  @FieldSource("V4_FORMATS")
+  public void testV4WriteDeleteUnpartitioned(FileFormat fileFormat) throws 
IOException {
+    DeleteFile unpartitionedDelete =
+        new GenericDeleteFile(
+            0,
+            FileContent.EQUALITY_DELETES,
+            PATH,
+            FORMAT,
+            new PartitionData(PartitionSpec.unpartitioned().partitionType()),
+            22905L,
+            METRICS,
+            EQUALITY_ID_ARR,
+            SORT_ORDER_ID,
+            null,
+            null,
+            null,
+            null,
+            null);
+
+    ManifestFile manifest =
+        writeDeleteManifest(4, fileFormat, PartitionSpec.unpartitioned(), 
unpartitionedDelete);
+    checkManifest(manifest, ManifestWriter.UNASSIGNED_SEQ);
+    assertThat(manifest.content()).isEqualTo(ManifestContent.DELETES);
+
+    Map<Integer, PartitionSpec> unpartitionedSpecs =
+        ImmutableMap.of(PartitionSpec.unpartitioned().specId(), 
PartitionSpec.unpartitioned());
+    try (CloseableIterable<ManifestEntry<DeleteFile>> reader =
+        ManifestFiles.readDeleteManifest(manifest, io, 
unpartitionedSpecs).entries()) {
+      ManifestEntry<DeleteFile> entry = Iterables.getOnlyElement(reader);
+      assertThat(entry.status()).isEqualTo(ManifestEntry.Status.ADDED);
+      
assertThat(entry.file().content()).isEqualTo(FileContent.EQUALITY_DELETES);
+      assertThat(entry.file().location()).isEqualTo(PATH);
+      assertThat(entry.file().recordCount()).isEqualTo(METRICS.recordCount());
+      assertThat(entry.file().equalityFieldIds()).isEqualTo(EQUALITY_IDS);
+    }
+  }
+
   void checkEntry(
       ManifestEntry<?> entry,
       Long expectedDataSequenceNumber,
@@ -466,7 +562,7 @@ public class TestManifestWriterVersions {
 
   private ManifestFile rewriteManifest(ManifestFile manifest, int 
formatVersion)
       throws IOException {
-    String filename = FileFormat.AVRO.addExtension("rewrite-manifest");
+    String filename = 
TestBase.manifestFormat(formatVersion).addExtension("rewrite-manifest");
     EncryptedOutputFile manifestFile = 
encryptionManager().encrypt(io.newOutputFile(filename));
     ManifestWriter<DataFile> writer =
         ManifestFiles.write(formatVersion, SPEC, manifestFile, SNAPSHOT_ID);
@@ -483,10 +579,16 @@ public class TestManifestWriterVersions {
   }
 
   private ManifestFile writeManifest(int formatVersion, DataFile... files) 
throws IOException {
-    String filename = FileFormat.AVRO.addExtension("manifest");
+    return writeManifest(formatVersion, 
TestBase.manifestFormat(formatVersion), SPEC, files);
+  }
+
+  private ManifestFile writeManifest(
+      int formatVersion, FileFormat fileFormat, PartitionSpec spec, 
DataFile... files)
+      throws IOException {
+    String filename = fileFormat.addExtension("manifest");
     EncryptedOutputFile manifestFile = 
encryptionManager().encrypt(io.newOutputFile(filename));
     ManifestWriter<DataFile> writer =
-        ManifestFiles.newWriter(formatVersion, SPEC, manifestFile, 
SNAPSHOT_ID, FIRST_ROW_ID);
+        ManifestFiles.newWriter(formatVersion, spec, manifestFile, 
SNAPSHOT_ID, FIRST_ROW_ID);
     try {
       for (DataFile file : files) {
         writer.add(file);
@@ -512,12 +614,23 @@ public class TestManifestWriterVersions {
   }
 
   private ManifestFile writeDeleteManifest(int formatVersion) throws 
IOException {
-    String filename = FileFormat.AVRO.addExtension("manifest");
+    return writeDeleteManifest(formatVersion, 
TestBase.manifestFormat(formatVersion), SPEC);
+  }
+
+  private ManifestFile writeDeleteManifest(
+      int formatVersion, FileFormat fileFormat, PartitionSpec spec) throws 
IOException {
+    return writeDeleteManifest(formatVersion, fileFormat, spec, DELETE_FILE);
+  }
+
+  private ManifestFile writeDeleteManifest(
+      int formatVersion, FileFormat fileFormat, PartitionSpec spec, DeleteFile 
deleteFile)
+      throws IOException {
+    String filename = fileFormat.addExtension("manifest");
     EncryptedOutputFile manifestFile = 
encryptionManager().encrypt(io.newOutputFile(filename));
     ManifestWriter<DeleteFile> writer =
-        ManifestFiles.writeDeleteManifest(formatVersion, SPEC, manifestFile, 
SNAPSHOT_ID);
+        ManifestFiles.writeDeleteManifest(formatVersion, spec, manifestFile, 
SNAPSHOT_ID);
     try {
-      writer.add(DELETE_FILE);
+      writer.add(deleteFile);
     } finally {
       writer.close();
     }
diff --git a/core/src/test/java/org/apache/iceberg/TestMergeAppend.java 
b/core/src/test/java/org/apache/iceberg/TestMergeAppend.java
index 3947f16fe1..b7700d7ce7 100644
--- a/core/src/test/java/org/apache/iceberg/TestMergeAppend.java
+++ b/core/src/test/java/org/apache/iceberg/TestMergeAppend.java
@@ -629,7 +629,8 @@ public class TestMergeAppend extends TestBase {
                 .newAppend()
                 .appendManifest(
                     writeManifest(
-                        "input-m0.avro", 
manifestEntry(ManifestEntry.Status.ADDED, null, FILE_C))),
+                        manifestFormat().addExtension("input-m0"),
+                        manifestEntry(ManifestEntry.Status.ADDED, null, 
FILE_C))),
             branch);
 
     base = readMetadata();
@@ -671,7 +672,8 @@ public class TestMergeAppend extends TestBase {
                 .newAppend()
                 .appendManifest(
                     writeManifest(
-                        "input-m1.avro", 
manifestEntry(ManifestEntry.Status.ADDED, null, FILE_D))),
+                        manifestFormat().addExtension("input-m1"),
+                        manifestEntry(ManifestEntry.Status.ADDED, null, 
FILE_D))),
             branch);
 
     base = readMetadata();
@@ -1274,7 +1276,7 @@ public class TestMergeAppend extends TestBase {
 
     table.updateProperties().set(TableProperties.MANIFEST_MIN_MERGE_COUNT, 
"1").commit();
 
-    ManifestFile manifest1 = writeManifestWithName("manifest-file-1.avro", 
FILE_A, FILE_B);
+    ManifestFile manifest1 = writeManifestWithName("manifest-file-1", FILE_A, 
FILE_B);
     Snapshot snap1 = commit(table, 
table.newAppend().appendManifest(manifest1), branch);
 
     long commitId1 = snap1.snapshotId();
@@ -1290,7 +1292,7 @@ public class TestMergeAppend extends TestBase {
         statuses(Status.ADDED, Status.ADDED));
     assertThat(new File(manifest1.path())).exists();
 
-    ManifestFile manifest2 = writeManifestWithName("manifest-file-2.avro", 
FILE_C, FILE_D);
+    ManifestFile manifest2 = writeManifestWithName("manifest-file-2", FILE_C, 
FILE_D);
     Snapshot snap2 = commit(table, 
table.newAppend().appendManifest(manifest2), branch);
 
     long commitId2 = snap2.snapshotId();
@@ -1347,7 +1349,9 @@ public class TestMergeAppend extends TestBase {
     assertThat(base.currentSnapshot()).isNull();
 
     ManifestFile manifestWithExistingFiles =
-        writeManifest("manifest-file-1.avro", manifestEntry(Status.EXISTING, 
null, FILE_A));
+        writeManifest(
+            manifestFormat().addExtension("manifest-file-1"),
+            manifestEntry(Status.EXISTING, null, FILE_A));
     assertThatThrownBy(
             () ->
                 commit(table, 
table.newAppend().appendManifest(manifestWithExistingFiles), branch))
@@ -1356,7 +1360,9 @@ public class TestMergeAppend extends TestBase {
     assertThat(readMetadata().lastSequenceNumber()).isEqualTo(0);
 
     ManifestFile manifestWithDeletedFiles =
-        writeManifest("manifest-file-2.avro", manifestEntry(Status.DELETED, 
null, FILE_A));
+        writeManifest(
+            manifestFormat().addExtension("manifest-file-2"),
+            manifestEntry(Status.DELETED, null, FILE_A));
     assertThatThrownBy(
             () -> commit(table, 
table.newAppend().appendManifest(manifestWithDeletedFiles), branch))
         .isInstanceOf(IllegalArgumentException.class)
diff --git a/core/src/test/java/org/apache/iceberg/TestRewriteManifests.java 
b/core/src/test/java/org/apache/iceberg/TestRewriteManifests.java
index 19ee156c9e..dab323743b 100644
--- a/core/src/test/java/org/apache/iceberg/TestRewriteManifests.java
+++ b/core/src/test/java/org/apache/iceberg/TestRewriteManifests.java
@@ -55,7 +55,8 @@ public class TestRewriteManifests extends TestBase {
 
     ManifestFile newManifest =
         writeManifest(
-            "manifest-file-1.avro", manifestEntry(ManifestEntry.Status.ADDED, 
null, FILE_A));
+            manifestFormat().addExtension("manifest-file-1"),
+            manifestEntry(ManifestEntry.Status.ADDED, null, FILE_A));
 
     table.newFastAppend().appendManifest(newManifest).commit();
     long appendId = table.currentSnapshot().snapshotId();
@@ -79,7 +80,8 @@ public class TestRewriteManifests extends TestBase {
 
     ManifestFile newManifest =
         writeManifest(
-            "manifest-file-1.avro", manifestEntry(ManifestEntry.Status.ADDED, 
null, FILE_A));
+            manifestFormat().addExtension("manifest-file-1"),
+            manifestEntry(ManifestEntry.Status.ADDED, null, FILE_A));
 
     table.newFastAppend().appendManifest(newManifest).commit();
 
@@ -115,7 +117,8 @@ public class TestRewriteManifests extends TestBase {
 
     ManifestFile newManifest =
         writeManifest(
-            "manifest-file-1.avro", manifestEntry(ManifestEntry.Status.ADDED, 
null, FILE_A));
+            manifestFormat().addExtension("manifest-file-1"),
+            manifestEntry(ManifestEntry.Status.ADDED, null, FILE_A));
 
     table.newFastAppend().appendManifest(newManifest).commit();
     long manifestAppendId = table.currentSnapshot().snapshotId();
@@ -428,11 +431,11 @@ public class TestRewriteManifests extends TestBase {
 
     ManifestFile firstNewManifest =
         writeManifest(
-            "manifest-file-1.avro",
+            manifestFormat().addExtension("manifest-file-1"),
             manifestEntry(ManifestEntry.Status.EXISTING, 
firstSnapshot.snapshotId(), FILE_A));
     ManifestFile secondNewManifest =
         writeManifest(
-            "manifest-file-2.avro",
+            manifestFormat().addExtension("manifest-file-2"),
             manifestEntry(ManifestEntry.Status.EXISTING, 
firstSnapshot.snapshotId(), FILE_B));
 
     RewriteManifests rewriteManifests = table.rewriteManifests();
@@ -492,11 +495,11 @@ public class TestRewriteManifests extends TestBase {
 
     ManifestFile firstNewManifest =
         writeManifest(
-            "manifest-file-1.avro",
+            manifestFormat().addExtension("manifest-file-1"),
             manifestEntry(ManifestEntry.Status.EXISTING, 
firstSnapshot.snapshotId(), FILE_A));
     ManifestFile secondNewManifest =
         writeManifest(
-            "manifest-file-2.avro",
+            manifestFormat().addExtension("manifest-file-2"),
             manifestEntry(ManifestEntry.Status.EXISTING, 
firstSnapshot.snapshotId(), FILE_B));
 
     RewriteManifests rewriteManifests = table.rewriteManifests();
@@ -679,11 +682,11 @@ public class TestRewriteManifests extends TestBase {
 
     ManifestFile firstNewManifest =
         writeManifest(
-            "manifest-file-1.avro",
+            manifestFormat().addExtension("manifest-file-1"),
             manifestEntry(ManifestEntry.Status.EXISTING, 
firstSnapshot.snapshotId(), FILE_A));
     ManifestFile secondNewManifest =
         writeManifest(
-            "manifest-file-2.avro",
+            manifestFormat().addExtension("manifest-file-2"),
             manifestEntry(ManifestEntry.Status.EXISTING, 
firstSnapshot.snapshotId(), FILE_B));
 
     RewriteManifests rewriteManifests = table.rewriteManifests();
@@ -741,11 +744,11 @@ public class TestRewriteManifests extends TestBase {
 
     ManifestFile firstNewManifest =
         writeManifest(
-            "manifest-file-1.avro",
+            manifestFormat().addExtension("manifest-file-1"),
             manifestEntry(ManifestEntry.Status.EXISTING, 
firstSnapshot.snapshotId(), FILE_A));
     ManifestFile secondNewManifest =
         writeManifest(
-            "manifest-file-2.avro",
+            manifestFormat().addExtension("manifest-file-2"),
             manifestEntry(ManifestEntry.Status.EXISTING, 
firstSnapshot.snapshotId(), FILE_B));
 
     RewriteManifests rewriteManifests = table.rewriteManifests();
@@ -796,11 +799,11 @@ public class TestRewriteManifests extends TestBase {
 
     ManifestFile firstNewManifest =
         writeManifest(
-            "manifest-file-1.avro",
+            manifestFormat().addExtension("manifest-file-1"),
             manifestEntry(ManifestEntry.Status.EXISTING, 
firstSnapshot.snapshotId(), FILE_A));
     ManifestFile secondNewManifest =
         writeManifest(
-            "manifest-file-2.avro",
+            manifestFormat().addExtension("manifest-file-2"),
             manifestEntry(ManifestEntry.Status.EXISTING, 
firstSnapshot.snapshotId(), FILE_B));
 
     RewriteManifests rewriteManifests = table.rewriteManifests();
@@ -841,7 +844,7 @@ public class TestRewriteManifests extends TestBase {
 
     ManifestFile newManifest =
         writeManifest(
-            "manifest-file-1.avro",
+            manifestFormat().addExtension("manifest-file-1"),
             manifestEntry(ManifestEntry.Status.EXISTING, 
firstSnapshot.snapshotId(), FILE_A));
 
     table
@@ -904,7 +907,8 @@ public class TestRewriteManifests extends TestBase {
         manifestEntry(ManifestEntry.Status.EXISTING, 
firstSnapshot.snapshotId(), FILE_A);
     // update the entry's sequence number or else it will be rejected by the 
writer
     entry.setDataSequenceNumber(firstSnapshot.sequenceNumber());
-    ManifestFile newManifest = writeManifest("manifest-file-1.avro", entry);
+    ManifestFile newManifest =
+        writeManifest(manifestFormat().addExtension("manifest-file-1"), entry);
 
     RewriteManifests rewriteManifests =
         table
@@ -954,7 +958,8 @@ public class TestRewriteManifests extends TestBase {
     // update the entry's sequence number or else it will be rejected by the 
writer
     appendEntry.setDataSequenceNumber(snapshot.sequenceNumber());
 
-    ManifestFile invalidAddedFileManifest = 
writeManifest("manifest-file-2.avro", appendEntry);
+    ManifestFile invalidAddedFileManifest =
+        writeManifest(manifestFormat().addExtension("manifest-file-2"), 
appendEntry);
 
     assertThatThrownBy(
             () ->
@@ -971,7 +976,8 @@ public class TestRewriteManifests extends TestBase {
     // update the entry's sequence number or else it will be rejected by the 
writer
     deleteEntry.setDataSequenceNumber(snapshot.sequenceNumber());
 
-    ManifestFile invalidDeletedFileManifest = 
writeManifest("manifest-file-3.avro", deleteEntry);
+    ManifestFile invalidDeletedFileManifest =
+        writeManifest(manifestFormat().addExtension("manifest-file-3"), 
deleteEntry);
 
     assertThatThrownBy(
             () ->
@@ -1009,7 +1015,7 @@ public class TestRewriteManifests extends TestBase {
 
     ManifestFile newManifest =
         writeManifest(
-            "manifest-file.avro",
+            manifestFormat().addExtension("manifest-file"),
             manifestEntry(ManifestEntry.Status.EXISTING, 
firstSnapshot.snapshotId(), FILE_A),
             manifestEntry(ManifestEntry.Status.EXISTING, 
secondSnapshot.snapshotId(), FILE_B));
 
@@ -1051,7 +1057,7 @@ public class TestRewriteManifests extends TestBase {
 
     ManifestFile newManifest =
         writeManifest(
-            "manifest-file.avro",
+            manifestFormat().addExtension("manifest-file"),
             manifestEntry(ManifestEntry.Status.EXISTING, 
firstSnapshot.snapshotId(), FILE_A),
             manifestEntry(ManifestEntry.Status.EXISTING, 
secondSnapshot.snapshotId(), FILE_B));
 
@@ -1176,7 +1182,7 @@ public class TestRewriteManifests extends TestBase {
         Iterables.getOnlyElement(deleteSnapshot.deleteManifests(table.io()));
     ManifestFile newDeleteManifest1 =
         writeManifest(
-            "delete-manifest-file-1.avro",
+            manifestFormat().addExtension("delete-manifest-file-1"),
             manifestEntry(
                 ManifestEntry.Status.EXISTING,
                 deleteSnapshotId,
@@ -1185,7 +1191,7 @@ public class TestRewriteManifests extends TestBase {
                 fileADeletes()));
     ManifestFile newDeleteManifest2 =
         writeManifest(
-            "delete-manifest-file-2.avro",
+            manifestFormat().addExtension("delete-manifest-file-2"),
             manifestEntry(
                 ManifestEntry.Status.EXISTING,
                 deleteSnapshotId,
@@ -1262,7 +1268,7 @@ public class TestRewriteManifests extends TestBase {
         Iterables.getOnlyElement(deleteSnapshot.dataManifests(table.io()));
     ManifestFile newDataManifest1 =
         writeManifest(
-            "manifest-file-1.avro",
+            manifestFormat().addExtension("manifest-file-1"),
             manifestEntry(
                 ManifestEntry.Status.EXISTING,
                 appendSnapshotId,
@@ -1271,7 +1277,7 @@ public class TestRewriteManifests extends TestBase {
                 FILE_A));
     ManifestFile newDataManifest2 =
         writeManifest(
-            "manifest-file-2.avro",
+            manifestFormat().addExtension("manifest-file-2"),
             manifestEntry(
                 ManifestEntry.Status.EXISTING,
                 appendSnapshotId,
@@ -1284,7 +1290,7 @@ public class TestRewriteManifests extends TestBase {
         Iterables.getOnlyElement(deleteSnapshot.deleteManifests(table.io()));
     ManifestFile newDeleteManifest1 =
         writeManifest(
-            "delete-manifest-file-1.avro",
+            manifestFormat().addExtension("delete-manifest-file-1"),
             manifestEntry(
                 ManifestEntry.Status.EXISTING,
                 deleteSnapshotId,
@@ -1293,7 +1299,7 @@ public class TestRewriteManifests extends TestBase {
                 fileADeletes()));
     ManifestFile newDeleteManifest2 =
         writeManifest(
-            "delete-manifest-file-2.avro",
+            manifestFormat().addExtension("delete-manifest-file-2"),
             manifestEntry(
                 ManifestEntry.Status.EXISTING,
                 deleteSnapshotId,
@@ -1376,7 +1382,7 @@ public class TestRewriteManifests extends TestBase {
         Iterables.getOnlyElement(deleteSnapshot.deleteManifests(table.io()));
     ManifestFile newDeleteManifest1 =
         writeManifest(
-            "delete-manifest-file-1.avro",
+            manifestFormat().addExtension("delete-manifest-file-1"),
             manifestEntry(
                 ManifestEntry.Status.EXISTING,
                 deleteSnapshotId,
@@ -1385,7 +1391,7 @@ public class TestRewriteManifests extends TestBase {
                 fileADeletes()));
     ManifestFile newDeleteManifest2 =
         writeManifest(
-            "delete-manifest-file-2.avro",
+            manifestFormat().addExtension("delete-manifest-file-2"),
             manifestEntry(
                 ManifestEntry.Status.EXISTING,
                 deleteSnapshotId,
@@ -1486,7 +1492,7 @@ public class TestRewriteManifests extends TestBase {
     ManifestFile originalDeleteManifest = 
deleteSnapshot1.deleteManifests(table.io()).get(0);
     ManifestFile newDeleteManifest1 =
         writeManifest(
-            "delete-manifest-file-1.avro",
+            manifestFormat().addExtension("delete-manifest-file-1"),
             manifestEntry(
                 ManifestEntry.Status.EXISTING,
                 deleteSnapshotId1,
@@ -1495,7 +1501,7 @@ public class TestRewriteManifests extends TestBase {
                 fileADeletes()));
     ManifestFile newDeleteManifest2 =
         writeManifest(
-            "delete-manifest-file-2.avro",
+            manifestFormat().addExtension("delete-manifest-file-2"),
             manifestEntry(
                 ManifestEntry.Status.EXISTING,
                 deleteSnapshotId1,
@@ -1581,7 +1587,7 @@ public class TestRewriteManifests extends TestBase {
     ManifestFile originalDeleteManifest = 
deleteSnapshot.deleteManifests(table.io()).get(0);
     ManifestFile newDeleteManifest1 =
         writeManifest(
-            "delete-manifest-file-1.avro",
+            manifestFormat().addExtension("delete-manifest-file-1"),
             manifestEntry(
                 ManifestEntry.Status.EXISTING,
                 deleteSnapshotId,
@@ -1590,7 +1596,7 @@ public class TestRewriteManifests extends TestBase {
                 fileADeletes()));
     ManifestFile newDeleteManifest2 =
         writeManifest(
-            "delete-manifest-file-2.avro",
+            manifestFormat().addExtension("delete-manifest-file-2"),
             manifestEntry(
                 ManifestEntry.Status.EXISTING,
                 deleteSnapshotId,
@@ -1645,7 +1651,7 @@ public class TestRewriteManifests extends TestBase {
     // combine the original delete manifests into 1 new delete manifest
     ManifestFile newDeleteManifest =
         writeManifest(
-            "delete-manifest-file.avro",
+            manifestFormat().addExtension("delete-manifest-file"),
             manifestEntry(
                 ManifestEntry.Status.EXISTING,
                 deleteSnapshotId1,
diff --git a/core/src/test/java/org/apache/iceberg/TestSnapshotProducer.java 
b/core/src/test/java/org/apache/iceberg/TestSnapshotProducer.java
index dd97738759..c6092f0238 100644
--- a/core/src/test/java/org/apache/iceberg/TestSnapshotProducer.java
+++ b/core/src/test/java/org/apache/iceberg/TestSnapshotProducer.java
@@ -22,6 +22,7 @@ import static 
org.apache.iceberg.SnapshotSummary.PUBLISHED_WAP_ID_PROP;
 import static org.apache.iceberg.avro.AvroTestHelpers.readAvroCodec;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.assertj.core.api.Assumptions.assumeThat;
 
 import java.io.File;
 import java.io.IOException;
@@ -227,7 +228,11 @@ public class TestSnapshotProducer extends TestBase {
   }
 
   @TestTemplate
-  public void testDefaultManifestCompression() throws IOException {
+  public void testDefaultAvroManifestCompression() throws IOException {
+    assumeThat(formatVersion)
+        .as("V4 uses Parquet manifests by default; Avro codec checks do not 
apply")
+        .isLessThan(TableMetadata.MIN_FORMAT_VERSION_PARQUET_MANIFESTS);
+
     table.newFastAppend().appendFile(FILE_A).commit();
 
     ManifestFile manifest = 
table.currentSnapshot().dataManifests(table.io()).get(0);
@@ -235,7 +240,11 @@ public class TestSnapshotProducer extends TestBase {
   }
 
   @TestTemplate
-  public void testManifestCompressionFromTableProperty() throws IOException {
+  public void testAvroManifestCompressionFromTableProperty() throws 
IOException {
+    assumeThat(formatVersion)
+        .as("V4 uses Parquet manifests by default; Avro codec checks do not 
apply")
+        .isLessThan(TableMetadata.MIN_FORMAT_VERSION_PARQUET_MANIFESTS);
+
     table.updateProperties().set(TableProperties.MANIFEST_COMPRESSION, 
"snappy").commit();
 
     table.newFastAppend().appendFile(FILE_A).commit();
diff --git a/core/src/test/java/org/apache/iceberg/TestTransaction.java 
b/core/src/test/java/org/apache/iceberg/TestTransaction.java
index 9ec8c47840..fe47ac6256 100644
--- a/core/src/test/java/org/apache/iceberg/TestTransaction.java
+++ b/core/src/test/java/org/apache/iceberg/TestTransaction.java
@@ -666,7 +666,7 @@ public class TestTransaction extends TestBase {
 
     ManifestFile newManifest =
         writeManifest(
-            "manifest-file-1.avro",
+            manifestFormat().addExtension("manifest-file-1"),
             manifestEntry(ManifestEntry.Status.EXISTING, firstSnapshotId, 
FILE_A),
             manifestEntry(ManifestEntry.Status.EXISTING, secondSnapshotId, 
FILE_B));
 
@@ -811,7 +811,7 @@ public class TestTransaction extends TestBase {
             .rewriteManifests()
             .addManifest(
                 writeManifest(
-                    "new_delete_manifest.avro",
+                    manifestFormat().addExtension("new_delete_manifest"),
                     // Specify data sequence number so that the delete files 
don't get aged out
                     // first
                     manifestEntry(
@@ -880,7 +880,7 @@ public class TestTransaction extends TestBase {
             .rewriteManifests()
             .addManifest(
                 writeManifest(
-                    "new_manifest.avro",
+                    manifestFormat().addExtension("new_manifest"),
                     manifestEntry(Status.EXISTING, first.snapshotId(), FILE_A),
                     manifestEntry(Status.EXISTING, first.snapshotId(), 
FILE_A2),
                     manifestEntry(Status.EXISTING, second.snapshotId(), 
FILE_B)))
diff --git a/core/src/test/java/org/apache/iceberg/jdbc/TestJdbcCatalog.java 
b/core/src/test/java/org/apache/iceberg/jdbc/TestJdbcCatalog.java
index ff0af5c563..ce22c8089b 100644
--- a/core/src/test/java/org/apache/iceberg/jdbc/TestJdbcCatalog.java
+++ b/core/src/test/java/org/apache/iceberg/jdbc/TestJdbcCatalog.java
@@ -130,7 +130,7 @@ public class TestJdbcCatalog extends 
CatalogTests<JdbcCatalog> {
     return Stream.of(new File(location).listFiles())
         .filter(file -> !file.isDirectory())
         .map(File::getName)
-        .filter(fileName -> fileName.endsWith(".avro"))
+        .filter(fileName -> !fileName.startsWith(".") && 
!fileName.endsWith("metadata.json"))
         .collect(Collectors.toList());
   }
 
diff --git 
a/core/src/test/java/org/apache/iceberg/util/TestManifestFileUtil.java 
b/core/src/test/java/org/apache/iceberg/util/TestManifestFileUtil.java
index 8d24160320..0c7e032bde 100644
--- a/core/src/test/java/org/apache/iceberg/util/TestManifestFileUtil.java
+++ b/core/src/test/java/org/apache/iceberg/util/TestManifestFileUtil.java
@@ -20,11 +20,14 @@ package org.apache.iceberg.util;
 
 import static org.apache.iceberg.types.Types.NestedField.optional;
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assumptions.assumeThat;
 
 import java.io.IOException;
 import java.nio.file.Path;
+import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.Files;
 import org.apache.iceberg.ManifestFile;
 import org.apache.iceberg.ManifestFiles;
@@ -35,24 +38,32 @@ import org.apache.iceberg.Schema;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.types.Types;
-import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.FieldSource;
+
+class TestManifestFileUtil {
+  private static final int MIN_FORMAT_VERSION_PARQUET_MANIFESTS = 4;
 
-public class TestManifestFileUtil {
   private static final Schema SCHEMA =
       new Schema(
           optional(1, "id", Types.IntegerType.get()),
           optional(2, "unknown", Types.UnknownType.get()),
           optional(3, "floats", Types.FloatType.get()));
 
+  private final AtomicInteger manifestCounter = new AtomicInteger(0);
+
   @TempDir private Path temp;
 
-  @Test
-  public void canContainWithUnknownTypeOnly() throws IOException {
+  @ParameterizedTest
+  @FieldSource("org.apache.iceberg.TestHelpers#ALL_VERSIONS")
+  void canContainWithUnknownTypeOnly(int formatVersion) throws IOException {
+    // Parquet cannot represent the empty struct produced by an 
UnknownType-only partition
+    assumeThat(formatVersion).isLessThan(MIN_FORMAT_VERSION_PARQUET_MANIFESTS);
     PartitionSpec spec = 
PartitionSpec.builderFor(SCHEMA).identity("unknown").build();
     PartitionData partition = new PartitionData(spec.partitionType());
     partition.set(0, "someValue");
-    ManifestFile manifestFile = writeManifestWithDataFile(spec, partition);
+    ManifestFile manifestFile = writeManifestWithDataFile(formatVersion, spec, 
partition);
 
     assertThat(
             ManifestFileUtil.canContainAny(
@@ -62,12 +73,13 @@ public class TestManifestFileUtil {
         .isTrue();
   }
 
-  @Test
-  public void canContainWithNaNValueOnly() throws IOException {
+  @ParameterizedTest
+  @FieldSource("org.apache.iceberg.TestHelpers#ALL_VERSIONS")
+  void canContainWithNaNValueOnly(int formatVersion) throws IOException {
     PartitionSpec spec = 
PartitionSpec.builderFor(SCHEMA).identity("floats").build();
     PartitionData partition = new PartitionData(spec.partitionType());
     partition.set(0, Float.NaN);
-    ManifestFile manifestFile = writeManifestWithDataFile(spec, partition);
+    ManifestFile manifestFile = writeManifestWithDataFile(formatVersion, spec, 
partition);
 
     assertThat(
             ManifestFileUtil.canContainAny(
@@ -77,12 +89,13 @@ public class TestManifestFileUtil {
         .isTrue();
   }
 
-  @Test
-  public void canContainWithNullValueOnly() throws IOException {
+  @ParameterizedTest
+  @FieldSource("org.apache.iceberg.TestHelpers#ALL_VERSIONS")
+  void canContainWithNullValueOnly(int formatVersion) throws IOException {
     PartitionSpec spec = 
PartitionSpec.builderFor(SCHEMA).identity("floats").build();
     PartitionData partition = new PartitionData(spec.partitionType());
     partition.set(0, null);
-    ManifestFile manifestFile = writeManifestWithDataFile(spec, partition);
+    ManifestFile manifestFile = writeManifestWithDataFile(formatVersion, spec, 
partition);
 
     assertThat(
             ManifestFileUtil.canContainAny(
@@ -92,14 +105,15 @@ public class TestManifestFileUtil {
         .isTrue();
   }
 
-  @Test
-  public void canContainWithUnknownType() throws IOException {
+  @ParameterizedTest
+  @FieldSource("org.apache.iceberg.TestHelpers#ALL_VERSIONS")
+  void canContainWithUnknownType(int formatVersion) throws IOException {
     PartitionSpec spec =
         
PartitionSpec.builderFor(SCHEMA).identity("floats").identity("unknown").build();
     PartitionData partition = new PartitionData(spec.partitionType());
     partition.set(0, 1.0f);
     partition.set(1, "someValue");
-    ManifestFile manifestFile = writeManifestWithDataFile(spec, partition);
+    ManifestFile manifestFile = writeManifestWithDataFile(formatVersion, spec, 
partition);
 
     assertThat(
             ManifestFileUtil.canContainAny(
@@ -109,9 +123,16 @@ public class TestManifestFileUtil {
         .isTrue();
   }
 
-  private ManifestFile writeManifestWithDataFile(PartitionSpec spec, 
PartitionData partition)
-      throws IOException {
-    ManifestWriter<DataFile> writer = ManifestFiles.write(spec, 
Files.localOutput(temp.toFile()));
+  private ManifestFile writeManifestWithDataFile(
+      int formatVersion, PartitionSpec spec, PartitionData partition) throws 
IOException {
+    FileFormat format =
+        formatVersion >= MIN_FORMAT_VERSION_PARQUET_MANIFESTS
+            ? FileFormat.PARQUET
+            : FileFormat.AVRO;
+    String filename = format.addExtension("manifest-" + 
manifestCounter.getAndIncrement());
+    ManifestWriter<DataFile> writer =
+        ManifestFiles.write(
+            formatVersion, spec, 
Files.localOutput(temp.resolve(filename).toFile()), null);
     try (writer) {
       writer.add(
           DataFiles.builder(spec)
diff --git 
a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueReaders.java 
b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueReaders.java
index 8aa9aa4779..63d6d80d58 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueReaders.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueReaders.java
@@ -25,7 +25,11 @@ import java.math.BigDecimal;
 import java.math.BigInteger;
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
+import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
@@ -828,6 +832,16 @@ public class ParquetValueReaders {
     protected abstract T buildList(I list);
   }
 
+  // Only recycle known growable JDK collections as scratch buffers. Reuse may 
be an unmodifiable
+  // view, Guava immutable type, List.of / Map.of, etc.; those are not these 
concrete classes.
+  private static boolean canReuseListAsReadBuffer(List<?> list) {
+    return list instanceof ArrayList || list instanceof LinkedList;
+  }
+
+  private static boolean canReuseMapAsReadBuffer(Map<?, ?> map) {
+    return map instanceof LinkedHashMap || map instanceof HashMap;
+  }
+
   public static class ListReader<E> extends RepeatedReader<List<E>, List<E>, 
E> {
     private List<E> lastList = null;
     private Iterator<E> elements = null;
@@ -847,7 +861,7 @@ public class ParquetValueReaders {
       }
 
       if (reuse != null) {
-        this.lastList = reuse;
+        this.lastList = canReuseListAsReadBuffer(reuse) ? reuse : null;
         this.elements = reuse.iterator();
       } else {
         this.lastList = null;
@@ -973,7 +987,7 @@ public class ParquetValueReaders {
       }
 
       if (reuse != null) {
-        this.lastMap = reuse;
+        this.lastMap = canReuseMapAsReadBuffer(reuse) ? reuse : null;
         this.pairs = reuse.entrySet().iterator();
       } else {
         this.lastMap = null;
diff --git 
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java
 
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java
index dae721b1d7..c5db04762f 100644
--- 
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java
+++ 
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java
@@ -1469,10 +1469,7 @@ public class TestRewriteTablePathsAction extends 
TestBase {
             .as(Encoders.STRING())
             .collectAsList();
     Predicate<String> isManifest =
-        f ->
-            (f.contains("optimized-m-") && f.endsWith(".avro"))
-                || f.endsWith("-m0.avro")
-                || f.endsWith("-m1.avro");
+        f -> f.contains("optimized-m-") || f.contains("-m0.") || 
f.contains("-m1.");
     Predicate<String> isManifestList = f -> f.contains("snap-") && 
f.endsWith(".avro");
     Predicate<String> isMetadataJSON = f -> f.endsWith(".metadata.json");
 
diff --git 
a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java
 
b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java
index dae721b1d7..c5db04762f 100644
--- 
a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java
+++ 
b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java
@@ -1469,10 +1469,7 @@ public class TestRewriteTablePathsAction extends 
TestBase {
             .as(Encoders.STRING())
             .collectAsList();
     Predicate<String> isManifest =
-        f ->
-            (f.contains("optimized-m-") && f.endsWith(".avro"))
-                || f.endsWith("-m0.avro")
-                || f.endsWith("-m1.avro");
+        f -> f.contains("optimized-m-") || f.contains("-m0.") || 
f.contains("-m1.");
     Predicate<String> isManifestList = f -> f.contains("snap-") && 
f.endsWith(".avro");
     Predicate<String> isMetadataJSON = f -> f.endsWith(".metadata.json");
 
diff --git 
a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java
 
b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java
index dae721b1d7..c5db04762f 100644
--- 
a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java
+++ 
b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java
@@ -1469,10 +1469,7 @@ public class TestRewriteTablePathsAction extends 
TestBase {
             .as(Encoders.STRING())
             .collectAsList();
     Predicate<String> isManifest =
-        f ->
-            (f.contains("optimized-m-") && f.endsWith(".avro"))
-                || f.endsWith("-m0.avro")
-                || f.endsWith("-m1.avro");
+        f -> f.contains("optimized-m-") || f.contains("-m0.") || 
f.contains("-m1.");
     Predicate<String> isManifestList = f -> f.contains("snap-") && 
f.endsWith(".avro");
     Predicate<String> isMetadataJSON = f -> f.endsWith(".metadata.json");
 

Reply via email to