This is an automated email from the ASF dual-hosted git repository.
pvary pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new 83fe4ffc58 Orc, Data: Implementation of ORCFormatModel (#15255)
83fe4ffc58 is described below
commit 83fe4ffc580cde897eab7c7471d2dc7860e204bf
Author: pvary <[email protected]>
AuthorDate: Sun Feb 15 15:26:51 2026 +0100
Orc, Data: Implementation of ORCFormatModel (#15255)
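
    This adds ORCFormatModel, an ORC implementation of the format model API,
    and registers it in GenericFormatModels for generic Records and for
    position deletes, alongside the existing Avro and Parquet models.
    ORC.ReadBuilder also gains a constantFieldIds(...) hook so constant and
    metadata columns are pruned from the file projection.

    A minimal read-path sketch, assuming caller-supplied inputFile (an
    org.apache.iceberg.io.InputFile) and schema (the Iceberg read schema);
    imports are omitted and only methods that appear in the diff below are
    used:

        // create a model for generic Records, as GenericFormatModels does below
        ORCFormatModel<Record, Void, OrcRowReader<Record>> model =
            ORCFormatModel.create(
                Record.class,
                Void.class,
                (icebergSchema, fileSchema, engineSchema) ->
                    GenericOrcWriter.buildWriter(icebergSchema, fileSchema),
                (icebergSchema, fileSchema, engineSchema, idToConstant) ->
                    GenericOrcReader.buildReader(icebergSchema, fileSchema, idToConstant));

        // read rows; reuseContainers() is required by ReadBuilderWrapper#build
        CloseableIterable<Record> rows =
            model.readBuilder(inputFile)
                .project(schema)
                .reuseContainers()
                .build();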
---
.../apache/iceberg/data/GenericFormatModels.java | 14 +
.../iceberg/data/TestGenericFormatModels.java | 2 +-
orc/src/main/java/org/apache/iceberg/orc/ORC.java | 21 +-
.../org/apache/iceberg/orc/ORCFormatModel.java | 311 +++++++++++++++++++++
4 files changed, 342 insertions(+), 6 deletions(-)
diff --git a/data/src/main/java/org/apache/iceberg/data/GenericFormatModels.java b/data/src/main/java/org/apache/iceberg/data/GenericFormatModels.java
index 6fde8bbeba..af7176f7a1 100644
--- a/data/src/main/java/org/apache/iceberg/data/GenericFormatModels.java
+++ b/data/src/main/java/org/apache/iceberg/data/GenericFormatModels.java
@@ -21,9 +21,12 @@ package org.apache.iceberg.data;
import org.apache.iceberg.avro.AvroFormatModel;
import org.apache.iceberg.data.avro.DataWriter;
import org.apache.iceberg.data.avro.PlannedDataReader;
+import org.apache.iceberg.data.orc.GenericOrcReader;
+import org.apache.iceberg.data.orc.GenericOrcWriter;
import org.apache.iceberg.data.parquet.GenericParquetReaders;
import org.apache.iceberg.data.parquet.GenericParquetWriter;
import org.apache.iceberg.formats.FormatModelRegistry;
+import org.apache.iceberg.orc.ORCFormatModel;
import org.apache.iceberg.parquet.ParquetFormatModel;
public class GenericFormatModels {
@@ -48,6 +51,17 @@ public class GenericFormatModels {
GenericParquetReaders.buildReader(icebergSchema, fileSchema, idToConstant)));
FormatModelRegistry.register(ParquetFormatModel.forPositionDeletes());
+
+ FormatModelRegistry.register(
+ ORCFormatModel.create(
+ Record.class,
+ Void.class,
+ (icebergSchema, fileSchema, engineSchema) ->
+ GenericOrcWriter.buildWriter(icebergSchema, fileSchema),
+ (icebergSchema, fileSchema, engineSchema, idToConstant) ->
+ GenericOrcReader.buildReader(icebergSchema, fileSchema, idToConstant)));
+
+ FormatModelRegistry.register(ORCFormatModel.forPositionDeletes());
}
private GenericFormatModels() {}
diff --git a/data/src/test/java/org/apache/iceberg/data/TestGenericFormatModels.java b/data/src/test/java/org/apache/iceberg/data/TestGenericFormatModels.java
index ab5968da8b..d2bf1cb3e8 100644
--- a/data/src/test/java/org/apache/iceberg/data/TestGenericFormatModels.java
+++ b/data/src/test/java/org/apache/iceberg/data/TestGenericFormatModels.java
@@ -55,7 +55,7 @@ public class TestGenericFormatModels {
RandomGenericData.generate(TestBase.SCHEMA, 10, 1L);
private static final FileFormat[] FILE_FORMATS =
- new FileFormat[] {FileFormat.AVRO, FileFormat.PARQUET};
+ new FileFormat[] {FileFormat.AVRO, FileFormat.PARQUET, FileFormat.ORC};
@TempDir protected Path temp;
diff --git a/orc/src/main/java/org/apache/iceberg/orc/ORC.java b/orc/src/main/java/org/apache/iceberg/orc/ORC.java
index 5559c0041b..2c8fd6e436 100644
--- a/orc/src/main/java/org/apache/iceberg/orc/ORC.java
+++ b/orc/src/main/java/org/apache/iceberg/orc/ORC.java
@@ -45,6 +45,7 @@ import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
+import java.util.Set;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -52,6 +53,7 @@ import java.util.stream.IntStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.MetricsConfig;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
@@ -79,7 +81,10 @@ import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.mapping.NameMapping;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.util.ArrayUtil;
import org.apache.iceberg.util.PropertyUtil;
import org.apache.orc.CompressionKind;
@@ -179,9 +184,8 @@ public class ORC {
return this;
}
- // Supposed to always be a private method used strictly by data and delete write builders
- private WriteBuilder createContextFunc(
- Function<Map<String, String>, Context> newCreateContextFunc) {
+ // package-private so ORCFormatModel can reuse it; meant strictly for data and delete write builders
+ WriteBuilder createContextFunc(Function<Map<String, String>, Context> newCreateContextFunc) {
this.createContextFunc = newCreateContextFunc;
return this;
}
@@ -219,7 +223,7 @@ public class ORC {
metricsConfig);
}
- private static class Context {
+ static class Context {
private final long stripeSize;
private final long blockSize;
private final int vectorizedRowBatchSize;
@@ -699,6 +703,7 @@ public class ORC {
private Function<TypeDescription, OrcRowReader<?>> readerFunc;
private Function<TypeDescription, OrcBatchReader<?>> batchedReaderFunc;
private int recordsPerBatch = VectorizedRowBatch.DEFAULT_SIZE;
+ private Set<Integer> constantFieldIds = ImmutableSet.of();
private ReadBuilder(InputFile file) {
Preconditions.checkNotNull(file, "Input file cannot be null");
@@ -775,12 +780,18 @@ public class ORC {
return this;
}
+ ReadBuilder constantFieldIds(Set<Integer> newConstantFieldIds) {
+ this.constantFieldIds = newConstantFieldIds;
+ return this;
+ }
+
public <D> CloseableIterable<D> build() {
Preconditions.checkNotNull(schema, "Schema is required");
return new OrcIterable<>(
file,
conf,
- schema,
+ TypeUtil.selectNot(
+ schema, Sets.union(constantFieldIds, MetadataColumns.metadataFieldIds())),
nameMapping,
start,
length,
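
Note on the ReadBuilder change above: constant fields and metadata columns
are no longer read from the file and are instead expected to be supplied
via idToConstant. A short sketch of how ORCFormatModel (below) wires this
up, reusing the placeholders from the sketch in the commit message and a
hypothetical constant value for field id 1000:

    // field 1000 is injected from the constant map, not read from ORC
    Map<Integer, ?> idToConstant = ImmutableMap.of(1000, partitionValue);
    CloseableIterable<Record> rows =
        model.readBuilder(inputFile)
            .project(schema)
            .idToConstant(idToConstant) // prunes field 1000 from the file projection
            .reuseContainers()
            .build();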
diff --git a/orc/src/main/java/org/apache/iceberg/orc/ORCFormatModel.java b/orc/src/main/java/org/apache/iceberg/orc/ORCFormatModel.java
new file mode 100644
index 0000000000..ed5d734ef9
--- /dev/null
+++ b/orc/src/main/java/org/apache/iceberg/orc/ORCFormatModel.java
@@ -0,0 +1,311 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.orc;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.function.Function;
+import org.apache.iceberg.FileContent;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.MetricsConfig;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.data.orc.GenericOrcWriter;
+import org.apache.iceberg.data.orc.GenericOrcWriters;
+import org.apache.iceberg.deletes.PositionDelete;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.formats.BaseFormatModel;
+import org.apache.iceberg.formats.ModelWriteBuilder;
+import org.apache.iceberg.formats.ReadBuilder;
+import org.apache.iceberg.io.DeleteSchemaUtil;
+import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.mapping.NameMapping;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.orc.TypeDescription;
+
+public class ORCFormatModel<D, S, R>
+ extends BaseFormatModel<D, S, OrcRowWriter<?>, R, TypeDescription> {
+ private final boolean isBatchReader;
+
+ public static <D> ORCFormatModel<PositionDelete<D>, Void, Object> forPositionDeletes() {
+ return new ORCFormatModel<>(PositionDelete.deleteClass(), Void.class, null, null, false);
+ }
+
+ public static <D, S> ORCFormatModel<D, S, OrcRowReader<D>> create(
+ Class<D> type,
+ Class<S> schemaType,
+ WriterFunction<OrcRowWriter<?>, S, TypeDescription> writerFunction,
+ ReaderFunction<OrcRowReader<D>, S, TypeDescription> readerFunction) {
+ return new ORCFormatModel<>(type, schemaType, writerFunction, readerFunction, false);
+ }
+
+ public static <D, S> ORCFormatModel<D, S, OrcBatchReader<?>> create(
+ Class<D> type,
+ Class<S> schemaType,
+ ReaderFunction<OrcBatchReader<?>, S, TypeDescription> batchReaderFunction) {
+ return new ORCFormatModel<>(type, schemaType, null, batchReaderFunction, true);
+ }
+
+ private ORCFormatModel(
+ Class<? extends D> type,
+ Class<S> schemaType,
+ WriterFunction<OrcRowWriter<?>, S, TypeDescription> writerFunction,
+ ReaderFunction<R, S, TypeDescription> readerFunction,
+ boolean isBatchReader) {
+ super(type, schemaType, writerFunction, readerFunction);
+ this.isBatchReader = isBatchReader;
+ }
+
+ @Override
+ public FileFormat format() {
+ return FileFormat.ORC;
+ }
+
+ @Override
+ public ModelWriteBuilder<D, S> writeBuilder(EncryptedOutputFile outputFile) {
+ return new WriteBuilderWrapper<>(outputFile, writerFunction());
+ }
+
+ @Override
+ public ReadBuilder<D, S> readBuilder(InputFile inputFile) {
+ return new ReadBuilderWrapper<>(inputFile, readerFunction(), isBatchReader);
+ }
+
+ private static class WriteBuilderWrapper<D, S> implements ModelWriteBuilder<D, S> {
+ private final ORC.WriteBuilder internal;
+ private final WriterFunction<OrcRowWriter<?>, S, TypeDescription> writerFunction;
+ private Schema schema;
+ private S engineSchema;
+
+ private FileContent content;
+
+ private WriteBuilderWrapper(
+ EncryptedOutputFile outputFile,
+ WriterFunction<OrcRowWriter<?>, S, TypeDescription> writerFunction) {
+ this.internal = ORC.write(outputFile);
+ this.writerFunction = writerFunction;
+ }
+
+ @Override
+ public ModelWriteBuilder<D, S> schema(Schema newSchema) {
+ this.schema = newSchema;
+ internal.schema(newSchema);
+ return this;
+ }
+
+ @Override
+ public ModelWriteBuilder<D, S> engineSchema(S newSchema) {
+ this.engineSchema = newSchema;
+ return this;
+ }
+
+ @Override
+ public ModelWriteBuilder<D, S> set(String property, String value) {
+ internal.set(property, value);
+ return this;
+ }
+
+ @Override
+ public ModelWriteBuilder<D, S> setAll(Map<String, String> properties) {
+ internal.setAll(properties);
+ return this;
+ }
+
+ @Override
+ public ModelWriteBuilder<D, S> meta(String property, String value) {
+ internal.metadata(property, value);
+ return this;
+ }
+
+ @Override
+ public ModelWriteBuilder<D, S> content(FileContent newContent) {
+ this.content = newContent;
+ return this;
+ }
+
+ @Override
+ public ModelWriteBuilder<D, S> metricsConfig(MetricsConfig metricsConfig) {
+ internal.metricsConfig(metricsConfig);
+ return this;
+ }
+
+ @Override
+ public ModelWriteBuilder<D, S> overwrite() {
+ internal.overwrite();
+ return this;
+ }
+
+ @Override
+ public ModelWriteBuilder<D, S> withFileEncryptionKey(ByteBuffer encryptionKey) {
+ // ORC doesn't support file encryption
+ throw new UnsupportedOperationException("ORC does not support file
encryption keys");
+ }
+
+ @Override
+ public ModelWriteBuilder<D, S> withAADPrefix(ByteBuffer aadPrefix) {
+ // ORC doesn't support file encryption
+ throw new UnsupportedOperationException("ORC does not support AAD
prefix");
+ }
+
+ @Override
+ public FileAppender<D> build() {
+ switch (content) {
+ case DATA:
+ internal.createContextFunc(ORC.WriteBuilder.Context::dataContext);
+ internal.createWriterFunc(
+ (icebergSchema, typeDescription) ->
+ writerFunction.write(icebergSchema, typeDescription, engineSchema));
+ break;
+ case EQUALITY_DELETES:
+ internal.createContextFunc(ORC.WriteBuilder.Context::deleteContext);
+ internal.createWriterFunc(
+ (icebergSchema, typeDescription) ->
+ writerFunction.write(icebergSchema, typeDescription, engineSchema));
+ break;
+ case POSITION_DELETES:
+ Preconditions.checkState(
+ schema == null,
+ "Invalid schema: %s. Position deletes with schema are not
supported by the API.",
+ schema);
+ Preconditions.checkState(
+ engineSchema == null,
+ "Invalid engineSchema: %s. Position deletes with schema are not
supported by the API.",
+ engineSchema);
+
+ internal.createContextFunc(ORC.WriteBuilder.Context::deleteContext);
+ internal.createWriterFunc(
+ (icebergSchema, typeDescription) ->
+ GenericOrcWriters.positionDelete(
+ GenericOrcWriter.buildWriter(icebergSchema, typeDescription),
+ Function.identity()));
+ internal.schema(DeleteSchemaUtil.pathPosSchema());
+ break;
+ default:
+ throw new IllegalArgumentException("Unknown file content: " +
content);
+ }
+
+ return internal.build();
+ }
+ }
+
+ private static class ReadBuilderWrapper<D, S, R> implements ReadBuilder<D, S> {
+ private final ORC.ReadBuilder internal;
+ private final ReaderFunction<R, S, TypeDescription> readerFunction;
+ private final boolean isBatchReader;
+ private S engineSchema;
+ private boolean reuseContainers = false;
+ private Schema icebergSchema;
+ private Map<Integer, ?> idToConstant = ImmutableMap.of();
+
+ private ReadBuilderWrapper(
+ InputFile inputFile,
+ ReaderFunction<R, S, TypeDescription> readerFunction,
+ boolean isBatchReader) {
+ this.internal = ORC.read(inputFile);
+ this.readerFunction = readerFunction;
+ this.isBatchReader = isBatchReader;
+ }
+
+ @Override
+ public ReadBuilder<D, S> split(long newStart, long newLength) {
+ internal.split(newStart, newLength);
+ return this;
+ }
+
+ @Override
+ public ReadBuilder<D, S> project(Schema schema) {
+ this.icebergSchema = schema;
+ internal.project(schema);
+ return this;
+ }
+
+ @Override
+ public ReadBuilder<D, S> engineProjection(S schema) {
+ this.engineSchema = schema;
+ return this;
+ }
+
+ @Override
+ public ReadBuilder<D, S> caseSensitive(boolean caseSensitive) {
+ internal.caseSensitive(caseSensitive);
+ return this;
+ }
+
+ @Override
+ public ReadBuilder<D, S> filter(Expression filter) {
+ internal.filter(filter);
+ return this;
+ }
+
+ @Override
+ public ReadBuilder<D, S> set(String key, String value) {
+ internal.config(key, value);
+ return this;
+ }
+
+ @Override
+ public ReadBuilder<D, S> reuseContainers() {
+ this.reuseContainers = true;
+ return this;
+ }
+
+ @Override
+ public ReadBuilder<D, S> recordsPerBatch(int numRowsPerBatch) {
+ internal.recordsPerBatch(numRowsPerBatch);
+ return this;
+ }
+
+ @Override
+ public ReadBuilder<D, S> idToConstant(Map<Integer, ?> newIdToConstant) {
+ internal.constantFieldIds(newIdToConstant.keySet());
+ this.idToConstant = newIdToConstant;
+ return this;
+ }
+
+ @Override
+ public ReadBuilder<D, S> withNameMapping(NameMapping nameMapping) {
+ internal.withNameMapping(nameMapping);
+ return this;
+ }
+
+ @Override
+ public org.apache.iceberg.io.CloseableIterable<D> build() {
+ Preconditions.checkArgument(reuseContainers, "Reuse containers is required for ORC read");
+ if (isBatchReader) {
+ return internal
+ .createBatchedReaderFunc(
+ typeDescription ->
+ (OrcBatchReader<?>)
+ readerFunction.read(
+ icebergSchema, typeDescription, engineSchema, idToConstant))
+ .build();
+ } else {
+ return internal
+ .createReaderFunc(
+ typeDescription ->
+ (OrcRowReader<?>)
+ readerFunction.read(
+ icebergSchema, typeDescription, engineSchema, idToConstant))
+ .build();
+ }
+ }
+ }
+}
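
For completeness, a write-path sketch mirroring WriteBuilderWrapper#build
above, again reusing the placeholders from the earlier sketches; outputFile
is assumed to be an EncryptedOutputFile supplied by the caller, and content
must be set before build():

    FileAppender<Record> appender =
        model.writeBuilder(outputFile)
            .schema(schema)            // also forwarded to ORC.WriteBuilder
            .content(FileContent.DATA) // selects the data-write context
            .build();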