This is an automated email from the ASF dual-hosted git repository.
pvary pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new 6b15becc97 Core, Arrow: Implementation of ArrowFormatModel (#15258)
6b15becc97 is described below
commit 6b15becc976a8353c48b9a8a6c75c7dc06cad0a3
Author: pvary <[email protected]>
AuthorDate: Mon Feb 16 20:22:26 2026 +0100
Core, Arrow: Implementation of ArrowFormatModel (#15258)
---
.../arrow/vectorized/ArrowFormatModels.java | 39 ++++++++++++++++++++++
.../iceberg/arrow/vectorized/ArrowReader.java | 33 ++++++++----------
.../iceberg/formats/FormatModelRegistry.java | 4 ++-
3 files changed, 56 insertions(+), 20 deletions(-)
diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/ArrowFormatModels.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/ArrowFormatModels.java
new file mode 100644
index 0000000000..d70e12be78
--- /dev/null
+++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/ArrowFormatModels.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.arrow.vectorized;
+
+import org.apache.arrow.vector.NullCheckingForGet;
+import org.apache.iceberg.formats.FormatModelRegistry;
+import org.apache.iceberg.parquet.ParquetFormatModel;
+
+public class ArrowFormatModels {
+ public static void register() {
+ FormatModelRegistry.register(
+ ParquetFormatModel.create(
+ ColumnarBatch.class,
+ Object.class,
+ (schema, fileSchema, engineSchema, idToConstant) ->
+ ArrowReader.VectorizedCombinedScanIterator.buildReader(
+ schema,
+ fileSchema,
+ NullCheckingForGet.NULL_CHECKING_ENABLED /* setArrowValidityVector */)));
+ }
+
+ private ArrowFormatModels() {}
+}
diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/ArrowReader.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/ArrowReader.java
index 06b7baec27..68a27bdfb8 100644
--- a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/ArrowReader.java
+++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/ArrowReader.java
@@ -29,7 +29,6 @@ import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
-import org.apache.arrow.vector.NullCheckingForGet;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.types.Types.MinorType;
import org.apache.iceberg.CombinedScanTask;
@@ -40,13 +39,14 @@ import org.apache.iceberg.TableScan;
import org.apache.iceberg.encryption.EncryptedFiles;
import org.apache.iceberg.encryption.EncryptedInputFile;
import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.formats.FormatModelRegistry;
+import org.apache.iceberg.formats.ReadBuilder;
import org.apache.iceberg.io.CloseableGroup;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.CloseableIterator;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.mapping.NameMappingParser;
-import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.parquet.TypeWithSchemaVisitor;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
@@ -189,8 +189,7 @@ public class ArrowReader extends CloseableGroup {
* Reads the data file and returns an iterator of {@link VectorSchemaRoot}. Only Parquet data file
* format is supported.
*/
- private static final class VectorizedCombinedScanIterator
- implements CloseableIterator<ColumnarBatch> {
+ static final class VectorizedCombinedScanIterator implements CloseableIterator<ColumnarBatch> {
private final Iterator<FileScanTask> fileItr;
private final Map<String, InputFile> inputFiles;
@@ -324,19 +323,8 @@ public class ArrowReader extends CloseableGroup {
InputFile location = getInputFile(task);
Preconditions.checkNotNull(location, "Could not find InputFile associated with FileScanTask");
if (task.file().format() == FileFormat.PARQUET) {
- Parquet.ReadBuilder builder =
- Parquet.read(location)
- .project(expectedSchema)
- .split(task.start(), task.length())
- .createBatchedReaderFunc(
- fileSchema ->
- buildReader(
- expectedSchema,
- fileSchema, /* setArrowValidityVector */
- NullCheckingForGet.NULL_CHECKING_ENABLED))
- .recordsPerBatch(batchSize)
- .filter(task.residual())
- .caseSensitive(caseSensitive);
+ ReadBuilder<ColumnarBatch, ?> builder =
+ FormatModelRegistry.readBuilder(FileFormat.PARQUET, ColumnarBatch.class, location);
if (reuseContainers) {
builder.reuseContainers();
@@ -345,7 +333,14 @@ public class ArrowReader extends CloseableGroup {
builder.withNameMapping(NameMappingParser.fromJson(nameMapping));
}
- iter = builder.build();
+ iter =
+ builder
+ .project(expectedSchema)
+ .split(task.start(), task.length())
+ .recordsPerBatch(batchSize)
+ .caseSensitive(caseSensitive)
+ .filter(task.residual())
+ .build();
} else {
throw new UnsupportedOperationException(
"Format: " + task.file().format() + " not supported for batched reads");
@@ -376,7 +371,7 @@ public class ArrowReader extends CloseableGroup {
* @param fileSchema Schema of the data file.
* @param setArrowValidityVector Indicates whether to set the validity vector in Arrow vectors.
*/
- private static ArrowBatchReader buildReader(
+ static ArrowBatchReader buildReader(
Schema expectedSchema, MessageType fileSchema, boolean setArrowValidityVector) {
return (ArrowBatchReader)
TypeWithSchemaVisitor.visit(
diff --git a/core/src/main/java/org/apache/iceberg/formats/FormatModelRegistry.java b/core/src/main/java/org/apache/iceberg/formats/FormatModelRegistry.java
index b9adafdbc2..e86dd9f97a 100644
--- a/core/src/main/java/org/apache/iceberg/formats/FormatModelRegistry.java
+++ b/core/src/main/java/org/apache/iceberg/formats/FormatModelRegistry.java
@@ -55,7 +55,9 @@ public final class FormatModelRegistry {
private static final Logger LOG = LoggerFactory.getLogger(FormatModelRegistry.class);
// The list of classes which are used for registering the reader and writer builders
private static final List<String> CLASSES_TO_REGISTER =
- ImmutableList.of("org.apache.iceberg.data.GenericFormatModels");
+ ImmutableList.of(
+ "org.apache.iceberg.data.GenericFormatModels",
+ "org.apache.iceberg.arrow.vectorized.ArrowFormatModels");
// Format models indexed by file format and object model class
private static final Map<Pair<FileFormat, Class<?>>, FormatModel<?, ?>> MODELS =