This is an automated email from the ASF dual-hosted git repository.

pvary pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new 235ab0f4fd Parquet, Data: Implementation of ParquetFormatModel (#15253)
235ab0f4fd is described below

commit 235ab0f4fd34f5e71642030577a7ea996e2c4489
Author: pvary <[email protected]>
AuthorDate: Sun Feb 15 10:53:20 2026 +0100

    Parquet, Data: Implementation of ParquetFormatModel (#15253)
---
 .../apache/iceberg/data/GenericFormatModels.java   |  14 +
 .../iceberg/data/TestGenericFormatModels.java      |   3 +-
 .../java/org/apache/iceberg/parquet/Parquet.java   |  47 ++-
 .../apache/iceberg/parquet/ParquetFormatModel.java | 317 +++++++++++++++++++++
 4 files changed, 373 insertions(+), 8 deletions(-)
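For context, a minimal sketch of how the pieces in this patch compose once GenericFormatModels.register() has run. Illustrative only: "inputFile" and "expectedSchema" are assumed variables, while ParquetFormatModel.create(...), readBuilder(...), project(...) and build() are all defined in the diffs below.

    // Build the generic Parquet model exactly as GenericFormatModels does below.
    ParquetFormatModel<Record, Void, ParquetValueReader<?>> model =
        ParquetFormatModel.create(
            Record.class,
            Void.class,
            (icebergSchema, fileSchema, engineSchema) ->
                GenericParquetWriter.create(icebergSchema, fileSchema),
            (icebergSchema, fileSchema, engineSchema, idToConstant) ->
                GenericParquetReaders.buildReader(icebergSchema, fileSchema, idToConstant));

    // Read generic Records back through the model's read builder.
    try (CloseableIterable<Record> records =
        model.readBuilder(inputFile).project(expectedSchema).build()) {
      for (Record record : records) {
        // process each record
      }
    }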

diff --git a/data/src/main/java/org/apache/iceberg/data/GenericFormatModels.java b/data/src/main/java/org/apache/iceberg/data/GenericFormatModels.java
index e2e13861ca..6fde8bbeba 100644
--- a/data/src/main/java/org/apache/iceberg/data/GenericFormatModels.java
+++ b/data/src/main/java/org/apache/iceberg/data/GenericFormatModels.java
@@ -21,7 +21,10 @@ package org.apache.iceberg.data;
 import org.apache.iceberg.avro.AvroFormatModel;
 import org.apache.iceberg.data.avro.DataWriter;
 import org.apache.iceberg.data.avro.PlannedDataReader;
+import org.apache.iceberg.data.parquet.GenericParquetReaders;
+import org.apache.iceberg.data.parquet.GenericParquetWriter;
 import org.apache.iceberg.formats.FormatModelRegistry;
+import org.apache.iceberg.parquet.ParquetFormatModel;
 
 public class GenericFormatModels {
   public static void register() {
@@ -34,6 +37,17 @@ public class GenericFormatModels {
                 PlannedDataReader.create(icebergSchema, idToConstant)));
 
     FormatModelRegistry.register(AvroFormatModel.forPositionDeletes());
+
+    FormatModelRegistry.register(
+        ParquetFormatModel.create(
+            Record.class,
+            Void.class,
+            (icebergSchema, fileSchema, engineSchema) ->
+                GenericParquetWriter.create(icebergSchema, fileSchema),
+            (icebergSchema, fileSchema, engineSchema, idToConstant) ->
+                GenericParquetReaders.buildReader(icebergSchema, fileSchema, idToConstant)));
+
+    FormatModelRegistry.register(ParquetFormatModel.forPositionDeletes());
   }
 
   private GenericFormatModels() {}
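A matching write-side sketch for the model registered above (again illustrative: "encryptedOutputFile", "tableSchema" and "record" are assumptions; writeBuilder(...), schema(...), content(...) and build() come from ParquetFormatModel further down, and "model" is the instance from the sketch near the top of this mail):

    try (FileAppender<Record> appender =
        model.writeBuilder(encryptedOutputFile)   // an EncryptedOutputFile
            .schema(tableSchema)
            .content(FileContent.DATA)
            .build()) {
      appender.add(record);                       // append one generic Record
    }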
diff --git a/data/src/test/java/org/apache/iceberg/data/TestGenericFormatModels.java b/data/src/test/java/org/apache/iceberg/data/TestGenericFormatModels.java
index ca3dda30ab..ab5968da8b 100644
--- a/data/src/test/java/org/apache/iceberg/data/TestGenericFormatModels.java
+++ b/data/src/test/java/org/apache/iceberg/data/TestGenericFormatModels.java
@@ -54,7 +54,8 @@ public class TestGenericFormatModels {
   private static final List<Record> TEST_RECORDS =
       RandomGenericData.generate(TestBase.SCHEMA, 10, 1L);
 
-  private static final FileFormat[] FILE_FORMATS = new FileFormat[] {FileFormat.AVRO};
+  private static final FileFormat[] FILE_FORMATS =
+      new FileFormat[] {FileFormat.AVRO, FileFormat.PARQUET};
 
   @TempDir protected Path temp;
 
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
index ae049d0875..2387d52edf 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
@@ -302,8 +302,7 @@ public class Parquet {
     }
 
     // supposed to always be a private method used strictly by data and delete write builders
-    private WriteBuilder createContextFunc(
-        Function<Map<String, String>, Context> newCreateContextFunc) {
+    WriteBuilder createContextFunc(Function<Map<String, String>, Context> newCreateContextFunc) {
       this.createContextFunc = newCreateContextFunc;
       return this;
     }
@@ -498,7 +497,7 @@ public class Parquet {
       }
     }
 
-    private static class Context {
+    static class Context {
       private final int rowGroupSize;
       private final int pageSize;
       private final int pageRowLimit;
@@ -1176,6 +1175,7 @@ public class Parquet {
     private Expression filter = null;
     private ReadSupport<?> readSupport = null;
     private Function<MessageType, VectorizedReader<?>> batchedReaderFunc = null;
+    private BiFunction<Schema, MessageType, VectorizedReader<?>> batchedReaderFuncWithSchema = null;
     private ReaderFunction readerFunction = null;
     private boolean filterRecords = true;
     private boolean caseSensitive = true;
@@ -1298,6 +1298,9 @@ public class Parquet {
       Preconditions.checkArgument(
           this.batchedReaderFunc == null,
           "Cannot set reader function: batched reader function already set");
+      Preconditions.checkArgument(
+          this.batchedReaderFuncWithSchema == null,
+          "Cannot set reader function: batched reader function with schema 
already set");
       Preconditions.checkArgument(
           this.readerFunction == null, "Cannot set reader function: reader 
function already set");
       this.readerFunction = new UnaryReaderFunction(newReaderFunction);
@@ -1309,6 +1312,9 @@ public class Parquet {
       Preconditions.checkArgument(
           this.batchedReaderFunc == null,
           "Cannot set reader function: batched reader function already set");
+      Preconditions.checkArgument(
+          this.batchedReaderFuncWithSchema == null,
+          "Cannot set reader function: batched reader function with schema 
already set");
       Preconditions.checkArgument(
           this.readerFunction == null, "Cannot set reader function: reader 
function already set");
       this.readerFunction = new BinaryReaderFunction(newReaderFunction);
@@ -1319,6 +1325,9 @@ public class Parquet {
       Preconditions.checkArgument(
           this.batchedReaderFunc == null,
           "Cannot set batched reader function: batched reader function already 
set");
+      Preconditions.checkArgument(
+          this.batchedReaderFuncWithSchema == null,
+          "Cannot set reader function: batched reader function with schema 
already set");
       Preconditions.checkArgument(
           this.readerFunction == null,
           "Cannot set batched reader function: ReaderFunction already set");
@@ -1326,10 +1335,28 @@ public class Parquet {
       return this;
     }
 
+    public ReadBuilder createBatchedReaderFunc(
+        BiFunction<Schema, MessageType, VectorizedReader<?>> func) {
+      Preconditions.checkArgument(
+          this.batchedReaderFunc == null,
+          "Cannot set batched reader function: batched reader function already 
set");
+      Preconditions.checkArgument(
+          this.batchedReaderFuncWithSchema == null,
+          "Cannot set reader function: batched reader function with schema 
already set");
+      Preconditions.checkArgument(
+          this.readerFunction == null,
+          "Cannot set batched reader function: ReaderFunction already set");
+      this.batchedReaderFuncWithSchema = func;
+      return this;
+    }
+
     public ReadBuilder createReaderFunc(ReaderFunction reader) {
       Preconditions.checkArgument(
           this.batchedReaderFunc == null,
           "Cannot set reader function: batched reader function already set");
+      Preconditions.checkArgument(
+          this.batchedReaderFuncWithSchema == null,
+          "Cannot set reader function: batched reader function with schema 
already set");
       Preconditions.checkArgument(
           this.readerFunction == null, "Cannot set reader function: reader 
function already set");
       this.readerFunction = reader;
@@ -1389,7 +1416,7 @@ public class Parquet {
     }
 
     @Override
-    @SuppressWarnings({"unchecked", "checkstyle:CyclomaticComplexity"})
+    @SuppressWarnings({"unchecked", "checkstyle:CyclomaticComplexity", 
"checkstyle:MethodLength"})
     public <D> CloseableIterable<D> build() {
       FileDecryptionProperties fileDecryptionProperties = null;
       if (fileEncryptionKey != null) {
@@ -1404,7 +1431,9 @@ public class Parquet {
         Preconditions.checkState(fileAADPrefix == null, "AAD prefix set with 
null encryption key");
       }
 
-      if (batchedReaderFunc != null || readerFunction != null) {
+      if (batchedReaderFunc != null
+          || batchedReaderFuncWithSchema != null
+          || readerFunction != null) {
         ParquetReadOptions.Builder optionsBuilder;
         if (file instanceof HadoopInputFile) {
           // remove read properties already set that may conflict with this read
@@ -1441,12 +1470,16 @@ public class Parquet {
           mapping = NameMapping.empty();
         }
 
-        if (batchedReaderFunc != null) {
+        Function<MessageType, VectorizedReader<?>> batchedFunc =
+            batchedReaderFuncWithSchema != null
+                ? messageType -> batchedReaderFuncWithSchema.apply(schema, messageType)
+                : batchedReaderFunc;
+        if (batchedFunc != null) {
           return new VectorizedParquetReader<>(
               file,
               schema,
               options,
-              batchedReaderFunc,
+              batchedFunc,
               mapping,
               filter,
               reuseContainers,
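The new createBatchedReaderFunc(BiFunction<Schema, MessageType, VectorizedReader<?>>) overload above passes the expected Iceberg schema to vectorized integrations alongside the Parquet file schema. A hedged sketch of an engine hooking into it; buildVectorizedReader(...) and EngineBatch are hypothetical stand-ins, the rest is the ReadBuilder API touched by this hunk:

    CloseableIterable<EngineBatch> batches =            // EngineBatch: stand-in batch type
        Parquet.read(inputFile)
            .project(expectedSchema)
            .createBatchedReaderFunc(
                (icebergSchema, fileSchema) ->
                    buildVectorizedReader(icebergSchema, fileSchema))  // hypothetical factory
            .recordsPerBatch(4096)
            .build();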
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetFormatModel.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetFormatModel.java
new file mode 100644
index 0000000000..90d6e3ef41
--- /dev/null
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetFormatModel.java
@@ -0,0 +1,317 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.parquet;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.function.Function;
+import org.apache.iceberg.FileContent;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.MetricsConfig;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.data.parquet.GenericParquetWriter;
+import org.apache.iceberg.deletes.PositionDelete;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.formats.BaseFormatModel;
+import org.apache.iceberg.formats.ModelWriteBuilder;
+import org.apache.iceberg.formats.ReadBuilder;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.DeleteSchemaUtil;
+import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.mapping.NameMapping;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.parquet.column.ParquetProperties;
+import org.apache.parquet.schema.MessageType;
+
+public class ParquetFormatModel<D, S, R>
+    extends BaseFormatModel<D, S, ParquetValueWriter<?>, R, MessageType> {
+  public static final String WRITER_VERSION_KEY = "parquet.writer.version";
+  private final boolean isBatchReader;
+
+  public static <D> ParquetFormatModel<PositionDelete<D>, Void, Object> forPositionDeletes() {
+    return new ParquetFormatModel<>(PositionDelete.deleteClass(), Void.class, null, null, false);
+  }
+
+  public static <D, S> ParquetFormatModel<D, S, ParquetValueReader<?>> create(
+      Class<D> type,
+      Class<S> schemaType,
+      WriterFunction<ParquetValueWriter<?>, S, MessageType> writerFunction,
+      ReaderFunction<ParquetValueReader<?>, S, MessageType> readerFunction) {
+    return new ParquetFormatModel<>(type, schemaType, writerFunction, readerFunction, false);
+  }
+
+  public static <D, S> ParquetFormatModel<D, S, VectorizedReader<?>> create(
+      Class<? extends D> type,
+      Class<S> schemaType,
+      ReaderFunction<VectorizedReader<?>, S, MessageType> batchReaderFunction) {
+    return new ParquetFormatModel<>(type, schemaType, null, batchReaderFunction, true);
+  }
+
+  private ParquetFormatModel(
+      Class<? extends D> type,
+      Class<S> schemaType,
+      WriterFunction<ParquetValueWriter<?>, S, MessageType> writerFunction,
+      ReaderFunction<R, S, MessageType> readerFunction,
+      boolean isBatchReader) {
+    super(type, schemaType, writerFunction, readerFunction);
+    this.isBatchReader = isBatchReader;
+  }
+
+  @Override
+  public FileFormat format() {
+    return FileFormat.PARQUET;
+  }
+
+  @Override
+  public ModelWriteBuilder<D, S> writeBuilder(EncryptedOutputFile outputFile) {
+    return new WriteBuilderWrapper<>(outputFile, writerFunction());
+  }
+
+  @Override
+  public ReadBuilder<D, S> readBuilder(InputFile inputFile) {
+    return new ReadBuilderWrapper<>(inputFile, readerFunction(), isBatchReader);
+  }
+
+  private static class WriteBuilderWrapper<D, S> implements ModelWriteBuilder<D, S> {
+    private final Parquet.WriteBuilder internal;
+    private final WriterFunction<ParquetValueWriter<?>, S, MessageType> writerFunction;
+    private Schema schema;
+    private S engineSchema;
+    private FileContent content;
+
+    private WriteBuilderWrapper(
+        EncryptedOutputFile outputFile,
+        WriterFunction<ParquetValueWriter<?>, S, MessageType> writerFunction) {
+      this.internal = Parquet.write(outputFile);
+      this.writerFunction = writerFunction;
+    }
+
+    @Override
+    public ModelWriteBuilder<D, S> schema(Schema newSchema) {
+      this.schema = newSchema;
+      internal.schema(newSchema);
+      return this;
+    }
+
+    @Override
+    public ModelWriteBuilder<D, S> engineSchema(S newSchema) {
+      this.engineSchema = newSchema;
+      return this;
+    }
+
+    @Override
+    public ModelWriteBuilder<D, S> set(String property, String value) {
+      if (WRITER_VERSION_KEY.equals(property)) {
+        internal.writerVersion(ParquetProperties.WriterVersion.valueOf(value));
+      }
+
+      internal.set(property, value);
+      return this;
+    }
+
+    @Override
+    public ModelWriteBuilder<D, S> setAll(Map<String, String> properties) {
+      internal.setAll(properties);
+      return this;
+    }
+
+    @Override
+    public ModelWriteBuilder<D, S> meta(String property, String value) {
+      internal.meta(property, value);
+      return this;
+    }
+
+    @Override
+    public ModelWriteBuilder<D, S> meta(Map<String, String> properties) {
+      internal.meta(properties);
+      return this;
+    }
+
+    @Override
+    public ModelWriteBuilder<D, S> content(FileContent newContent) {
+      this.content = newContent;
+      return this;
+    }
+
+    @Override
+    public ModelWriteBuilder<D, S> metricsConfig(MetricsConfig metricsConfig) {
+      internal.metricsConfig(metricsConfig);
+      return this;
+    }
+
+    @Override
+    public ModelWriteBuilder<D, S> overwrite() {
+      internal.overwrite();
+      return this;
+    }
+
+    @Override
+    public ModelWriteBuilder<D, S> withFileEncryptionKey(ByteBuffer encryptionKey) {
+      internal.withFileEncryptionKey(encryptionKey);
+      return this;
+    }
+
+    @Override
+    public ModelWriteBuilder<D, S> withAADPrefix(ByteBuffer aadPrefix) {
+      internal.withAADPrefix(aadPrefix);
+      return this;
+    }
+
+    @Override
+    public FileAppender<D> build() throws IOException {
+      switch (content) {
+        case DATA:
+          internal.createContextFunc(Parquet.WriteBuilder.Context::dataContext);
+          internal.createWriterFunc(
+              (icebergSchema, messageType) ->
+                  writerFunction.write(icebergSchema, messageType, engineSchema));
+          break;
+        case EQUALITY_DELETES:
+          internal.createContextFunc(Parquet.WriteBuilder.Context::deleteContext);
+          internal.createWriterFunc(
+              (icebergSchema, messageType) ->
+                  writerFunction.write(icebergSchema, messageType, engineSchema));
+          break;
+        case POSITION_DELETES:
+          Preconditions.checkState(
+              schema == null,
+              "Invalid schema: %s. Position deletes with schema are not 
supported by the API.",
+              schema);
+          Preconditions.checkState(
+              engineSchema == null,
+              "Invalid engineSchema: %s. Position deletes with schema are not 
supported by the API.",
+              engineSchema);
+
+          internal.createContextFunc(Parquet.WriteBuilder.Context::deleteContext);
+          internal.createWriterFunc(
+              (icebergSchema, messageType) ->
+                  new ParquetValueWriters.PositionDeleteStructWriter<D>(
+                      (ParquetValueWriters.StructWriter<?>)
+                          GenericParquetWriter.create(icebergSchema, messageType),
+                      Function.identity()));
+          internal.schema(DeleteSchemaUtil.pathPosSchema());
+          break;
+        default:
+          throw new IllegalArgumentException("Unknown file content: " + 
content);
+      }
+
+      return internal.build();
+    }
+  }
+
+  private static class ReadBuilderWrapper<R, D, S> implements ReadBuilder<D, S> {
+    private final Parquet.ReadBuilder internal;
+    private final ReaderFunction<R, S, MessageType> readerFunction;
+    private final boolean isBatchReader;
+    private S engineSchema;
+    private Map<Integer, ?> idToConstant = ImmutableMap.of();
+
+    private ReadBuilderWrapper(
+        InputFile inputFile,
+        ReaderFunction<R, S, MessageType> readerFunction,
+        boolean isBatchReader) {
+      this.internal = Parquet.read(inputFile);
+      this.readerFunction = readerFunction;
+      this.isBatchReader = isBatchReader;
+    }
+
+    @Override
+    public ReadBuilder<D, S> split(long newStart, long newLength) {
+      internal.split(newStart, newLength);
+      return this;
+    }
+
+    @Override
+    public ReadBuilder<D, S> project(Schema schema) {
+      internal.project(schema);
+      return this;
+    }
+
+    @Override
+    public ReadBuilder<D, S> engineProjection(S schema) {
+      this.engineSchema = schema;
+      return this;
+    }
+
+    @Override
+    public ReadBuilder<D, S> caseSensitive(boolean caseSensitive) {
+      internal.caseSensitive(caseSensitive);
+      return this;
+    }
+
+    @Override
+    public ReadBuilder<D, S> filter(Expression filter) {
+      internal.filter(filter);
+      return this;
+    }
+
+    @Override
+    public ReadBuilder<D, S> set(String key, String value) {
+      internal.set(key, value);
+      return this;
+    }
+
+    @Override
+    public ReadBuilder<D, S> reuseContainers() {
+      internal.reuseContainers();
+      return this;
+    }
+
+    @Override
+    public ReadBuilder<D, S> recordsPerBatch(int numRowsPerBatch) {
+      internal.recordsPerBatch(numRowsPerBatch);
+      return this;
+    }
+
+    @Override
+    public ReadBuilder<D, S> idToConstant(Map<Integer, ?> newIdToConstant) {
+      this.idToConstant = newIdToConstant;
+      return this;
+    }
+
+    @Override
+    public ReadBuilder<D, S> withNameMapping(NameMapping nameMapping) {
+      internal.withNameMapping(nameMapping);
+      return this;
+    }
+
+    @Override
+    public CloseableIterable<D> build() {
+      if (isBatchReader) {
+        return internal
+            .createBatchedReaderFunc(
+                (icebergSchema, messageType) ->
+                    (VectorizedReader<?>)
+                        readerFunction.read(icebergSchema, messageType, engineSchema, idToConstant))
+            .build();
+      } else {
+        return internal
+            .createReaderFunc(
+                (icebergSchema, messageType) ->
+                    (ParquetValueReader<?>)
+                        readerFunction.read(icebergSchema, messageType, engineSchema, idToConstant))
+            .build();
+      }
+    }
+  }
+}
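Finally, a sketch of the position-delete path wired up by forPositionDeletes(). The POSITION_DELETES branch above installs DeleteSchemaUtil.pathPosSchema() itself, so the caller sets no schema; "outputFile" is an assumed EncryptedOutputFile and the data-file path below is made up:

    ParquetFormatModel<PositionDelete<Record>, Void, Object> deleteModel =
        ParquetFormatModel.forPositionDeletes();

    try (FileAppender<PositionDelete<Record>> appender =
        deleteModel.writeBuilder(outputFile)
            .content(FileContent.POSITION_DELETES)
            .build()) {
      // mark row 0 of the given data file as deleted; the row payload stays null
      appender.add(PositionDelete.<Record>create().set("s3://bucket/data.parquet", 0L, null));
    }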
