This is an automated email from the ASF dual-hosted git repository.
mbutrovich pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push:
new 62c988f64 chore: [iceberg] test iceberg 1.10.0 (#2709)
62c988f64 is described below
commit 62c988f64faba70648d6c6e120f4daea96bab9c7
Author: Manu Zhang <[email protected]>
AuthorDate: Wed Nov 12 04:57:46 2025 +0800
chore: [iceberg] test iceberg 1.10.0 (#2709)
---
.github/workflows/iceberg_spark_test.yml | 6 +-
dev/diffs/iceberg/1.10.0.diff | 1770 ++++++++++++++++++++++++++++++
2 files changed, 1773 insertions(+), 3 deletions(-)
diff --git a/.github/workflows/iceberg_spark_test.yml b/.github/workflows/iceberg_spark_test.yml
index 36c94fa6e..7eb0a6c2b 100644
--- a/.github/workflows/iceberg_spark_test.yml
+++ b/.github/workflows/iceberg_spark_test.yml
@@ -46,7 +46,7 @@ jobs:
matrix:
os: [ubuntu-24.04]
java-version: [11, 17]
- iceberg-version: [{short: '1.8', full: '1.8.1'}, {short: '1.9', full: '1.9.1'}]
+ iceberg-version: [{short: '1.8', full: '1.8.1'}, {short: '1.9', full: '1.9.1'}, {short: '1.10', full: '1.10.0'}]
spark-version: [{short: '3.5', full: '3.5.7'}]
scala-version: ['2.13']
fail-fast: false
@@ -85,7 +85,7 @@ jobs:
matrix:
os: [ubuntu-24.04]
java-version: [11, 17]
- iceberg-version: [{short: '1.8', full: '1.8.1'}, {short: '1.9', full: '1.9.1'}]
+ iceberg-version: [{short: '1.8', full: '1.8.1'}, {short: '1.9', full: '1.9.1'}, {short: '1.10', full: '1.10.0'}]
spark-version: [{short: '3.5', full: '3.5.7'}]
scala-version: ['2.13']
fail-fast: false
@@ -124,7 +124,7 @@ jobs:
matrix:
os: [ubuntu-24.04]
java-version: [11, 17]
- iceberg-version: [{short: '1.8', full: '1.8.1'}, {short: '1.9', full: '1.9.1'}]
+ iceberg-version: [{short: '1.8', full: '1.8.1'}, {short: '1.9', full: '1.9.1'}, {short: '1.10', full: '1.10.0'}]
spark-version: [{short: '3.5', full: '3.5.7'}]
scala-version: ['2.13']
fail-fast: false
diff --git a/dev/diffs/iceberg/1.10.0.diff b/dev/diffs/iceberg/1.10.0.diff
new file mode 100644
index 000000000..551fc59fa
--- /dev/null
+++ b/dev/diffs/iceberg/1.10.0.diff
@@ -0,0 +1,1770 @@
+diff --git a/build.gradle b/build.gradle
+index 6bc052885fc..db2aca3a5ee 100644
+--- a/build.gradle
++++ b/build.gradle
+@@ -878,6 +878,13 @@ project(':iceberg-parquet') {
+ implementation project(':iceberg-core')
+ implementation project(':iceberg-common')
+
++ implementation("org.apache.datafusion:comet-spark-spark${sparkVersionsString}_${scalaVersion}:${libs.versions.comet.get()}") {
++ exclude group: 'org.apache.arrow'
++ exclude group: 'org.apache.parquet'
++ exclude group: 'org.apache.spark'
++ exclude group: 'org.apache.iceberg'
++ }
++
+ implementation(libs.parquet.avro) {
+ exclude group: 'org.apache.avro', module: 'avro'
+ // already shaded by Parquet
+diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml
+index eeabe54f5f0..867018058ee 100644
+--- a/gradle/libs.versions.toml
++++ b/gradle/libs.versions.toml
+@@ -37,7 +37,7 @@ awssdk-s3accessgrants = "2.3.0"
+ bson-ver = "4.11.5"
+ caffeine = "2.9.3"
+ calcite = "1.40.0"
+-comet = "0.8.1"
++comet = "0.12.0-SNAPSHOT"
+ datasketches = "6.2.0"
+ delta-standalone = "3.3.2"
+ delta-spark = "3.3.2"
+diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/CometTypeUtils.java b/parquet/src/main/java/org/apache/iceberg/parquet/CometTypeUtils.java
+new file mode 100644
+index 00000000000..ddf6c7de5ae
+--- /dev/null
++++ b/parquet/src/main/java/org/apache/iceberg/parquet/CometTypeUtils.java
+@@ -0,0 +1,255 @@
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing,
++ * software distributed under the License is distributed on an
++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++ * KIND, either express or implied. See the License for the
++ * specific language governing permissions and limitations
++ * under the License.
++ */
++package org.apache.iceberg.parquet;
++
++import java.util.Map;
++import org.apache.comet.parquet.ParquetColumnSpec;
++import org.apache.iceberg.relocated.com.google.common.collect.Maps;
++import org.apache.parquet.column.ColumnDescriptor;
++import org.apache.parquet.schema.LogicalTypeAnnotation;
++import org.apache.parquet.schema.PrimitiveType;
++import org.apache.parquet.schema.Type;
++import org.apache.parquet.schema.Types;
++
++public class CometTypeUtils {
++
++ private CometTypeUtils() {}
++
++ public static ParquetColumnSpec descriptorToParquetColumnSpec(ColumnDescriptor descriptor) {
++
++ String[] path = descriptor.getPath();
++ PrimitiveType primitiveType = descriptor.getPrimitiveType();
++ String physicalType = primitiveType.getPrimitiveTypeName().name();
++
++ int typeLength =
++ primitiveType.getPrimitiveTypeName() == PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY
++ ? primitiveType.getTypeLength()
++ : 0;
++
++ boolean isRepeated = primitiveType.getRepetition() == Type.Repetition.REPEATED;
++
++ // ToDo: extract this into a Util method
++ String logicalTypeName = null;
++ Map<String, String> logicalTypeParams = Maps.newHashMap();
++ LogicalTypeAnnotation logicalType = primitiveType.getLogicalTypeAnnotation();
++
++ if (logicalType != null) {
++ logicalTypeName = logicalType.getClass().getSimpleName();
++
++ // Handle specific logical types
++ if (logicalType instanceof LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) {
++ LogicalTypeAnnotation.DecimalLogicalTypeAnnotation decimal =
++ (LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) logicalType;
++ logicalTypeParams.put("precision", String.valueOf(decimal.getPrecision()));
++ logicalTypeParams.put("scale", String.valueOf(decimal.getScale()));
++ } else if (logicalType instanceof LogicalTypeAnnotation.TimestampLogicalTypeAnnotation) {
++ LogicalTypeAnnotation.TimestampLogicalTypeAnnotation timestamp =
++ (LogicalTypeAnnotation.TimestampLogicalTypeAnnotation) logicalType;
++ logicalTypeParams.put("isAdjustedToUTC", String.valueOf(timestamp.isAdjustedToUTC()));
++ logicalTypeParams.put("unit", timestamp.getUnit().name());
++ } else if (logicalType instanceof LogicalTypeAnnotation.TimeLogicalTypeAnnotation) {
++ LogicalTypeAnnotation.TimeLogicalTypeAnnotation time =
++ (LogicalTypeAnnotation.TimeLogicalTypeAnnotation) logicalType;
++ logicalTypeParams.put("isAdjustedToUTC", String.valueOf(time.isAdjustedToUTC()));
++ logicalTypeParams.put("unit", time.getUnit().name());
++ } else if (logicalType instanceof LogicalTypeAnnotation.IntLogicalTypeAnnotation) {
++ LogicalTypeAnnotation.IntLogicalTypeAnnotation intType =
++ (LogicalTypeAnnotation.IntLogicalTypeAnnotation) logicalType;
++ logicalTypeParams.put("isSigned", String.valueOf(intType.isSigned()));
++ logicalTypeParams.put("bitWidth", String.valueOf(intType.getBitWidth()));
++ }
++ }
++
++ return new ParquetColumnSpec(
++ 1, // ToDo: pass in the correct id
++ path,
++ physicalType,
++ typeLength,
++ isRepeated,
++ descriptor.getMaxDefinitionLevel(),
++ descriptor.getMaxRepetitionLevel(),
++ logicalTypeName,
++ logicalTypeParams);
++ }
++
++ public static ColumnDescriptor buildColumnDescriptor(ParquetColumnSpec columnSpec) {
++ PrimitiveType.PrimitiveTypeName primType =
++ PrimitiveType.PrimitiveTypeName.valueOf(columnSpec.getPhysicalType());
++
++ Type.Repetition repetition;
++ if (columnSpec.getMaxRepetitionLevel() > 0) {
++ repetition = Type.Repetition.REPEATED;
++ } else if (columnSpec.getMaxDefinitionLevel() > 0) {
++ repetition = Type.Repetition.OPTIONAL;
++ } else {
++ repetition = Type.Repetition.REQUIRED;
++ }
++
++ String name = columnSpec.getPath()[columnSpec.getPath().length - 1];
++ // Reconstruct the logical type from parameters
++ LogicalTypeAnnotation logicalType = null;
++ if (columnSpec.getLogicalTypeName() != null) {
++ logicalType =
++ reconstructLogicalType(
++ columnSpec.getLogicalTypeName(), columnSpec.getLogicalTypeParams());
++ }
++
++ PrimitiveType primitiveType;
++ if (primType == PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY) {
++ primitiveType =
++ org.apache.parquet.schema.Types.primitive(primType, repetition)
++ .length(columnSpec.getTypeLength())
++ .as(logicalType)
++ .id(columnSpec.getFieldId())
++ .named(name);
++ } else {
++ primitiveType =
++ Types.primitive(primType, repetition)
++ .as(logicalType)
++ .id(columnSpec.getFieldId())
++ .named(name);
++ }
++
++ return new ColumnDescriptor(
++ columnSpec.getPath(),
++ primitiveType,
++ columnSpec.getMaxRepetitionLevel(),
++ columnSpec.getMaxDefinitionLevel());
++ }
++
++ private static LogicalTypeAnnotation reconstructLogicalType(
++ String logicalTypeName, java.util.Map<String, String> params) {
++
++ switch (logicalTypeName) {
++ // MAP
++ case "MapLogicalTypeAnnotation":
++ return LogicalTypeAnnotation.mapType();
++
++ // LIST
++ case "ListLogicalTypeAnnotation":
++ return LogicalTypeAnnotation.listType();
++
++ // STRING
++ case "StringLogicalTypeAnnotation":
++ return LogicalTypeAnnotation.stringType();
++
++ // MAP_KEY_VALUE
++ case "MapKeyValueLogicalTypeAnnotation":
++ return LogicalTypeAnnotation.MapKeyValueTypeAnnotation.getInstance();
++
++ // ENUM
++ case "EnumLogicalTypeAnnotation":
++ return LogicalTypeAnnotation.enumType();
++
++ // DECIMAL
++ case "DecimalLogicalTypeAnnotation":
++ if (!params.containsKey("scale") || !params.containsKey("precision"))
{
++ throw new IllegalArgumentException(
++ "Missing required parameters for DecimalLogicalTypeAnnotation:
" + params);
++ }
++ int scale = Integer.parseInt(params.get("scale"));
++ int precision = Integer.parseInt(params.get("precision"));
++ return LogicalTypeAnnotation.decimalType(scale, precision);
++
++ // DATE
++ case "DateLogicalTypeAnnotation":
++ return LogicalTypeAnnotation.dateType();
++
++ // TIME
++ case "TimeLogicalTypeAnnotation":
++ if (!params.containsKey("isAdjustedToUTC") ||
!params.containsKey("unit")) {
++ throw new IllegalArgumentException(
++ "Missing required parameters for TimeLogicalTypeAnnotation: " +
params);
++ }
++
++ boolean isUTC = Boolean.parseBoolean(params.get("isAdjustedToUTC"));
++ String timeUnitStr = params.get("unit");
++
++ LogicalTypeAnnotation.TimeUnit timeUnit;
++ switch (timeUnitStr) {
++ case "MILLIS":
++ timeUnit = LogicalTypeAnnotation.TimeUnit.MILLIS;
++ break;
++ case "MICROS":
++ timeUnit = LogicalTypeAnnotation.TimeUnit.MICROS;
++ break;
++ case "NANOS":
++ timeUnit = LogicalTypeAnnotation.TimeUnit.NANOS;
++ break;
++ default:
++ throw new IllegalArgumentException("Unknown time unit: " +
timeUnitStr);
++ }
++ return LogicalTypeAnnotation.timeType(isUTC, timeUnit);
++
++ // TIMESTAMP
++ case "TimestampLogicalTypeAnnotation":
++ if (!params.containsKey("isAdjustedToUTC") ||
!params.containsKey("unit")) {
++ throw new IllegalArgumentException(
++ "Missing required parameters for
TimestampLogicalTypeAnnotation: " + params);
++ }
++ boolean isAdjustedToUTC =
Boolean.parseBoolean(params.get("isAdjustedToUTC"));
++ String unitStr = params.get("unit");
++
++ LogicalTypeAnnotation.TimeUnit unit;
++ switch (unitStr) {
++ case "MILLIS":
++ unit = LogicalTypeAnnotation.TimeUnit.MILLIS;
++ break;
++ case "MICROS":
++ unit = LogicalTypeAnnotation.TimeUnit.MICROS;
++ break;
++ case "NANOS":
++ unit = LogicalTypeAnnotation.TimeUnit.NANOS;
++ break;
++ default:
++ throw new IllegalArgumentException("Unknown timestamp unit: " +
unitStr);
++ }
++ return LogicalTypeAnnotation.timestampType(isAdjustedToUTC, unit);
++
++ // INTEGER
++ case "IntLogicalTypeAnnotation":
++ if (!params.containsKey("isSigned") ||
!params.containsKey("bitWidth")) {
++ throw new IllegalArgumentException(
++ "Missing required parameters for IntLogicalTypeAnnotation: " +
params);
++ }
++ boolean isSigned = Boolean.parseBoolean(params.get("isSigned"));
++ int bitWidth = Integer.parseInt(params.get("bitWidth"));
++ return LogicalTypeAnnotation.intType(bitWidth, isSigned);
++
++ // JSON
++ case "JsonLogicalTypeAnnotation":
++ return LogicalTypeAnnotation.jsonType();
++
++ // BSON
++ case "BsonLogicalTypeAnnotation":
++ return LogicalTypeAnnotation.bsonType();
++
++ // UUID
++ case "UUIDLogicalTypeAnnotation":
++ return LogicalTypeAnnotation.uuidType();
++
++ // INTERVAL
++ case "IntervalLogicalTypeAnnotation":
++ return LogicalTypeAnnotation.IntervalLogicalTypeAnnotation.getInstance();
++
++ default:
++ throw new IllegalArgumentException("Unknown logical type: " +
logicalTypeName);
++ }
++ }
++}
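
For reference, the two helpers above are meant to round-trip a column definition between Parquet's ColumnDescriptor and Comet's ParquetColumnSpec. A minimal sketch of that round trip (the "ts" column below is illustrative and not part of the patch; only the CometTypeUtils methods come from the diff above):

    import org.apache.comet.parquet.ParquetColumnSpec;
    import org.apache.iceberg.parquet.CometTypeUtils;
    import org.apache.parquet.column.ColumnDescriptor;
    import org.apache.parquet.schema.LogicalTypeAnnotation;
    import org.apache.parquet.schema.PrimitiveType;
    import org.apache.parquet.schema.Types;

    public class CometTypeUtilsRoundTrip {
      public static void main(String[] args) {
        // Illustrative column: a required INT64 timestamp named "ts" with field id 1.
        PrimitiveType ts =
            Types.required(PrimitiveType.PrimitiveTypeName.INT64)
                .as(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MICROS))
                .id(1)
                .named("ts");
        ColumnDescriptor descriptor = new ColumnDescriptor(new String[] {"ts"}, ts, 0, 0);

        // Convert to the Comet-side spec and rebuild a descriptor from it.
        ParquetColumnSpec spec = CometTypeUtils.descriptorToParquetColumnSpec(descriptor);
        ColumnDescriptor rebuilt = CometTypeUtils.buildColumnDescriptor(spec);
        System.out.println(rebuilt.getPrimitiveType());
      }
    }

Note that descriptorToParquetColumnSpec currently hard-codes the field id to 1 (see the ToDo above), so the rebuilt descriptor carries that placeholder id rather than the original one.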
+diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/CometVectorizedParquetReader.java b/parquet/src/main/java/org/apache/iceberg/parquet/CometVectorizedParquetReader.java
+new file mode 100644
+index 00000000000..a3cba401827
+--- /dev/null
++++ b/parquet/src/main/java/org/apache/iceberg/parquet/CometVectorizedParquetReader.java
+@@ -0,0 +1,260 @@
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing,
++ * software distributed under the License is distributed on an
++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++ * KIND, either express or implied. See the License for the
++ * specific language governing permissions and limitations
++ * under the License.
++ */
++package org.apache.iceberg.parquet;
++
++import java.io.IOException;
++import java.io.UncheckedIOException;
++import java.nio.ByteBuffer;
++import java.util.List;
++import java.util.Map;
++import java.util.NoSuchElementException;
++import java.util.function.Function;
++import org.apache.comet.parquet.FileReader;
++import org.apache.comet.parquet.ParquetColumnSpec;
++import org.apache.comet.parquet.ReadOptions;
++import org.apache.comet.parquet.RowGroupReader;
++import org.apache.comet.parquet.WrappedInputFile;
++import org.apache.hadoop.conf.Configuration;
++import org.apache.iceberg.Schema;
++import org.apache.iceberg.exceptions.RuntimeIOException;
++import org.apache.iceberg.expressions.Expression;
++import org.apache.iceberg.expressions.Expressions;
++import org.apache.iceberg.io.CloseableGroup;
++import org.apache.iceberg.io.CloseableIterable;
++import org.apache.iceberg.io.CloseableIterator;
++import org.apache.iceberg.io.InputFile;
++import org.apache.iceberg.mapping.NameMapping;
++import org.apache.iceberg.relocated.com.google.common.collect.Lists;
++import org.apache.iceberg.util.ByteBuffers;
++import org.apache.parquet.ParquetReadOptions;
++import org.apache.parquet.column.ColumnDescriptor;
++import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
++import org.apache.parquet.hadoop.metadata.ColumnPath;
++import org.apache.parquet.schema.MessageType;
++
++public class CometVectorizedParquetReader<T> extends CloseableGroup
++ implements CloseableIterable<T> {
++ private final InputFile input;
++ private final ParquetReadOptions options;
++ private final Schema expectedSchema;
++ private final Function<MessageType, VectorizedReader<?>> batchReaderFunc;
++ private final Expression filter;
++ private final boolean reuseContainers;
++ private final boolean caseSensitive;
++ private final int batchSize;
++ private final NameMapping nameMapping;
++ private final Map<String, String> properties;
++ private Long start = null;
++ private Long length = null;
++ private ByteBuffer fileEncryptionKey = null;
++ private ByteBuffer fileAADPrefix = null;
++
++ public CometVectorizedParquetReader(
++ InputFile input,
++ Schema expectedSchema,
++ ParquetReadOptions options,
++ Function<MessageType, VectorizedReader<?>> readerFunc,
++ NameMapping nameMapping,
++ Expression filter,
++ boolean reuseContainers,
++ boolean caseSensitive,
++ int maxRecordsPerBatch,
++ Map<String, String> properties,
++ Long start,
++ Long length,
++ ByteBuffer fileEncryptionKey,
++ ByteBuffer fileAADPrefix) {
++ this.input = input;
++ this.expectedSchema = expectedSchema;
++ this.options = options;
++ this.batchReaderFunc = readerFunc;
++ // replace alwaysTrue with null to avoid extra work evaluating a trivial filter
++ this.filter = filter == Expressions.alwaysTrue() ? null : filter;
++ this.reuseContainers = reuseContainers;
++ this.caseSensitive = caseSensitive;
++ this.batchSize = maxRecordsPerBatch;
++ this.nameMapping = nameMapping;
++ this.properties = properties;
++ this.start = start;
++ this.length = length;
++ this.fileEncryptionKey = fileEncryptionKey;
++ this.fileAADPrefix = fileAADPrefix;
++ }
++
++ private ReadConf conf = null;
++
++ private ReadConf init() {
++ if (conf == null) {
++ ReadConf readConf =
++ new ReadConf(
++ input,
++ options,
++ expectedSchema,
++ filter,
++ null,
++ batchReaderFunc,
++ nameMapping,
++ reuseContainers,
++ caseSensitive,
++ batchSize);
++ this.conf = readConf.copy();
++ return readConf;
++ }
++ return conf;
++ }
++
++ @Override
++ public CloseableIterator<T> iterator() {
++ FileIterator<T> iter =
++ new FileIterator<>(init(), properties, start, length, fileEncryptionKey, fileAADPrefix);
++ addCloseable(iter);
++ return iter;
++ }
++
++ private static class FileIterator<T> implements CloseableIterator<T> {
++ // private final ParquetFileReader reader;
++ private final boolean[] shouldSkip;
++ private final VectorizedReader<T> model;
++ private final long totalValues;
++ private final int batchSize;
++ private final List<Map<ColumnPath, ColumnChunkMetaData>> columnChunkMetadata;
++ private final boolean reuseContainers;
++ private int nextRowGroup = 0;
++ private long nextRowGroupStart = 0;
++ private long valuesRead = 0;
++ private T last = null;
++ private final FileReader cometReader;
++ private ReadConf conf;
++
++ FileIterator(
++ ReadConf conf,
++ Map<String, String> properties,
++ Long start,
++ Long length,
++ ByteBuffer fileEncryptionKey,
++ ByteBuffer fileAADPrefix) {
++ this.shouldSkip = conf.shouldSkip();
++ this.totalValues = conf.totalValues();
++ this.reuseContainers = conf.reuseContainers();
++ this.model = conf.vectorizedModel();
++ this.batchSize = conf.batchSize();
++ this.model.setBatchSize(this.batchSize);
++ this.columnChunkMetadata = conf.columnChunkMetadataForRowGroups();
++ this.cometReader =
++ newCometReader(
++ conf.file(),
++ conf.projection(),
++ properties,
++ start,
++ length,
++ fileEncryptionKey,
++ fileAADPrefix);
++ this.conf = conf;
++ }
++
++ private FileReader newCometReader(
++ InputFile file,
++ MessageType projection,
++ Map<String, String> properties,
++ Long start,
++ Long length,
++ ByteBuffer fileEncryptionKey,
++ ByteBuffer fileAADPrefix) {
++ try {
++ ReadOptions cometOptions = ReadOptions.builder(new Configuration()).build();
++
++ FileReader fileReader =
++ new FileReader(
++ new WrappedInputFile(file),
++ cometOptions,
++ properties,
++ start,
++ length,
++ ByteBuffers.toByteArray(fileEncryptionKey),
++ ByteBuffers.toByteArray(fileAADPrefix));
++
++ List<ColumnDescriptor> columnDescriptors = projection.getColumns();
++
++ List<ParquetColumnSpec> specs = Lists.newArrayList();
++
++ for (ColumnDescriptor descriptor : columnDescriptors) {
++ ParquetColumnSpec spec = CometTypeUtils.descriptorToParquetColumnSpec(descriptor);
++ specs.add(spec);
++ }
++
++ fileReader.setRequestedSchemaFromSpecs(specs);
++ return fileReader;
++ } catch (IOException e) {
++ throw new UncheckedIOException("Failed to open Parquet file: " + file.location(), e);
++ }
++ }
++
++ @Override
++ public boolean hasNext() {
++ return valuesRead < totalValues;
++ }
++
++ @Override
++ public T next() {
++ if (!hasNext()) {
++ throw new NoSuchElementException();
++ }
++ if (valuesRead >= nextRowGroupStart) {
++ advance();
++ }
++
++ // batchSize is an integer, so casting to integer is safe
++ int numValuesToRead = (int) Math.min(nextRowGroupStart - valuesRead, batchSize);
++ if (reuseContainers) {
++ this.last = model.read(last, numValuesToRead);
++ } else {
++ this.last = model.read(null, numValuesToRead);
++ }
++ valuesRead += numValuesToRead;
++
++ return last;
++ }
++
++ private void advance() {
++ while (shouldSkip[nextRowGroup]) {
++ nextRowGroup += 1;
++ cometReader.skipNextRowGroup();
++ }
++ RowGroupReader pages;
++ try {
++ pages = cometReader.readNextRowGroup();
++ } catch (IOException e) {
++ throw new RuntimeIOException(e);
++ }
++
++ model.setRowGroupInfo(pages, columnChunkMetadata.get(nextRowGroup));
++ nextRowGroupStart += pages.getRowCount();
++ nextRowGroup += 1;
++ }
++
++ @Override
++ public void close() throws IOException {
++ model.close();
++ cometReader.close();
++ if (conf != null && conf.reader() != null) {
++ conf.reader().close();
++ }
++ }
++ }
++}
+diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
+index 6f68fbe150f..b740543f3c9 100644
+--- a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
++++ b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
+@@ -1161,6 +1161,7 @@ public class Parquet {
+ private NameMapping nameMapping = null;
+ private ByteBuffer fileEncryptionKey = null;
+ private ByteBuffer fileAADPrefix = null;
++ private boolean isComet;
+
+ private ReadBuilder(InputFile file) {
+ this.file = file;
+@@ -1205,6 +1206,11 @@ public class Parquet {
+ return this;
+ }
+
++ public ReadBuilder enableComet(boolean enableComet) {
++ this.isComet = enableComet;
++ return this;
++ }
++
+ /**
+ * @deprecated will be removed in 2.0.0; use {@link #createReaderFunc(Function)} instead
+ */
+@@ -1300,7 +1306,7 @@ public class Parquet {
+ }
+
+ @Override
+- @SuppressWarnings({"unchecked", "checkstyle:CyclomaticComplexity"})
++ @SuppressWarnings({"unchecked", "checkstyle:CyclomaticComplexity", "MethodLength"})
+ public <D> CloseableIterable<D> build() {
+ FileDecryptionProperties fileDecryptionProperties = null;
+ if (fileEncryptionKey != null) {
+@@ -1352,16 +1358,35 @@ public class Parquet {
+ }
+
+ if (batchedReaderFunc != null) {
+- return new VectorizedParquetReader<>(
+- file,
+- schema,
+- options,
+- batchedReaderFunc,
+- mapping,
+- filter,
+- reuseContainers,
+- caseSensitive,
+- maxRecordsPerBatch);
++ if (isComet) {
++ LOG.info("Comet enabled");
++ return new CometVectorizedParquetReader<>(
++ file,
++ schema,
++ options,
++ batchedReaderFunc,
++ mapping,
++ filter,
++ reuseContainers,
++ caseSensitive,
++ maxRecordsPerBatch,
++ properties,
++ start,
++ length,
++ fileEncryptionKey,
++ fileAADPrefix);
++ } else {
++ return new VectorizedParquetReader<>(
++ file,
++ schema,
++ options,
++ batchedReaderFunc,
++ mapping,
++ filter,
++ reuseContainers,
++ caseSensitive,
++ maxRecordsPerBatch);
++ }
+ } else {
+ Function<MessageType, ParquetValueReader<?>> readBuilder =
+ readerFuncWithSchema != null
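
As context for the hunk above: enableComet(boolean) is just another toggle on Iceberg's existing Parquet.ReadBuilder, and build() routes to CometVectorizedParquetReader when it is set. A hedged sketch of a caller (the input file, schema, and reader function are placeholders for whatever BaseBatchReader already supplies; only enableComet(...) is new in this patch):

    import java.util.function.Function;
    import org.apache.iceberg.Schema;
    import org.apache.iceberg.io.CloseableIterable;
    import org.apache.iceberg.io.InputFile;
    import org.apache.iceberg.parquet.Parquet;
    import org.apache.iceberg.parquet.VectorizedReader;
    import org.apache.parquet.schema.MessageType;
    import org.apache.spark.sql.vectorized.ColumnarBatch;

    class CometReadSketch {
      // inputFile, projectedSchema, and readerFunc stand in for whatever the caller already
      // builds for the vectorized path; this is not code from the patch itself.
      static CloseableIterable<ColumnarBatch> open(
          InputFile inputFile,
          Schema projectedSchema,
          Function<MessageType, VectorizedReader<?>> readerFunc,
          boolean useComet) {
        return Parquet.read(inputFile)
            .project(projectedSchema)
            .createBatchedReaderFunc(readerFunc)
            .reuseContainers()
            .enableComet(useComet) // when true, build() returns a CometVectorizedParquetReader
            .build();
      }
    }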
+diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ReadConf.java b/parquet/src/main/java/org/apache/iceberg/parquet/ReadConf.java
+index 1fb2372ba56..142e5fbadf1 100644
+--- a/parquet/src/main/java/org/apache/iceberg/parquet/ReadConf.java
++++ b/parquet/src/main/java/org/apache/iceberg/parquet/ReadConf.java
+@@ -157,6 +157,14 @@ class ReadConf<T> {
+ return newReader;
+ }
+
++ InputFile file() {
++ return file;
++ }
++
++ MessageType projection() {
++ return projection;
++ }
++
+ ParquetValueReader<T> model() {
+ return model;
+ }
+diff --git a/spark/v3.5/build.gradle b/spark/v3.5/build.gradle
+index 69700d84366..49ea338a458 100644
+--- a/spark/v3.5/build.gradle
++++ b/spark/v3.5/build.gradle
+@@ -264,6 +264,7 @@ project(":iceberg-spark:iceberg-spark-runtime-${sparkMajorVersion}_${scalaVersio
+ integrationImplementation project(path: ':iceberg-hive-metastore', configuration: 'testArtifacts')
+ integrationImplementation project(path: ":iceberg-spark:iceberg-spark-${sparkMajorVersion}_${scalaVersion}", configuration: 'testArtifacts')
+ integrationImplementation project(path: ":iceberg-spark:iceberg-spark-extensions-${sparkMajorVersion}_${scalaVersion}", configuration: 'testArtifacts')
++ integrationImplementation "org.apache.datafusion:comet-spark-spark${sparkMajorVersion}_${scalaVersion}:${libs.versions.comet.get()}"
+
+ // runtime dependencies for running Hive Catalog based integration test
+ integrationRuntimeOnly project(':iceberg-hive-metastore')
+diff --git a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/ExtensionsTestBase.java b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/ExtensionsTestBase.java
+index 4c1a5095916..964f196daad 100644
+--- a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/ExtensionsTestBase.java
++++ b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/ExtensionsTestBase.java
+@@ -59,6 +59,16 @@ public abstract class ExtensionsTestBase extends CatalogTestBase {
+ .config("spark.sql.legacy.respectNullabilityInTextDatasetConversion", "true")
+ .config(
+ SQLConf.ADAPTIVE_EXECUTION_ENABLED().key(), String.valueOf(RANDOM.nextBoolean()))
++ .config("spark.plugins", "org.apache.spark.CometPlugin")
++ .config(
++ "spark.shuffle.manager",
++ "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
++ .config("spark.comet.explainFallback.enabled", "true")
++ .config("spark.sql.iceberg.parquet.reader-type", "COMET")
++ .config("spark.memory.offHeap.enabled", "true")
++ .config("spark.memory.offHeap.size", "10g")
++ .config("spark.comet.use.lazyMaterialization", "false")
++ .config("spark.comet.schemaEvolution.enabled", "true")
+ .enableHiveSupport()
+ .getOrCreate();
+
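
The same block of Comet-enabling Spark settings recurs in each test and benchmark class patched below. Pulled out as a standalone, hedged sketch (keys and values are copied from the hunks in this diff; the class name and the local[2] master are illustrative):

    import org.apache.spark.sql.SparkSession;

    class CometSparkSessionSketch {
      static SparkSession create() {
        return SparkSession.builder()
            .master("local[2]") // illustrative; each test picks its own master
            .config("spark.plugins", "org.apache.spark.CometPlugin")
            .config("spark.shuffle.manager",
                "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
            .config("spark.comet.explainFallback.enabled", "true")
            .config("spark.sql.iceberg.parquet.reader-type", "COMET")
            .config("spark.memory.offHeap.enabled", "true")
            .config("spark.memory.offHeap.size", "10g")
            .config("spark.comet.use.lazyMaterialization", "false")
            .config("spark.comet.schemaEvolution.enabled", "true")
            .getOrCreate();
      }
    }

The off-heap settings are presumably there because Comet's native execution allocates its buffers off-heap; the remaining keys switch the Iceberg Parquet reader type to COMET and surface fallback explanations during test runs.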
+diff --git a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCallStatementParser.java b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCallStatementParser.java
+index ecf9e6f8a59..0f8cced69aa 100644
+--- a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCallStatementParser.java
++++ b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCallStatementParser.java
+@@ -56,6 +56,16 @@ public class TestCallStatementParser {
+ .master("local[2]")
+ .config("spark.sql.extensions",
IcebergSparkSessionExtensions.class.getName())
+ .config("spark.extra.prop", "value")
++ .config("spark.plugins", "org.apache.spark.CometPlugin")
++ .config(
++ "spark.shuffle.manager",
++ "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
++ .config("spark.comet.explainFallback.enabled", "true")
++ .config("spark.sql.iceberg.parquet.reader-type", "COMET")
++ .config("spark.memory.offHeap.enabled", "true")
++ .config("spark.memory.offHeap.size", "10g")
++ .config("spark.comet.use.lazyMaterialization", "false")
++ .config("spark.comet.schemaEvolution.enabled", "true")
+ .getOrCreate();
+ TestCallStatementParser.parser = spark.sessionState().sqlParser();
+ }
+diff --git a/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/action/DeleteOrphanFilesBenchmark.java b/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/action/DeleteOrphanFilesBenchmark.java
+index 64edb1002e9..5bb449f1ac7 100644
+--- a/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/action/DeleteOrphanFilesBenchmark.java
++++ b/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/action/DeleteOrphanFilesBenchmark.java
+@@ -179,6 +179,16 @@ public class DeleteOrphanFilesBenchmark {
+ .config("spark.sql.catalog.spark_catalog",
SparkSessionCatalog.class.getName())
+ .config("spark.sql.catalog.spark_catalog.type", "hadoop")
+ .config("spark.sql.catalog.spark_catalog.warehouse",
catalogWarehouse())
++ .config("spark.plugins", "org.apache.spark.CometPlugin")
++ .config(
++ "spark.shuffle.manager",
++ "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
++ .config("spark.comet.explainFallback.enabled", "true")
++ .config("spark.sql.iceberg.parquet.reader-type", "COMET")
++ .config("spark.memory.offHeap.enabled", "true")
++ .config("spark.memory.offHeap.size", "10g")
++ .config("spark.comet.use.lazyMaterialization", "false")
++ .config("spark.comet.schemaEvolution.enabled", "true")
+ .master("local");
+ spark = builder.getOrCreate();
+ }
+diff --git a/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/action/IcebergSortCompactionBenchmark.java b/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/action/IcebergSortCompactionBenchmark.java
+index 77b79384a6d..08f7de1c0de 100644
+--- a/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/action/IcebergSortCompactionBenchmark.java
++++ b/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/action/IcebergSortCompactionBenchmark.java
+@@ -392,6 +392,16 @@ public class IcebergSortCompactionBenchmark {
+ "spark.sql.catalog.spark_catalog",
"org.apache.iceberg.spark.SparkSessionCatalog")
+ .config("spark.sql.catalog.spark_catalog.type", "hadoop")
+ .config("spark.sql.catalog.spark_catalog.warehouse",
getCatalogWarehouse())
++ .config("spark.plugins", "org.apache.spark.CometPlugin")
++ .config(
++ "spark.shuffle.manager",
++ "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
++ .config("spark.comet.explainFallback.enabled", "true")
++ .config("spark.sql.iceberg.parquet.reader-type", "COMET")
++ .config("spark.memory.offHeap.enabled", "true")
++ .config("spark.memory.offHeap.size", "10g")
++ .config("spark.comet.use.lazyMaterialization", "false")
++ .config("spark.comet.schemaEvolution.enabled", "true")
+ .master("local[*]");
+ spark = builder.getOrCreate();
+ Configuration sparkHadoopConf = spark.sessionState().newHadoopConf();
+diff --git a/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/source/DVReaderBenchmark.java b/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/source/DVReaderBenchmark.java
+index c6794e43c63..f7359197407 100644
+--- a/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/source/DVReaderBenchmark.java
++++ b/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/source/DVReaderBenchmark.java
+@@ -239,6 +239,16 @@ public class DVReaderBenchmark {
+ .config("spark.sql.catalog.spark_catalog",
SparkSessionCatalog.class.getName())
+ .config("spark.sql.catalog.spark_catalog.type", "hadoop")
+ .config("spark.sql.catalog.spark_catalog.warehouse",
newWarehouseDir())
++ .config("spark.plugins", "org.apache.spark.CometPlugin")
++ .config(
++ "spark.shuffle.manager",
++ "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
++ .config("spark.comet.explainFallback.enabled", "true")
++ .config("spark.sql.iceberg.parquet.reader-type", "COMET")
++ .config("spark.memory.offHeap.enabled", "true")
++ .config("spark.memory.offHeap.size", "10g")
++ .config("spark.comet.use.lazyMaterialization", "false")
++ .config("spark.comet.schemaEvolution.enabled", "true")
+ .master("local[*]")
+ .getOrCreate();
+ }
+diff --git a/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/source/DVWriterBenchmark.java b/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/source/DVWriterBenchmark.java
+index ac74fb5a109..e011b8b2510 100644
+--- a/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/source/DVWriterBenchmark.java
++++ b/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/source/DVWriterBenchmark.java
+@@ -223,6 +223,16 @@ public class DVWriterBenchmark {
+ .config("spark.sql.catalog.spark_catalog",
SparkSessionCatalog.class.getName())
+ .config("spark.sql.catalog.spark_catalog.type", "hadoop")
+ .config("spark.sql.catalog.spark_catalog.warehouse",
newWarehouseDir())
++ .config("spark.plugins", "org.apache.spark.CometPlugin")
++ .config(
++ "spark.shuffle.manager",
++ "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
++ .config("spark.comet.explainFallback.enabled", "true")
++ .config("spark.sql.iceberg.parquet.reader-type", "COMET")
++ .config("spark.memory.offHeap.enabled", "true")
++ .config("spark.memory.offHeap.size", "10g")
++ .config("spark.comet.use.lazyMaterialization", "false")
++ .config("spark.comet.schemaEvolution.enabled", "true")
+ .master("local[*]")
+ .getOrCreate();
+ }
+diff --git a/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceBenchmark.java b/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceBenchmark.java
+index 68c537e34a4..f66be2f3896 100644
+--- a/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceBenchmark.java
++++ b/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceBenchmark.java
+@@ -94,7 +94,19 @@ public abstract class IcebergSourceBenchmark {
+ }
+
+ protected void setupSpark(boolean enableDictionaryEncoding) {
+- SparkSession.Builder builder = SparkSession.builder().config("spark.ui.enabled", false);
++ SparkSession.Builder builder =
++ SparkSession.builder()
++ .config("spark.ui.enabled", false)
++ .config("spark.plugins", "org.apache.spark.CometPlugin")
++ .config(
++ "spark.shuffle.manager",
++ "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
++ .config("spark.comet.explainFallback.enabled", "true")
++ .config("spark.sql.iceberg.parquet.reader-type", "COMET")
++ .config("spark.memory.offHeap.enabled", "true")
++ .config("spark.memory.offHeap.size", "10g")
++ .config("spark.comet.use.lazyMaterialization", "false")
++ .config("spark.comet.schemaEvolution.enabled", "true");
+ if (!enableDictionaryEncoding) {
+ builder
+ .config("parquet.dictionary.page.size", "1")
+diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometColumnReader.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometColumnReader.java
+index 81b7d83a707..eba1a2a0fb1 100644
+--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometColumnReader.java
++++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometColumnReader.java
+@@ -19,18 +19,22 @@
+ package org.apache.iceberg.spark.data.vectorized;
+
+ import java.io.IOException;
++import org.apache.comet.CometConf;
+ import org.apache.comet.CometSchemaImporter;
+ import org.apache.comet.parquet.AbstractColumnReader;
+ import org.apache.comet.parquet.ColumnReader;
++import org.apache.comet.parquet.ParquetColumnSpec;
++import org.apache.comet.parquet.RowGroupReader;
+ import org.apache.comet.parquet.TypeUtil;
+ import org.apache.comet.parquet.Utils;
+ import org.apache.comet.shaded.arrow.memory.RootAllocator;
++import org.apache.iceberg.parquet.CometTypeUtils;
+ import org.apache.iceberg.parquet.VectorizedReader;
+ import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+ import org.apache.iceberg.spark.SparkSchemaUtil;
+ import org.apache.iceberg.types.Types;
+ import org.apache.parquet.column.ColumnDescriptor;
+-import org.apache.parquet.column.page.PageReader;
++import org.apache.spark.sql.internal.SQLConf;
+ import org.apache.spark.sql.types.DataType;
+ import org.apache.spark.sql.types.Metadata;
+ import org.apache.spark.sql.types.StructField;
+@@ -42,23 +46,28 @@ class CometColumnReader implements VectorizedReader<ColumnVector> {
+
+ private final ColumnDescriptor descriptor;
+ private final DataType sparkType;
++ private final int fieldId;
+
+ // The delegated ColumnReader from Comet side
+ private AbstractColumnReader delegate;
+ private boolean initialized = false;
+ private int batchSize = DEFAULT_BATCH_SIZE;
+ private CometSchemaImporter importer;
++ private ParquetColumnSpec spec;
+
+- CometColumnReader(DataType sparkType, ColumnDescriptor descriptor) {
++ CometColumnReader(DataType sparkType, ColumnDescriptor descriptor, int fieldId) {
+ this.sparkType = sparkType;
+ this.descriptor = descriptor;
++ this.fieldId = fieldId;
+ }
+
+ CometColumnReader(Types.NestedField field) {
+ DataType dataType = SparkSchemaUtil.convert(field.type());
+ StructField structField = new StructField(field.name(), dataType, false, Metadata.empty());
+ this.sparkType = dataType;
+- this.descriptor = TypeUtil.convertToParquet(structField);
++ this.descriptor =
++ CometTypeUtils.buildColumnDescriptor(TypeUtil.convertToParquetSpec(structField));
++ this.fieldId = field.fieldId();
+ }
+
+ public AbstractColumnReader delegate() {
+@@ -92,7 +101,26 @@ class CometColumnReader implements VectorizedReader<ColumnVector> {
+ }
+
+ this.importer = new CometSchemaImporter(new RootAllocator());
+- this.delegate = Utils.getColumnReader(sparkType, descriptor, importer, batchSize, false, false);
++
++ spec = CometTypeUtils.descriptorToParquetColumnSpec(descriptor);
++
++ boolean useLegacyTime =
++ Boolean.parseBoolean(
++ SQLConf.get()
++ .getConfString(
++ CometConf.COMET_EXCEPTION_ON_LEGACY_DATE_TIMESTAMP().key(), "false"));
++ boolean useLazyMaterialization =
++ Boolean.parseBoolean(
++ SQLConf.get().getConfString(CometConf.COMET_USE_LAZY_MATERIALIZATION().key(), "false"));
++ this.delegate =
++ Utils.getColumnReader(
++ sparkType,
++ spec,
++ importer,
++ batchSize,
++ true, // Comet sets this to true for native execution
++ useLazyMaterialization,
++ useLegacyTime);
+ this.initialized = true;
+ }
+
+@@ -111,9 +139,9 @@ class CometColumnReader implements VectorizedReader<ColumnVector> {
+ * <p>NOTE: this should be called before reading a new Parquet column chunk, and after {@link
+ * CometColumnReader#reset} is called.
+ */
+- public void setPageReader(PageReader pageReader) throws IOException {
++ public void setPageReader(RowGroupReader pageStore) throws IOException {
+ Preconditions.checkState(initialized, "Invalid state: 'reset' should be called first");
+- ((ColumnReader) delegate).setPageReader(pageReader);
++ ((ColumnReader) delegate).setRowGroupReader(pageStore, spec);
+ }
+
+ @Override
+diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometColumnarBatchReader.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometColumnarBatchReader.java
+index 04ac69476ad..916face2bf2 100644
+--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometColumnarBatchReader.java
++++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometColumnarBatchReader.java
+@@ -22,8 +22,12 @@ import java.io.IOException;
+ import java.io.UncheckedIOException;
+ import java.util.List;
+ import java.util.Map;
++import org.apache.comet.CometRuntimeException;
+ import org.apache.comet.parquet.AbstractColumnReader;
+-import org.apache.comet.parquet.BatchReader;
++import org.apache.comet.parquet.IcebergCometBatchReader;
++import org.apache.comet.parquet.RowGroupReader;
++import org.apache.comet.vector.CometSelectionVector;
++import org.apache.comet.vector.CometVector;
+ import org.apache.iceberg.Schema;
+ import org.apache.iceberg.data.DeleteFilter;
+ import org.apache.iceberg.parquet.VectorizedReader;
+@@ -55,7 +59,7 @@ class CometColumnarBatchReader implements VectorizedReader<ColumnarBatch> {
+ // calling BatchReader.nextBatch, the isDeleted value is not yet available, so
+ // DeleteColumnReader.readBatch must be called explicitly later, after the isDeleted value is
+ // available.
+- private final BatchReader delegate;
++ private final IcebergCometBatchReader delegate;
+ private DeleteFilter<InternalRow> deletes = null;
+ private long rowStartPosInBatch = 0;
+
+@@ -65,9 +69,7 @@ class CometColumnarBatchReader implements VectorizedReader<ColumnarBatch> {
+ this.hasIsDeletedColumn =
+ readers.stream().anyMatch(reader -> reader instanceof CometDeleteColumnReader);
+
+- AbstractColumnReader[] abstractColumnReaders = new AbstractColumnReader[readers.size()];
+- this.delegate = new BatchReader(abstractColumnReaders);
+- delegate.setSparkSchema(SparkSchemaUtil.convert(schema));
++ this.delegate = new IcebergCometBatchReader(readers.size(), SparkSchemaUtil.convert(schema));
+ }
+
+ @Override
+@@ -79,19 +81,22 @@ class CometColumnarBatchReader implements VectorizedReader<ColumnarBatch> {
+ && !(readers[i] instanceof CometPositionColumnReader)
+ && !(readers[i] instanceof CometDeleteColumnReader)) {
+ readers[i].reset();
+- readers[i].setPageReader(pageStore.getPageReader(readers[i].descriptor()));
++ readers[i].setPageReader((RowGroupReader) pageStore);
+ }
+ } catch (IOException e) {
+ throw new UncheckedIOException("Failed to setRowGroupInfo for Comet
vectorization", e);
+ }
+ }
+
++ AbstractColumnReader[] delegateReaders = new AbstractColumnReader[readers.length];
+ for (int i = 0; i < readers.length; i++) {
+- delegate.getColumnReaders()[i] = this.readers[i].delegate();
++ delegateReaders[i] = readers[i].delegate();
+ }
+
++ delegate.init(delegateReaders);
++
+ this.rowStartPosInBatch =
+- pageStore
++ ((RowGroupReader) pageStore)
+ .getRowIndexOffset()
+ .orElseThrow(
+ () ->
+@@ -148,9 +153,17 @@ class CometColumnarBatchReader implements VectorizedReader<ColumnarBatch> {
+ Pair<int[], Integer> pair = buildRowIdMapping(vectors);
+ if (pair != null) {
+ int[] rowIdMapping = pair.first();
+- numLiveRows = pair.second();
+- for (int i = 0; i < vectors.length; i++) {
+- vectors[i] = new ColumnVectorWithFilter(vectors[i], rowIdMapping);
++ if (pair.second() != null) {
++ numLiveRows = pair.second();
++ for (int i = 0; i < vectors.length; i++) {
++ if (vectors[i] instanceof CometVector) {
++ vectors[i] =
++ new CometSelectionVector((CometVector) vectors[i], rowIdMapping, numLiveRows);
++ } else {
++ throw new CometRuntimeException(
++ "Unsupported column vector type: " +
vectors[i].getClass());
++ }
++ }
+ }
+ }
+ }
+diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometConstantColumnReader.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometConstantColumnReader.java
+index 047c96314b1..88d691a607a 100644
+--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometConstantColumnReader.java
++++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometConstantColumnReader.java
+@@ -21,6 +21,7 @@ package org.apache.iceberg.spark.data.vectorized;
+ import java.math.BigDecimal;
+ import java.nio.ByteBuffer;
+ import org.apache.comet.parquet.ConstantColumnReader;
++import org.apache.iceberg.parquet.CometTypeUtils;
+ import org.apache.iceberg.types.Types;
+ import org.apache.spark.sql.types.DataType;
+ import org.apache.spark.sql.types.DataTypes;
+@@ -34,7 +35,11 @@ class CometConstantColumnReader<T> extends CometColumnReader {
+ super(field);
+ // use delegate to set constant value on the native side to be consumed by native execution.
+ setDelegate(
+- new ConstantColumnReader(sparkType(), descriptor(), convertToSparkValue(value), false));
++ new ConstantColumnReader(
++ sparkType(),
++ CometTypeUtils.descriptorToParquetColumnSpec(descriptor()),
++ convertToSparkValue(value),
++ false));
+ }
+
+ @Override
+diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometDeleteColumnReader.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometDeleteColumnReader.java
+index 6235bfe4865..cba108e4326 100644
+--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometDeleteColumnReader.java
++++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometDeleteColumnReader.java
+@@ -51,10 +51,10 @@ class CometDeleteColumnReader<T> extends CometColumnReader {
+ DeleteColumnReader() {
+ super(
+ DataTypes.BooleanType,
+- TypeUtil.convertToParquet(
++ TypeUtil.convertToParquetSpec(
+ new StructField("_deleted", DataTypes.BooleanType, false,
Metadata.empty())),
+ false /* useDecimal128 = false */,
+- false /* isConstant = false */);
++ false /* isConstant */);
+ this.isDeleted = new boolean[0];
+ }
+
+diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometPositionColumnReader.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometPositionColumnReader.java
+index bcc0e514c28..98e80068c51 100644
+--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometPositionColumnReader.java
++++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometPositionColumnReader.java
+@@ -20,6 +20,7 @@ package org.apache.iceberg.spark.data.vectorized;
+
+ import org.apache.comet.parquet.MetadataColumnReader;
+ import org.apache.comet.parquet.Native;
++import org.apache.iceberg.parquet.CometTypeUtils;
+ import org.apache.iceberg.types.Types;
+ import org.apache.parquet.column.ColumnDescriptor;
+ import org.apache.spark.sql.types.DataTypes;
+@@ -44,7 +45,7 @@ class CometPositionColumnReader extends CometColumnReader {
+ PositionColumnReader(ColumnDescriptor descriptor) {
+ super(
+ DataTypes.LongType,
+- descriptor,
++ CometTypeUtils.descriptorToParquetColumnSpec(descriptor),
+ false /* useDecimal128 = false */,
+ false /* isConstant = false */);
+ }
+diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometVectorizedReaderBuilder.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometVectorizedReaderBuilder.java
+index d36f1a72747..56f8c9bff93 100644
+--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometVectorizedReaderBuilder.java
++++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometVectorizedReaderBuilder.java
+@@ -142,6 +142,7 @@ class CometVectorizedReaderBuilder extends TypeWithSchemaVisitor<VectorizedReade
+ return null;
+ }
+
+- return new CometColumnReader(SparkSchemaUtil.convert(icebergField.type()), desc);
++ return new CometColumnReader(
++ SparkSchemaUtil.convert(icebergField.type()), desc, parquetFieldId);
+ }
+ }
+diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java
+index a0f45e7610a..473b34bb0f3 100644
+--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java
++++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java
+@@ -111,6 +111,7 @@ abstract class BaseBatchReader<T extends ScanTask> extends BaseReader<ColumnarBa
+ // read performance as every batch read doesn't have to pay the cost of allocating memory.
+ .reuseContainers()
+ .withNameMapping(nameMapping())
++ .enableComet(parquetConf.readerType() == ParquetReaderType.COMET)
+ .build();
+ }
+
+diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java
+index 0626d0b4398..9ec8f534669 100644
+--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java
++++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java
+@@ -175,11 +175,11 @@ class SparkBatch implements Batch {
+ return field.type().isPrimitiveType() || MetadataColumns.isMetadataColumn(field.fieldId());
+ }
+
+- private boolean useCometBatchReads() {
++ protected boolean useCometBatchReads() {
+ return readConf.parquetVectorizationEnabled()
+ && readConf.parquetReaderType() == ParquetReaderType.COMET
+ && expectedSchema.columns().stream().allMatch(this::supportsCometBatchReads)
+- && taskGroups.stream().allMatch(this::supportsParquetBatchReads);
++ && taskGroups.stream().allMatch(this::supportsCometBatchReads);
+ }
+
+ private boolean supportsCometBatchReads(Types.NestedField field) {
+@@ -189,6 +189,21 @@ class SparkBatch implements Batch {
+ && field.fieldId() != MetadataColumns.LAST_UPDATED_SEQUENCE_NUMBER.fieldId();
+ }
+
++ private boolean supportsCometBatchReads(ScanTask task) {
++ if (task instanceof ScanTaskGroup) {
++ ScanTaskGroup<?> taskGroup = (ScanTaskGroup<?>) task;
++ return taskGroup.tasks().stream().allMatch(this::supportsCometBatchReads);
++
++ } else if (task.isFileScanTask() && !task.isDataTask()) {
++ FileScanTask fileScanTask = task.asFileScanTask();
++ // Comet can't handle delete files for now
++ return fileScanTask.file().format() == FileFormat.PARQUET;
++
++ } else {
++ return false;
++ }
++ }
++
+ // conditions for using ORC batch reads:
+ // - ORC vectorization is enabled
+ // - all tasks are of type FileScanTask and read only ORC files with no
delete files
+diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java
+index 106b296de09..967b0d41d08 100644
+--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java
++++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java
+@@ -24,6 +24,7 @@ import java.util.Map;
+ import java.util.Optional;
+ import java.util.function.Supplier;
+ import java.util.stream.Collectors;
++import org.apache.comet.parquet.SupportsComet;
+ import org.apache.iceberg.BlobMetadata;
+ import org.apache.iceberg.ScanTask;
+ import org.apache.iceberg.ScanTaskGroup;
+@@ -95,7 +96,7 @@ import org.apache.spark.sql.types.StructType;
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+
+-abstract class SparkScan implements Scan, SupportsReportStatistics {
++abstract class SparkScan implements Scan, SupportsReportStatistics, SupportsComet {
+ private static final Logger LOG = LoggerFactory.getLogger(SparkScan.class);
+ private static final String NDV_KEY = "ndv";
+
+@@ -351,4 +352,10 @@ abstract class SparkScan implements Scan, SupportsReportStatistics {
+ return splitSize;
+ }
+ }
++
++ @Override
++ public boolean isCometEnabled() {
++ SparkBatch batch = (SparkBatch) this.toBatch();
++ return batch.useCometBatchReads();
++ }
+ }
+diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/SparkDistributedDataScanTestBase.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/SparkDistributedDataScanTestBase.java
+index 404ba728460..26d6f9b613f 100644
+--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/SparkDistributedDataScanTestBase.java
++++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/SparkDistributedDataScanTestBase.java
+@@ -90,6 +90,16 @@ public abstract class SparkDistributedDataScanTestBase
+ .master("local[2]")
+ .config("spark.serializer", serializer)
+ .config(SQLConf.SHUFFLE_PARTITIONS().key(), "4")
++ .config("spark.plugins", "org.apache.spark.CometPlugin")
++ .config(
++ "spark.shuffle.manager",
++ "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
++ .config("spark.comet.explainFallback.enabled", "true")
++ .config("spark.sql.iceberg.parquet.reader-type", "COMET")
++ .config("spark.memory.offHeap.enabled", "true")
++ .config("spark.memory.offHeap.size", "10g")
++ .config("spark.comet.use.lazyMaterialization", "false")
++ .config("spark.comet.schemaEvolution.enabled", "true")
+ .getOrCreate();
+ }
+ }
+diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanDeletes.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanDeletes.java
+index 659507e4c5e..eb9cedc34c5 100644
+--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanDeletes.java
++++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanDeletes.java
+@@ -73,6 +73,16 @@ public class TestSparkDistributedDataScanDeletes
+ .master("local[2]")
+ .config("spark.serializer",
"org.apache.spark.serializer.KryoSerializer")
+ .config(SQLConf.SHUFFLE_PARTITIONS().key(), "4")
++ .config("spark.plugins", "org.apache.spark.CometPlugin")
++ .config(
++ "spark.shuffle.manager",
++ "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
++ .config("spark.comet.explainFallback.enabled", "true")
++ .config("spark.sql.iceberg.parquet.reader-type", "COMET")
++ .config("spark.memory.offHeap.enabled", "true")
++ .config("spark.memory.offHeap.size", "10g")
++ .config("spark.comet.use.lazyMaterialization", "false")
++ .config("spark.comet.schemaEvolution.enabled", "true")
+ .getOrCreate();
+ }
+
+diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanFilterFiles.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanFilterFiles.java
+index a218f965ea6..395c02441e7 100644
+--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanFilterFiles.java
++++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanFilterFiles.java
+@@ -62,6 +62,16 @@ public class TestSparkDistributedDataScanFilterFiles
+ .master("local[2]")
+ .config("spark.serializer",
"org.apache.spark.serializer.KryoSerializer")
+ .config(SQLConf.SHUFFLE_PARTITIONS().key(), "4")
++ .config("spark.plugins", "org.apache.spark.CometPlugin")
++ .config(
++ "spark.shuffle.manager",
++ "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
++ .config("spark.comet.explainFallback.enabled", "true")
++ .config("spark.sql.iceberg.parquet.reader-type", "COMET")
++ .config("spark.memory.offHeap.enabled", "true")
++ .config("spark.memory.offHeap.size", "10g")
++ .config("spark.comet.use.lazyMaterialization", "false")
++ .config("spark.comet.schemaEvolution.enabled", "true")
+ .getOrCreate();
+ }
+
+diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanReporting.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanReporting.java
+index 2665d7ba8d3..306e859ce1a 100644
+--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanReporting.java
++++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanReporting.java
+@@ -63,6 +63,16 @@ public class TestSparkDistributedDataScanReporting
+ .master("local[2]")
+ .config("spark.serializer",
"org.apache.spark.serializer.KryoSerializer")
+ .config(SQLConf.SHUFFLE_PARTITIONS().key(), "4")
++ .config("spark.plugins", "org.apache.spark.CometPlugin")
++ .config(
++ "spark.shuffle.manager",
++ "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
++ .config("spark.comet.explainFallback.enabled", "true")
++ .config("spark.sql.iceberg.parquet.reader-type", "COMET")
++ .config("spark.memory.offHeap.enabled", "true")
++ .config("spark.memory.offHeap.size", "10g")
++ .config("spark.comet.use.lazyMaterialization", "false")
++ .config("spark.comet.schemaEvolution.enabled", "true")
+ .getOrCreate();
+ }
+
+diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestBase.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestBase.java
+index daf4e29ac07..fc9ee40c502 100644
+--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestBase.java
++++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestBase.java
+@@ -79,6 +79,17 @@ public abstract class TestBase extends SparkTestHelperBase {
+ .config(SQLConf.PARTITION_OVERWRITE_MODE().key(), "dynamic")
+ .config("spark.hadoop." + METASTOREURIS.varname,
hiveConf.get(METASTOREURIS.varname))
+
.config("spark.sql.legacy.respectNullabilityInTextDatasetConversion", "true")
++ .config("spark.plugins", "org.apache.spark.CometPlugin")
++ .config(
++ "spark.shuffle.manager",
++ "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
++ .config("spark.comet.explainFallback.enabled", "true")
++ .config("spark.sql.iceberg.parquet.reader-type", "COMET")
++ .config("spark.memory.offHeap.enabled", "true")
++ .config("spark.memory.offHeap.size", "10g")
++ .config("spark.comet.use.lazyMaterialization", "false")
++ .config("spark.comet.schemaEvolution.enabled", "true")
++ .config("spark.comet.exec.broadcastExchange.enabled", "false")
+ .enableHiveSupport()
+ .getOrCreate();
+
+diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/vectorized/parquet/TestParquetDictionaryEncodedVectorizedReads.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/vectorized/parquet/TestParquetDictionaryEncodedVectorizedReads.java
+index 973a17c9a38..dd0fd5cc9aa 100644
+--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/vectorized/parquet/TestParquetDictionaryEncodedVectorizedReads.java
++++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/vectorized/parquet/TestParquetDictionaryEncodedVectorizedReads.java
+@@ -65,6 +65,16 @@ public class TestParquetDictionaryEncodedVectorizedReads extends TestParquetVect
+ SparkSession.builder()
+ .master("local[2]")
+ .config("spark.driver.host",
InetAddress.getLoopbackAddress().getHostAddress())
++ .config("spark.plugins", "org.apache.spark.CometPlugin")
++ .config(
++ "spark.shuffle.manager",
++
"org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
++ .config("spark.comet.explainFallback.enabled", "true")
++ .config("spark.sql.iceberg.parquet.reader-type", "COMET")
++ .config("spark.memory.offHeap.enabled", "true")
++ .config("spark.memory.offHeap.size", "10g")
++ .config("spark.comet.use.lazyMaterialization", "false")
++ .config("spark.comet.schemaEvolution.enabled", "true")
+ .getOrCreate();
+ }
+
+diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/ScanTestBase.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/ScanTestBase.java
+index 1c5905744a7..6db62e1f90d 100644
+--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/ScanTestBase.java
++++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/ScanTestBase.java
+@@ -61,6 +61,16 @@ public abstract class ScanTestBase extends AvroDataTestBase {
+ ScanTestBase.spark =
+ SparkSession.builder()
+ .config("spark.driver.host",
InetAddress.getLoopbackAddress().getHostAddress())
++ .config("spark.plugins", "org.apache.spark.CometPlugin")
++ .config(
++ "spark.shuffle.manager",
++
"org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
++ .config("spark.comet.explainFallback.enabled", "true")
++ .config("spark.sql.iceberg.parquet.reader-type", "COMET")
++ .config("spark.memory.offHeap.enabled", "true")
++ .config("spark.memory.offHeap.size", "10g")
++ .config("spark.comet.use.lazyMaterialization", "false")
++ .config("spark.comet.schemaEvolution.enabled", "true")
+ .master("local[2]")
+ .getOrCreate();
+ ScanTestBase.sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
+diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestCompressionSettings.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestCompressionSettings.java
+index 19ec6d13dd5..bf7c837cf38 100644
+--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestCompressionSettings.java
++++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestCompressionSettings.java
+@@ -144,7 +144,20 @@ public class TestCompressionSettings extends CatalogTestBase {
+
+ @BeforeAll
+ public static void startSpark() {
+- TestCompressionSettings.spark = SparkSession.builder().master("local[2]").getOrCreate();
++ TestCompressionSettings.spark =
++ SparkSession.builder()
++ .master("local[2]")
++ .config("spark.plugins", "org.apache.spark.CometPlugin")
++ .config(
++ "spark.shuffle.manager",
++ "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
++ .config("spark.comet.explainFallback.enabled", "true")
++ .config("spark.sql.iceberg.parquet.reader-type", "COMET")
++ .config("spark.memory.offHeap.enabled", "true")
++ .config("spark.memory.offHeap.size", "10g")
++ .config("spark.comet.use.lazyMaterialization", "false")
++ .config("spark.comet.schemaEvolution.enabled", "true")
++ .getOrCreate();
+ }
+
+ @BeforeEach
+diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
+index a7702b169a6..bbb85f7d5c6 100644
+--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
++++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
+@@ -74,7 +74,20 @@ public class TestDataSourceOptions extends TestBaseWithCatalog {
+
+ @BeforeAll
+ public static void startSpark() {
+- TestDataSourceOptions.spark = SparkSession.builder().master("local[2]").getOrCreate();
++ TestDataSourceOptions.spark =
++ SparkSession.builder()
++ .master("local[2]")
++ .config("spark.plugins", "org.apache.spark.CometPlugin")
++ .config(
++ "spark.shuffle.manager",
++ "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
++ .config("spark.comet.explainFallback.enabled", "true")
++ .config("spark.sql.iceberg.parquet.reader-type", "COMET")
++ .config("spark.memory.offHeap.enabled", "true")
++ .config("spark.memory.offHeap.size", "10g")
++ .config("spark.comet.use.lazyMaterialization", "false")
++ .config("spark.comet.schemaEvolution.enabled", "true")
++ .getOrCreate();
+ }
+
+ @AfterAll
+diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestFilteredScan.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestFilteredScan.java
+index fd7d52178f2..929ebd405c5 100644
+--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestFilteredScan.java
++++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestFilteredScan.java
+@@ -114,6 +114,16 @@ public class TestFilteredScan {
+ SparkSession.builder()
+ .master("local[2]")
+ .config("spark.driver.host",
InetAddress.getLoopbackAddress().getHostAddress())
++ .config("spark.plugins", "org.apache.spark.CometPlugin")
++ .config(
++ "spark.shuffle.manager",
++
"org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
++ .config("spark.comet.explainFallback.enabled", "true")
++ .config("spark.sql.iceberg.parquet.reader-type", "COMET")
++ .config("spark.memory.offHeap.enabled", "true")
++ .config("spark.memory.offHeap.size", "10g")
++ .config("spark.comet.use.lazyMaterialization", "false")
++ .config("spark.comet.schemaEvolution.enabled", "true")
+ .getOrCreate();
+ }
+
+diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestForwardCompatibility.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestForwardCompatibility.java
+index 153564f7d12..761c20f5d80 100644
+--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestForwardCompatibility.java
++++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestForwardCompatibility.java
+@@ -98,6 +98,16 @@ public class TestForwardCompatibility {
+ SparkSession.builder()
+ .master("local[2]")
+ .config("spark.driver.host",
InetAddress.getLoopbackAddress().getHostAddress())
++ .config("spark.plugins", "org.apache.spark.CometPlugin")
++ .config(
++ "spark.shuffle.manager",
++
"org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
++ .config("spark.comet.explainFallback.enabled", "true")
++ .config("spark.sql.iceberg.parquet.reader-type", "COMET")
++ .config("spark.memory.offHeap.enabled", "true")
++ .config("spark.memory.offHeap.size", "10g")
++ .config("spark.comet.use.lazyMaterialization", "false")
++ .config("spark.comet.schemaEvolution.enabled", "true")
+ .getOrCreate();
+ }
+
+diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSpark.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSpark.java
+index f4f57157e47..d1a7cc64179 100644
+--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSpark.java
++++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSpark.java
+@@ -51,6 +51,16 @@ public class TestIcebergSpark {
+ SparkSession.builder()
+ .master("local[2]")
+ .config("spark.driver.host",
InetAddress.getLoopbackAddress().getHostAddress())
++ .config("spark.plugins", "org.apache.spark.CometPlugin")
++ .config(
++ "spark.shuffle.manager",
++
"org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
++ .config("spark.comet.explainFallback.enabled", "true")
++ .config("spark.sql.iceberg.parquet.reader-type", "COMET")
++ .config("spark.memory.offHeap.enabled", "true")
++ .config("spark.memory.offHeap.size", "10g")
++ .config("spark.comet.use.lazyMaterialization", "false")
++ .config("spark.comet.schemaEvolution.enabled", "true")
+ .getOrCreate();
+ }
+
+diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestPartitionPruning.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestPartitionPruning.java
+index e1402396fa7..ca4212f52e6 100644
+--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestPartitionPruning.java
++++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestPartitionPruning.java
+@@ -118,6 +118,16 @@ public class TestPartitionPruning {
+ SparkSession.builder()
+ .master("local[2]")
+ .config("spark.driver.host",
InetAddress.getLoopbackAddress().getHostAddress())
++ .config("spark.plugins", "org.apache.spark.CometPlugin")
++ .config(
++ "spark.shuffle.manager",
++
"org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
++ .config("spark.comet.explainFallback.enabled", "true")
++ .config("spark.sql.iceberg.parquet.reader-type", "COMET")
++ .config("spark.memory.offHeap.enabled", "true")
++ .config("spark.memory.offHeap.size", "10g")
++ .config("spark.comet.use.lazyMaterialization", "false")
++ .config("spark.comet.schemaEvolution.enabled", "true")
+ .getOrCreate();
+ TestPartitionPruning.sparkContext = JavaSparkContext.fromSparkContext(spark.sparkContext());
+
+diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestPartitionValues.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestPartitionValues.java
+index 0b6ab2052b6..a8176332fb7 100644
+--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestPartitionValues.java
++++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestPartitionValues.java
+@@ -112,6 +112,16 @@ public class TestPartitionValues {
+ SparkSession.builder()
+ .master("local[2]")
+ .config("spark.driver.host",
InetAddress.getLoopbackAddress().getHostAddress())
++ .config("spark.plugins", "org.apache.spark.CometPlugin")
++ .config(
++ "spark.shuffle.manager",
++
"org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
++ .config("spark.comet.explainFallback.enabled", "true")
++ .config("spark.sql.iceberg.parquet.reader-type", "COMET")
++ .config("spark.memory.offHeap.enabled", "true")
++ .config("spark.memory.offHeap.size", "10g")
++ .config("spark.comet.use.lazyMaterialization", "false")
++ .config("spark.comet.schemaEvolution.enabled", "true")
+ .getOrCreate();
+ }
+
+diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection.java
+index 11865db7fce..8fe32e8300c 100644
+--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection.java
++++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection.java
+@@ -91,6 +91,16 @@ public class TestSnapshotSelection {
+ SparkSession.builder()
+ .master("local[2]")
+ .config("spark.driver.host",
InetAddress.getLoopbackAddress().getHostAddress())
++ .config("spark.plugins", "org.apache.spark.CometPlugin")
++ .config(
++ "spark.shuffle.manager",
++
"org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
++ .config("spark.comet.explainFallback.enabled", "true")
++ .config("spark.sql.iceberg.parquet.reader-type", "COMET")
++ .config("spark.memory.offHeap.enabled", "true")
++ .config("spark.memory.offHeap.size", "10g")
++ .config("spark.comet.use.lazyMaterialization", "false")
++ .config("spark.comet.schemaEvolution.enabled", "true")
+ .getOrCreate();
+ }
+
+diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataFile.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataFile.java
+index 3051e27d720..6c39f76c286 100644
+--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataFile.java
++++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataFile.java
+@@ -125,6 +125,16 @@ public class TestSparkDataFile {
+ SparkSession.builder()
+ .master("local[2]")
+ .config("spark.driver.host",
InetAddress.getLoopbackAddress().getHostAddress())
++ .config("spark.plugins", "org.apache.spark.CometPlugin")
++ .config(
++ "spark.shuffle.manager",
++
"org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
++ .config("spark.comet.explainFallback.enabled", "true")
++ .config("spark.sql.iceberg.parquet.reader-type", "COMET")
++ .config("spark.memory.offHeap.enabled", "true")
++ .config("spark.memory.offHeap.size", "10g")
++ .config("spark.comet.use.lazyMaterialization", "false")
++ .config("spark.comet.schemaEvolution.enabled", "true")
+ .getOrCreate();
+ TestSparkDataFile.sparkContext = JavaSparkContext.fromSparkContext(spark.sparkContext());
+ }
+diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java
+index 4ccbf86f125..40cff1f69a7 100644
+--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java
++++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java
+@@ -100,6 +100,16 @@ public class TestSparkDataWrite {
+ SparkSession.builder()
+ .master("local[2]")
+ .config("spark.driver.host",
InetAddress.getLoopbackAddress().getHostAddress())
++ .config("spark.plugins", "org.apache.spark.CometPlugin")
++ .config(
++ "spark.shuffle.manager",
++
"org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
++ .config("spark.comet.explainFallback.enabled", "true")
++ .config("spark.sql.iceberg.parquet.reader-type", "COMET")
++ .config("spark.memory.offHeap.enabled", "true")
++ .config("spark.memory.offHeap.size", "10g")
++ .config("spark.comet.use.lazyMaterialization", "false")
++ .config("spark.comet.schemaEvolution.enabled", "true")
+ .getOrCreate();
+ }
+
+@@ -144,7 +154,7 @@ public class TestSparkDataWrite {
+ Dataset<Row> result = spark.read().format("iceberg").load(targetLocation);
+
+ List<SimpleRecord> actual =
+- result.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList();
++ result.orderBy("id", "data").as(Encoders.bean(SimpleRecord.class)).collectAsList();
+ assertThat(actual).hasSameSizeAs(expected).isEqualTo(expected);
+ for (ManifestFile manifest :
+ SnapshotUtil.latestSnapshot(table, branch).allManifests(table.io())) {
+@@ -213,7 +223,7 @@ public class TestSparkDataWrite {
+ Dataset<Row> result = spark.read().format("iceberg").load(targetLocation);
+
+ List<SimpleRecord> actual =
+- result.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList();
++ result.orderBy("id", "data").as(Encoders.bean(SimpleRecord.class)).collectAsList();
+ assertThat(actual).hasSameSizeAs(expected).isEqualTo(expected);
+ }
+
+@@ -258,7 +268,7 @@ public class TestSparkDataWrite {
+ Dataset<Row> result = spark.read().format("iceberg").load(targetLocation);
+
+ List<SimpleRecord> actual =
+- result.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList();
++ result.orderBy("id", "data").as(Encoders.bean(SimpleRecord.class)).collectAsList();
+ assertThat(actual).hasSameSizeAs(expected).isEqualTo(expected);
+ }
+
+@@ -310,7 +320,7 @@ public class TestSparkDataWrite {
+ Dataset<Row> result = spark.read().format("iceberg").load(targetLocation);
+
+ List<SimpleRecord> actual =
+- result.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList();
++ result.orderBy("id", "data").as(Encoders.bean(SimpleRecord.class)).collectAsList();
+ assertThat(actual).hasSameSizeAs(expected).isEqualTo(expected);
+ }
+
+@@ -352,7 +362,7 @@ public class TestSparkDataWrite {
+ Dataset<Row> result = spark.read().format("iceberg").load(targetLocation);
+
+ List<SimpleRecord> actual =
+- result.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList();
++ result.orderBy("id", "data").as(Encoders.bean(SimpleRecord.class)).collectAsList();
+ assertThat(actual).hasSameSizeAs(expected).isEqualTo(expected);
+ }
+
+@@ -391,7 +401,7 @@ public class TestSparkDataWrite {
+ Dataset<Row> result = spark.read().format("iceberg").load(targetLocation);
+
+ List<SimpleRecord> actual =
+- result.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList();
++ result.orderBy("id", "data").as(Encoders.bean(SimpleRecord.class)).collectAsList();
+ assertThat(actual).hasSameSizeAs(expected).isEqualTo(expected);
+
+ List<DataFile> files = Lists.newArrayList();
+@@ -459,7 +469,7 @@ public class TestSparkDataWrite {
+ Dataset<Row> result = spark.read().format("iceberg").load(targetLocation);
+
+ List<SimpleRecord> actual =
+- result.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList();
++ result.orderBy("id", "data").as(Encoders.bean(SimpleRecord.class)).collectAsList();
+ assertThat(actual).hasSameSizeAs(expected).isEqualTo(expected);
+ }
+
+@@ -706,7 +716,7 @@ public class TestSparkDataWrite {
+ // Since write and commit succeeded, the rows should be readable
+ Dataset<Row> result = spark.read().format("iceberg").load(targetLocation);
+ List<SimpleRecord> actual =
+- result.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList();
++ result.orderBy("id", "data").as(Encoders.bean(SimpleRecord.class)).collectAsList();
+ assertThat(actual)
+ .hasSize(records.size() + records2.size())
+ .containsExactlyInAnyOrder(
+diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReadProjection.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReadProjection.java
+index 596d05d30b5..dc8563314c7 100644
+--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReadProjection.java
++++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReadProjection.java
+@@ -88,6 +88,16 @@ public class TestSparkReadProjection extends TestReadProjection {
+ SparkSession.builder()
+ .master("local[2]")
+ .config("spark.driver.host",
InetAddress.getLoopbackAddress().getHostAddress())
++ .config("spark.plugins", "org.apache.spark.CometPlugin")
++ .config(
++ "spark.shuffle.manager",
++
"org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
++ .config("spark.comet.explainFallback.enabled", "true")
++ .config("spark.sql.iceberg.parquet.reader-type", "COMET")
++ .config("spark.memory.offHeap.enabled", "true")
++ .config("spark.memory.offHeap.size", "10g")
++ .config("spark.comet.use.lazyMaterialization", "false")
++ .config("spark.comet.schemaEvolution.enabled", "true")
+ .getOrCreate();
+ ImmutableMap<String, String> config =
+ ImmutableMap.of(
+diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java
+index 42699f46623..058c2d79b62 100644
+--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java
++++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java
+@@ -138,6 +138,16 @@ public class TestSparkReaderDeletes extends DeleteReadTests {
+ .config("spark.ui.liveUpdate.period", 0)
+ .config(SQLConf.PARTITION_OVERWRITE_MODE().key(), "dynamic")
+ .config("spark.hadoop." + METASTOREURIS.varname,
hiveConf.get(METASTOREURIS.varname))
++ .config("spark.plugins", "org.apache.spark.CometPlugin")
++ .config(
++ "spark.shuffle.manager",
++
"org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
++ .config("spark.comet.explainFallback.enabled", "true")
++ .config("spark.sql.iceberg.parquet.reader-type", "COMET")
++ .config("spark.memory.offHeap.enabled", "true")
++ .config("spark.memory.offHeap.size", "10g")
++ .config("spark.comet.use.lazyMaterialization", "false")
++ .config("spark.comet.schemaEvolution.enabled", "true")
+ .enableHiveSupport()
+ .getOrCreate();
+
+diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderWithBloomFilter.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderWithBloomFilter.java
+index baf7fa8f88a..509c5deba51 100644
+--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderWithBloomFilter.java
++++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderWithBloomFilter.java
+@@ -182,6 +182,16 @@ public class TestSparkReaderWithBloomFilter {
+ SparkSession.builder()
+ .master("local[2]")
+ .config("spark.hadoop." + METASTOREURIS.varname,
hiveConf.get(METASTOREURIS.varname))
++ .config("spark.plugins", "org.apache.spark.CometPlugin")
++ .config(
++ "spark.shuffle.manager",
++
"org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
++ .config("spark.comet.explainFallback.enabled", "true")
++ .config("spark.sql.iceberg.parquet.reader-type", "COMET")
++ .config("spark.memory.offHeap.enabled", "true")
++ .config("spark.memory.offHeap.size", "10g")
++ .config("spark.comet.use.lazyMaterialization", "false")
++ .config("spark.comet.schemaEvolution.enabled", "true")
+ .enableHiveSupport()
+ .getOrCreate();
+
+diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreaming.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreaming.java
+index 54048bbf218..b1a2ca92098 100644
+--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreaming.java
++++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreaming.java
+@@ -69,6 +69,16 @@ public class TestStructuredStreaming {
+ .master("local[2]")
+ .config("spark.driver.host",
InetAddress.getLoopbackAddress().getHostAddress())
+ .config("spark.sql.shuffle.partitions", 4)
++ .config("spark.plugins", "org.apache.spark.CometPlugin")
++ .config(
++ "spark.shuffle.manager",
++
"org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
++ .config("spark.comet.explainFallback.enabled", "true")
++ .config("spark.sql.iceberg.parquet.reader-type", "COMET")
++ .config("spark.memory.offHeap.enabled", "true")
++ .config("spark.memory.offHeap.size", "10g")
++ .config("spark.comet.use.lazyMaterialization", "false")
++ .config("spark.comet.schemaEvolution.enabled", "true")
+ .getOrCreate();
+ }
+
+diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestTimestampWithoutZone.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestTimestampWithoutZone.java
+index 8b1e3fbfc77..74936e2487e 100644
+--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestTimestampWithoutZone.java
++++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestTimestampWithoutZone.java
+@@ -75,7 +75,20 @@ public class TestTimestampWithoutZone extends TestBase {
+
+ @BeforeAll
+ public static void startSpark() {
+- TestTimestampWithoutZone.spark = SparkSession.builder().master("local[2]").getOrCreate();
++ TestTimestampWithoutZone.spark =
++ SparkSession.builder()
++ .master("local[2]")
++ .config("spark.plugins", "org.apache.spark.CometPlugin")
++ .config(
++ "spark.shuffle.manager",
++ "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
++ .config("spark.comet.explainFallback.enabled", "true")
++ .config("spark.sql.iceberg.parquet.reader-type", "COMET")
++ .config("spark.memory.offHeap.enabled", "true")
++ .config("spark.memory.offHeap.size", "10g")
++ .config("spark.comet.use.lazyMaterialization", "false")
++ .config("spark.comet.schemaEvolution.enabled", "true")
++ .getOrCreate();
+ }
+
+ @AfterAll
+diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestWriteMetricsConfig.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestWriteMetricsConfig.java
+index c3fac70dd3f..b7f2431c119 100644
+--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestWriteMetricsConfig.java
++++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestWriteMetricsConfig.java
+@@ -84,6 +84,16 @@ public class TestWriteMetricsConfig {
+ SparkSession.builder()
+ .master("local[2]")
+ .config("spark.driver.host",
InetAddress.getLoopbackAddress().getHostAddress())
++ .config("spark.plugins", "org.apache.spark.CometPlugin")
++ .config(
++ "spark.shuffle.manager",
++
"org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
++ .config("spark.comet.explainFallback.enabled", "true")
++ .config("spark.sql.iceberg.parquet.reader-type", "COMET")
++ .config("spark.memory.offHeap.enabled", "true")
++ .config("spark.memory.offHeap.size", "10g")
++ .config("spark.comet.use.lazyMaterialization", "false")
++ .config("spark.comet.schemaEvolution.enabled", "true")
+ .getOrCreate();
+ TestWriteMetricsConfig.sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
+ }
+diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/sql/TestAggregatePushDown.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/sql/TestAggregatePushDown.java
+index 5ce56b4feca..0def2a156d4 100644
+--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/sql/TestAggregatePushDown.java
++++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/sql/TestAggregatePushDown.java
+@@ -63,6 +63,16 @@ public class TestAggregatePushDown extends CatalogTestBase {
+ SparkSession.builder()
+ .master("local[2]")
+ .config("spark.sql.iceberg.aggregate_pushdown", "true")
++ .config("spark.plugins", "org.apache.spark.CometPlugin")
++ .config(
++ "spark.shuffle.manager",
++ "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
++ .config("spark.comet.explainFallback.enabled", "true")
++ .config("spark.sql.iceberg.parquet.reader-type", "COMET")
++ .config("spark.memory.offHeap.enabled", "true")
++ .config("spark.memory.offHeap.size", "10g")
++ .config("spark.comet.use.lazyMaterialization", "false")
++ .config("spark.comet.schemaEvolution.enabled", "true")
+ .enableHiveSupport()
+ .getOrCreate();
+
+diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/sql/TestFilterPushDown.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/sql/TestFilterPushDown.java
+index 9d2ce2b388a..5e233688488 100644
+--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/sql/TestFilterPushDown.java
++++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/sql/TestFilterPushDown.java
+@@ -598,9 +598,7 @@ public class TestFilterPushDown extends TestBaseWithCatalog {
+ String planAsString = sparkPlan.toString().replaceAll("#(\\d+L?)", "");
+
+ if (sparkFilter != null) {
+- assertThat(planAsString)
+- .as("Post scan filter should match")
+- .contains("Filter (" + sparkFilter + ")");
++ assertThat(planAsString).as("Post scan filter should
match").contains("CometFilter");
+ } else {
+ assertThat(planAsString).as("Should be no post scan
filter").doesNotContain("Filter (");
+ }