This is an automated email from the ASF dual-hosted git repository.

mbutrovich pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git


The following commit(s) were added to refs/heads/main by this push:
     new 62c988f64 chore: [iceberg] test iceberg 1.10.0 (#2709)
62c988f64 is described below

commit 62c988f64faba70648d6c6e120f4daea96bab9c7
Author: Manu Zhang <[email protected]>
AuthorDate: Wed Nov 12 04:57:46 2025 +0800

    chore: [iceberg] test iceberg 1.10.0 (#2709)
---
 .github/workflows/iceberg_spark_test.yml |    6 +-
 dev/diffs/iceberg/1.10.0.diff            | 1770 ++++++++++++++++++++++++++++++
 2 files changed, 1773 insertions(+), 3 deletions(-)

diff --git a/.github/workflows/iceberg_spark_test.yml b/.github/workflows/iceberg_spark_test.yml
index 36c94fa6e..7eb0a6c2b 100644
--- a/.github/workflows/iceberg_spark_test.yml
+++ b/.github/workflows/iceberg_spark_test.yml
@@ -46,7 +46,7 @@ jobs:
       matrix:
         os: [ubuntu-24.04]
         java-version: [11, 17]
-        iceberg-version: [{short: '1.8', full: '1.8.1'}, {short: '1.9', full: '1.9.1'}]
+        iceberg-version: [{short: '1.8', full: '1.8.1'}, {short: '1.9', full: '1.9.1'}, {short: '1.10', full: '1.10.0'}]
         spark-version: [{short: '3.5', full: '3.5.7'}]
         scala-version: ['2.13']
       fail-fast: false
@@ -85,7 +85,7 @@ jobs:
       matrix:
         os: [ubuntu-24.04]
         java-version: [11, 17]
-        iceberg-version: [{short: '1.8', full: '1.8.1'}, {short: '1.9', full: '1.9.1'}]
+        iceberg-version: [{short: '1.8', full: '1.8.1'}, {short: '1.9', full: '1.9.1'}, {short: '1.10', full: '1.10.0'}]
         spark-version: [{short: '3.5', full: '3.5.7'}]
         scala-version: ['2.13']
       fail-fast: false
@@ -124,7 +124,7 @@ jobs:
       matrix:
         os: [ubuntu-24.04]
         java-version: [11, 17]
-        iceberg-version: [{short: '1.8', full: '1.8.1'}, {short: '1.9', full: '1.9.1'}]
+        iceberg-version: [{short: '1.8', full: '1.8.1'}, {short: '1.9', full: '1.9.1'}, {short: '1.10', full: '1.10.0'}]
         spark-version: [{short: '3.5', full: '3.5.7'}]
         scala-version: ['2.13']
       fail-fast: false
diff --git a/dev/diffs/iceberg/1.10.0.diff b/dev/diffs/iceberg/1.10.0.diff
new file mode 100644
index 000000000..551fc59fa
--- /dev/null
+++ b/dev/diffs/iceberg/1.10.0.diff
@@ -0,0 +1,1770 @@
+diff --git a/build.gradle b/build.gradle
+index 6bc052885fc..db2aca3a5ee 100644
+--- a/build.gradle
++++ b/build.gradle
+@@ -878,6 +878,13 @@ project(':iceberg-parquet') {
+     implementation project(':iceberg-core')
+     implementation project(':iceberg-common')
+ 
++    implementation("org.apache.datafusion:comet-spark-spark${sparkVersionsString}_${scalaVersion}:${libs.versions.comet.get()}") {
++      exclude group: 'org.apache.arrow'
++      exclude group: 'org.apache.parquet'
++      exclude group: 'org.apache.spark'
++      exclude group: 'org.apache.iceberg'
++    }
++
+     implementation(libs.parquet.avro) {
+       exclude group: 'org.apache.avro', module: 'avro'
+       // already shaded by Parquet
+diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml
+index eeabe54f5f0..867018058ee 100644
+--- a/gradle/libs.versions.toml
++++ b/gradle/libs.versions.toml
+@@ -37,7 +37,7 @@ awssdk-s3accessgrants = "2.3.0"
+ bson-ver = "4.11.5"
+ caffeine = "2.9.3"
+ calcite = "1.40.0"
+-comet = "0.8.1"
++comet = "0.12.0-SNAPSHOT"
+ datasketches = "6.2.0"
+ delta-standalone = "3.3.2"
+ delta-spark = "3.3.2"
+diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/CometTypeUtils.java b/parquet/src/main/java/org/apache/iceberg/parquet/CometTypeUtils.java
+new file mode 100644
+index 00000000000..ddf6c7de5ae
+--- /dev/null
++++ b/parquet/src/main/java/org/apache/iceberg/parquet/CometTypeUtils.java
+@@ -0,0 +1,255 @@
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ *   http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing,
++ * software distributed under the License is distributed on an
++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++ * KIND, either express or implied.  See the License for the
++ * specific language governing permissions and limitations
++ * under the License.
++ */
++package org.apache.iceberg.parquet;
++
++import java.util.Map;
++import org.apache.comet.parquet.ParquetColumnSpec;
++import org.apache.iceberg.relocated.com.google.common.collect.Maps;
++import org.apache.parquet.column.ColumnDescriptor;
++import org.apache.parquet.schema.LogicalTypeAnnotation;
++import org.apache.parquet.schema.PrimitiveType;
++import org.apache.parquet.schema.Type;
++import org.apache.parquet.schema.Types;
++
++public class CometTypeUtils {
++
++  private CometTypeUtils() {}
++
++  public static ParquetColumnSpec 
descriptorToParquetColumnSpec(ColumnDescriptor descriptor) {
++
++    String[] path = descriptor.getPath();
++    PrimitiveType primitiveType = descriptor.getPrimitiveType();
++    String physicalType = primitiveType.getPrimitiveTypeName().name();
++
++    int typeLength =
++        primitiveType.getPrimitiveTypeName() == 
PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY
++            ? primitiveType.getTypeLength()
++            : 0;
++
++    boolean isRepeated = primitiveType.getRepetition() == 
Type.Repetition.REPEATED;
++
++    // ToDo: extract this into a Util method
++    String logicalTypeName = null;
++    Map<String, String> logicalTypeParams = Maps.newHashMap();
++    LogicalTypeAnnotation logicalType = 
primitiveType.getLogicalTypeAnnotation();
++
++    if (logicalType != null) {
++      logicalTypeName = logicalType.getClass().getSimpleName();
++
++      // Handle specific logical types
++      if (logicalType instanceof 
LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) {
++        LogicalTypeAnnotation.DecimalLogicalTypeAnnotation decimal =
++            (LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) logicalType;
++        logicalTypeParams.put("precision", 
String.valueOf(decimal.getPrecision()));
++        logicalTypeParams.put("scale", String.valueOf(decimal.getScale()));
++      } else if (logicalType instanceof 
LogicalTypeAnnotation.TimestampLogicalTypeAnnotation) {
++        LogicalTypeAnnotation.TimestampLogicalTypeAnnotation timestamp =
++            (LogicalTypeAnnotation.TimestampLogicalTypeAnnotation) 
logicalType;
++        logicalTypeParams.put("isAdjustedToUTC", 
String.valueOf(timestamp.isAdjustedToUTC()));
++        logicalTypeParams.put("unit", timestamp.getUnit().name());
++      } else if (logicalType instanceof 
LogicalTypeAnnotation.TimeLogicalTypeAnnotation) {
++        LogicalTypeAnnotation.TimeLogicalTypeAnnotation time =
++            (LogicalTypeAnnotation.TimeLogicalTypeAnnotation) logicalType;
++        logicalTypeParams.put("isAdjustedToUTC", 
String.valueOf(time.isAdjustedToUTC()));
++        logicalTypeParams.put("unit", time.getUnit().name());
++      } else if (logicalType instanceof 
LogicalTypeAnnotation.IntLogicalTypeAnnotation) {
++        LogicalTypeAnnotation.IntLogicalTypeAnnotation intType =
++            (LogicalTypeAnnotation.IntLogicalTypeAnnotation) logicalType;
++        logicalTypeParams.put("isSigned", String.valueOf(intType.isSigned()));
++        logicalTypeParams.put("bitWidth", 
String.valueOf(intType.getBitWidth()));
++      }
++    }
++
++    return new ParquetColumnSpec(
++        1, // ToDo: pass in the correct id
++        path,
++        physicalType,
++        typeLength,
++        isRepeated,
++        descriptor.getMaxDefinitionLevel(),
++        descriptor.getMaxRepetitionLevel(),
++        logicalTypeName,
++        logicalTypeParams);
++  }
++
++  public static ColumnDescriptor buildColumnDescriptor(ParquetColumnSpec 
columnSpec) {
++    PrimitiveType.PrimitiveTypeName primType =
++        PrimitiveType.PrimitiveTypeName.valueOf(columnSpec.getPhysicalType());
++
++    Type.Repetition repetition;
++    if (columnSpec.getMaxRepetitionLevel() > 0) {
++      repetition = Type.Repetition.REPEATED;
++    } else if (columnSpec.getMaxDefinitionLevel() > 0) {
++      repetition = Type.Repetition.OPTIONAL;
++    } else {
++      repetition = Type.Repetition.REQUIRED;
++    }
++
++    String name = columnSpec.getPath()[columnSpec.getPath().length - 1];
++    // Reconstruct the logical type from parameters
++    LogicalTypeAnnotation logicalType = null;
++    if (columnSpec.getLogicalTypeName() != null) {
++      logicalType =
++          reconstructLogicalType(
++              columnSpec.getLogicalTypeName(), 
columnSpec.getLogicalTypeParams());
++    }
++
++    PrimitiveType primitiveType;
++    if (primType == PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY) {
++      primitiveType =
++          org.apache.parquet.schema.Types.primitive(primType, repetition)
++              .length(columnSpec.getTypeLength())
++              .as(logicalType)
++              .id(columnSpec.getFieldId())
++              .named(name);
++    } else {
++      primitiveType =
++          Types.primitive(primType, repetition)
++              .as(logicalType)
++              .id(columnSpec.getFieldId())
++              .named(name);
++    }
++
++    return new ColumnDescriptor(
++        columnSpec.getPath(),
++        primitiveType,
++        columnSpec.getMaxRepetitionLevel(),
++        columnSpec.getMaxDefinitionLevel());
++  }
++
++  private static LogicalTypeAnnotation reconstructLogicalType(
++      String logicalTypeName, java.util.Map<String, String> params) {
++
++    switch (logicalTypeName) {
++        // MAP
++      case "MapLogicalTypeAnnotation":
++        return LogicalTypeAnnotation.mapType();
++
++        // LIST
++      case "ListLogicalTypeAnnotation":
++        return LogicalTypeAnnotation.listType();
++
++        // STRING
++      case "StringLogicalTypeAnnotation":
++        return LogicalTypeAnnotation.stringType();
++
++        // MAP_KEY_VALUE
++      case "MapKeyValueLogicalTypeAnnotation":
++        return LogicalTypeAnnotation.MapKeyValueTypeAnnotation.getInstance();
++
++        // ENUM
++      case "EnumLogicalTypeAnnotation":
++        return LogicalTypeAnnotation.enumType();
++
++        // DECIMAL
++      case "DecimalLogicalTypeAnnotation":
++        if (!params.containsKey("scale") || !params.containsKey("precision")) 
{
++          throw new IllegalArgumentException(
++              "Missing required parameters for DecimalLogicalTypeAnnotation: " + params);
++        }
++        int scale = Integer.parseInt(params.get("scale"));
++        int precision = Integer.parseInt(params.get("precision"));
++        return LogicalTypeAnnotation.decimalType(scale, precision);
++
++        // DATE
++      case "DateLogicalTypeAnnotation":
++        return LogicalTypeAnnotation.dateType();
++
++        // TIME
++      case "TimeLogicalTypeAnnotation":
++        if (!params.containsKey("isAdjustedToUTC") || 
!params.containsKey("unit")) {
++          throw new IllegalArgumentException(
++              "Missing required parameters for TimeLogicalTypeAnnotation: " + 
params);
++        }
++
++        boolean isUTC = Boolean.parseBoolean(params.get("isAdjustedToUTC"));
++        String timeUnitStr = params.get("unit");
++
++        LogicalTypeAnnotation.TimeUnit timeUnit;
++        switch (timeUnitStr) {
++          case "MILLIS":
++            timeUnit = LogicalTypeAnnotation.TimeUnit.MILLIS;
++            break;
++          case "MICROS":
++            timeUnit = LogicalTypeAnnotation.TimeUnit.MICROS;
++            break;
++          case "NANOS":
++            timeUnit = LogicalTypeAnnotation.TimeUnit.NANOS;
++            break;
++          default:
++            throw new IllegalArgumentException("Unknown time unit: " + 
timeUnitStr);
++        }
++        return LogicalTypeAnnotation.timeType(isUTC, timeUnit);
++
++        // TIMESTAMP
++      case "TimestampLogicalTypeAnnotation":
++        if (!params.containsKey("isAdjustedToUTC") || 
!params.containsKey("unit")) {
++          throw new IllegalArgumentException(
++              "Missing required parameters for TimestampLogicalTypeAnnotation: " + params);
++        }
++        boolean isAdjustedToUTC = 
Boolean.parseBoolean(params.get("isAdjustedToUTC"));
++        String unitStr = params.get("unit");
++
++        LogicalTypeAnnotation.TimeUnit unit;
++        switch (unitStr) {
++          case "MILLIS":
++            unit = LogicalTypeAnnotation.TimeUnit.MILLIS;
++            break;
++          case "MICROS":
++            unit = LogicalTypeAnnotation.TimeUnit.MICROS;
++            break;
++          case "NANOS":
++            unit = LogicalTypeAnnotation.TimeUnit.NANOS;
++            break;
++          default:
++            throw new IllegalArgumentException("Unknown timestamp unit: " + 
unitStr);
++        }
++        return LogicalTypeAnnotation.timestampType(isAdjustedToUTC, unit);
++
++        // INTEGER
++      case "IntLogicalTypeAnnotation":
++        if (!params.containsKey("isSigned") || 
!params.containsKey("bitWidth")) {
++          throw new IllegalArgumentException(
++              "Missing required parameters for IntLogicalTypeAnnotation: " + 
params);
++        }
++        boolean isSigned = Boolean.parseBoolean(params.get("isSigned"));
++        int bitWidth = Integer.parseInt(params.get("bitWidth"));
++        return LogicalTypeAnnotation.intType(bitWidth, isSigned);
++
++        // JSON
++      case "JsonLogicalTypeAnnotation":
++        return LogicalTypeAnnotation.jsonType();
++
++        // BSON
++      case "BsonLogicalTypeAnnotation":
++        return LogicalTypeAnnotation.bsonType();
++
++        // UUID
++      case "UUIDLogicalTypeAnnotation":
++        return LogicalTypeAnnotation.uuidType();
++
++        // INTERVAL
++      case "IntervalLogicalTypeAnnotation":
++        return 
LogicalTypeAnnotation.IntervalLogicalTypeAnnotation.getInstance();
++
++      default:
++        throw new IllegalArgumentException("Unknown logical type: " + 
logicalTypeName);
++    }
++  }
++}
+diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/CometVectorizedParquetReader.java b/parquet/src/main/java/org/apache/iceberg/parquet/CometVectorizedParquetReader.java
+new file mode 100644
+index 00000000000..a3cba401827
+--- /dev/null
++++ b/parquet/src/main/java/org/apache/iceberg/parquet/CometVectorizedParquetReader.java
+@@ -0,0 +1,260 @@
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ *   http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing,
++ * software distributed under the License is distributed on an
++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++ * KIND, either express or implied.  See the License for the
++ * specific language governing permissions and limitations
++ * under the License.
++ */
++package org.apache.iceberg.parquet;
++
++import java.io.IOException;
++import java.io.UncheckedIOException;
++import java.nio.ByteBuffer;
++import java.util.List;
++import java.util.Map;
++import java.util.NoSuchElementException;
++import java.util.function.Function;
++import org.apache.comet.parquet.FileReader;
++import org.apache.comet.parquet.ParquetColumnSpec;
++import org.apache.comet.parquet.ReadOptions;
++import org.apache.comet.parquet.RowGroupReader;
++import org.apache.comet.parquet.WrappedInputFile;
++import org.apache.hadoop.conf.Configuration;
++import org.apache.iceberg.Schema;
++import org.apache.iceberg.exceptions.RuntimeIOException;
++import org.apache.iceberg.expressions.Expression;
++import org.apache.iceberg.expressions.Expressions;
++import org.apache.iceberg.io.CloseableGroup;
++import org.apache.iceberg.io.CloseableIterable;
++import org.apache.iceberg.io.CloseableIterator;
++import org.apache.iceberg.io.InputFile;
++import org.apache.iceberg.mapping.NameMapping;
++import org.apache.iceberg.relocated.com.google.common.collect.Lists;
++import org.apache.iceberg.util.ByteBuffers;
++import org.apache.parquet.ParquetReadOptions;
++import org.apache.parquet.column.ColumnDescriptor;
++import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
++import org.apache.parquet.hadoop.metadata.ColumnPath;
++import org.apache.parquet.schema.MessageType;
++
++public class CometVectorizedParquetReader<T> extends CloseableGroup
++    implements CloseableIterable<T> {
++  private final InputFile input;
++  private final ParquetReadOptions options;
++  private final Schema expectedSchema;
++  private final Function<MessageType, VectorizedReader<?>> batchReaderFunc;
++  private final Expression filter;
++  private final boolean reuseContainers;
++  private final boolean caseSensitive;
++  private final int batchSize;
++  private final NameMapping nameMapping;
++  private final Map<String, String> properties;
++  private Long start = null;
++  private Long length = null;
++  private ByteBuffer fileEncryptionKey = null;
++  private ByteBuffer fileAADPrefix = null;
++
++  public CometVectorizedParquetReader(
++      InputFile input,
++      Schema expectedSchema,
++      ParquetReadOptions options,
++      Function<MessageType, VectorizedReader<?>> readerFunc,
++      NameMapping nameMapping,
++      Expression filter,
++      boolean reuseContainers,
++      boolean caseSensitive,
++      int maxRecordsPerBatch,
++      Map<String, String> properties,
++      Long start,
++      Long length,
++      ByteBuffer fileEncryptionKey,
++      ByteBuffer fileAADPrefix) {
++    this.input = input;
++    this.expectedSchema = expectedSchema;
++    this.options = options;
++    this.batchReaderFunc = readerFunc;
++    // replace alwaysTrue with null to avoid extra work evaluating a trivial filter
++    this.filter = filter == Expressions.alwaysTrue() ? null : filter;
++    this.reuseContainers = reuseContainers;
++    this.caseSensitive = caseSensitive;
++    this.batchSize = maxRecordsPerBatch;
++    this.nameMapping = nameMapping;
++    this.properties = properties;
++    this.start = start;
++    this.length = length;
++    this.fileEncryptionKey = fileEncryptionKey;
++    this.fileAADPrefix = fileAADPrefix;
++  }
++
++  private ReadConf conf = null;
++
++  private ReadConf init() {
++    if (conf == null) {
++      ReadConf readConf =
++          new ReadConf(
++              input,
++              options,
++              expectedSchema,
++              filter,
++              null,
++              batchReaderFunc,
++              nameMapping,
++              reuseContainers,
++              caseSensitive,
++              batchSize);
++      this.conf = readConf.copy();
++      return readConf;
++    }
++    return conf;
++  }
++
++  @Override
++  public CloseableIterator<T> iterator() {
++    FileIterator<T> iter =
++        new FileIterator<>(init(), properties, start, length, 
fileEncryptionKey, fileAADPrefix);
++    addCloseable(iter);
++    return iter;
++  }
++
++  private static class FileIterator<T> implements CloseableIterator<T> {
++    // private final ParquetFileReader reader;
++    private final boolean[] shouldSkip;
++    private final VectorizedReader<T> model;
++    private final long totalValues;
++    private final int batchSize;
++    private final List<Map<ColumnPath, ColumnChunkMetaData>> 
columnChunkMetadata;
++    private final boolean reuseContainers;
++    private int nextRowGroup = 0;
++    private long nextRowGroupStart = 0;
++    private long valuesRead = 0;
++    private T last = null;
++    private final FileReader cometReader;
++    private ReadConf conf;
++
++    FileIterator(
++        ReadConf conf,
++        Map<String, String> properties,
++        Long start,
++        Long length,
++        ByteBuffer fileEncryptionKey,
++        ByteBuffer fileAADPrefix) {
++      this.shouldSkip = conf.shouldSkip();
++      this.totalValues = conf.totalValues();
++      this.reuseContainers = conf.reuseContainers();
++      this.model = conf.vectorizedModel();
++      this.batchSize = conf.batchSize();
++      this.model.setBatchSize(this.batchSize);
++      this.columnChunkMetadata = conf.columnChunkMetadataForRowGroups();
++      this.cometReader =
++          newCometReader(
++              conf.file(),
++              conf.projection(),
++              properties,
++              start,
++              length,
++              fileEncryptionKey,
++              fileAADPrefix);
++      this.conf = conf;
++    }
++
++    private FileReader newCometReader(
++        InputFile file,
++        MessageType projection,
++        Map<String, String> properties,
++        Long start,
++        Long length,
++        ByteBuffer fileEncryptionKey,
++        ByteBuffer fileAADPrefix) {
++      try {
++        ReadOptions cometOptions = ReadOptions.builder(new 
Configuration()).build();
++
++        FileReader fileReader =
++            new FileReader(
++                new WrappedInputFile(file),
++                cometOptions,
++                properties,
++                start,
++                length,
++                ByteBuffers.toByteArray(fileEncryptionKey),
++                ByteBuffers.toByteArray(fileAADPrefix));
++
++        List<ColumnDescriptor> columnDescriptors = projection.getColumns();
++
++        List<ParquetColumnSpec> specs = Lists.newArrayList();
++
++        for (ColumnDescriptor descriptor : columnDescriptors) {
++          ParquetColumnSpec spec = 
CometTypeUtils.descriptorToParquetColumnSpec(descriptor);
++          specs.add(spec);
++        }
++
++        fileReader.setRequestedSchemaFromSpecs(specs);
++        return fileReader;
++      } catch (IOException e) {
++        throw new UncheckedIOException("Failed to open Parquet file: " + 
file.location(), e);
++      }
++    }
++
++    @Override
++    public boolean hasNext() {
++      return valuesRead < totalValues;
++    }
++
++    @Override
++    public T next() {
++      if (!hasNext()) {
++        throw new NoSuchElementException();
++      }
++      if (valuesRead >= nextRowGroupStart) {
++        advance();
++      }
++
++      // batchSize is an integer, so casting to integer is safe
++      int numValuesToRead = (int) Math.min(nextRowGroupStart - valuesRead, 
batchSize);
++      if (reuseContainers) {
++        this.last = model.read(last, numValuesToRead);
++      } else {
++        this.last = model.read(null, numValuesToRead);
++      }
++      valuesRead += numValuesToRead;
++
++      return last;
++    }
++
++    private void advance() {
++      while (shouldSkip[nextRowGroup]) {
++        nextRowGroup += 1;
++        cometReader.skipNextRowGroup();
++      }
++      RowGroupReader pages;
++      try {
++        pages = cometReader.readNextRowGroup();
++      } catch (IOException e) {
++        throw new RuntimeIOException(e);
++      }
++
++      model.setRowGroupInfo(pages, columnChunkMetadata.get(nextRowGroup));
++      nextRowGroupStart += pages.getRowCount();
++      nextRowGroup += 1;
++    }
++
++    @Override
++    public void close() throws IOException {
++      model.close();
++      cometReader.close();
++      if (conf != null && conf.reader() != null) {
++        conf.reader().close();
++      }
++    }
++  }
++}
+diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
+index 6f68fbe150f..b740543f3c9 100644
+--- a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
++++ b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
+@@ -1161,6 +1161,7 @@ public class Parquet {
+     private NameMapping nameMapping = null;
+     private ByteBuffer fileEncryptionKey = null;
+     private ByteBuffer fileAADPrefix = null;
++    private boolean isComet;
+ 
+     private ReadBuilder(InputFile file) {
+       this.file = file;
+@@ -1205,6 +1206,11 @@ public class Parquet {
+       return this;
+     }
+ 
++    public ReadBuilder enableComet(boolean enableComet) {
++      this.isComet = enableComet;
++      return this;
++    }
++
+     /**
+      * @deprecated will be removed in 2.0.0; use {@link #createReaderFunc(Function)} instead
+      */
+@@ -1300,7 +1306,7 @@ public class Parquet {
+     }
+ 
+     @Override
+-    @SuppressWarnings({"unchecked", "checkstyle:CyclomaticComplexity"})
++    @SuppressWarnings({"unchecked", "checkstyle:CyclomaticComplexity", 
"MethodLength"})
+     public <D> CloseableIterable<D> build() {
+       FileDecryptionProperties fileDecryptionProperties = null;
+       if (fileEncryptionKey != null) {
+@@ -1352,16 +1358,35 @@ public class Parquet {
+         }
+ 
+         if (batchedReaderFunc != null) {
+-          return new VectorizedParquetReader<>(
+-              file,
+-              schema,
+-              options,
+-              batchedReaderFunc,
+-              mapping,
+-              filter,
+-              reuseContainers,
+-              caseSensitive,
+-              maxRecordsPerBatch);
++          if (isComet) {
++            LOG.info("Comet enabled");
++            return new CometVectorizedParquetReader<>(
++                file,
++                schema,
++                options,
++                batchedReaderFunc,
++                mapping,
++                filter,
++                reuseContainers,
++                caseSensitive,
++                maxRecordsPerBatch,
++                properties,
++                start,
++                length,
++                fileEncryptionKey,
++                fileAADPrefix);
++          } else {
++            return new VectorizedParquetReader<>(
++                file,
++                schema,
++                options,
++                batchedReaderFunc,
++                mapping,
++                filter,
++                reuseContainers,
++                caseSensitive,
++                maxRecordsPerBatch);
++          }
+         } else {
+           Function<MessageType, ParquetValueReader<?>> readBuilder =
+               readerFuncWithSchema != null
+diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ReadConf.java b/parquet/src/main/java/org/apache/iceberg/parquet/ReadConf.java
+index 1fb2372ba56..142e5fbadf1 100644
+--- a/parquet/src/main/java/org/apache/iceberg/parquet/ReadConf.java
++++ b/parquet/src/main/java/org/apache/iceberg/parquet/ReadConf.java
+@@ -157,6 +157,14 @@ class ReadConf<T> {
+     return newReader;
+   }
+ 
++  InputFile file() {
++    return file;
++  }
++
++  MessageType projection() {
++    return projection;
++  }
++
+   ParquetValueReader<T> model() {
+     return model;
+   }
+diff --git a/spark/v3.5/build.gradle b/spark/v3.5/build.gradle
+index 69700d84366..49ea338a458 100644
+--- a/spark/v3.5/build.gradle
++++ b/spark/v3.5/build.gradle
+@@ -264,6 +264,7 @@ 
project(":iceberg-spark:iceberg-spark-runtime-${sparkMajorVersion}_${scalaVersio
+     integrationImplementation project(path: ':iceberg-hive-metastore', 
configuration: 'testArtifacts')
+     integrationImplementation project(path: 
":iceberg-spark:iceberg-spark-${sparkMajorVersion}_${scalaVersion}", 
configuration: 'testArtifacts')
+     integrationImplementation project(path: 
":iceberg-spark:iceberg-spark-extensions-${sparkMajorVersion}_${scalaVersion}", 
configuration: 'testArtifacts')
++    integrationImplementation 
"org.apache.datafusion:comet-spark-spark${sparkMajorVersion}_${scalaVersion}:${libs.versions.comet.get()}"
+ 
+     // runtime dependencies for running Hive Catalog based integration test
+     integrationRuntimeOnly project(':iceberg-hive-metastore')
+diff --git 
a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/ExtensionsTestBase.java
 
b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/ExtensionsTestBase.java
+index 4c1a5095916..964f196daad 100644
+--- 
a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/ExtensionsTestBase.java
++++ 
b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/ExtensionsTestBase.java
+@@ -59,6 +59,16 @@ public abstract class ExtensionsTestBase extends 
CatalogTestBase {
+             
.config("spark.sql.legacy.respectNullabilityInTextDatasetConversion", "true")
+             .config(
+                 SQLConf.ADAPTIVE_EXECUTION_ENABLED().key(), 
String.valueOf(RANDOM.nextBoolean()))
++            .config("spark.plugins", "org.apache.spark.CometPlugin")
++            .config(
++                "spark.shuffle.manager",
++                
"org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
++            .config("spark.comet.explainFallback.enabled", "true")
++            .config("spark.sql.iceberg.parquet.reader-type", "COMET")
++            .config("spark.memory.offHeap.enabled", "true")
++            .config("spark.memory.offHeap.size", "10g")
++            .config("spark.comet.use.lazyMaterialization", "false")
++            .config("spark.comet.schemaEvolution.enabled", "true")
+             .enableHiveSupport()
+             .getOrCreate();
+ 
+diff --git 
a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCallStatementParser.java
 
b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCallStatementParser.java
+index ecf9e6f8a59..0f8cced69aa 100644
+--- 
a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCallStatementParser.java
++++ 
b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCallStatementParser.java
+@@ -56,6 +56,16 @@ public class TestCallStatementParser {
+             .master("local[2]")
+             .config("spark.sql.extensions", 
IcebergSparkSessionExtensions.class.getName())
+             .config("spark.extra.prop", "value")
++            .config("spark.plugins", "org.apache.spark.CometPlugin")
++            .config(
++                "spark.shuffle.manager",
++                
"org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
++            .config("spark.comet.explainFallback.enabled", "true")
++            .config("spark.sql.iceberg.parquet.reader-type", "COMET")
++            .config("spark.memory.offHeap.enabled", "true")
++            .config("spark.memory.offHeap.size", "10g")
++            .config("spark.comet.use.lazyMaterialization", "false")
++            .config("spark.comet.schemaEvolution.enabled", "true")
+             .getOrCreate();
+     TestCallStatementParser.parser = spark.sessionState().sqlParser();
+   }
+diff --git 
a/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/action/DeleteOrphanFilesBenchmark.java
 
b/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/action/DeleteOrphanFilesBenchmark.java
+index 64edb1002e9..5bb449f1ac7 100644
+--- 
a/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/action/DeleteOrphanFilesBenchmark.java
++++ 
b/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/action/DeleteOrphanFilesBenchmark.java
+@@ -179,6 +179,16 @@ public class DeleteOrphanFilesBenchmark {
+             .config("spark.sql.catalog.spark_catalog", 
SparkSessionCatalog.class.getName())
+             .config("spark.sql.catalog.spark_catalog.type", "hadoop")
+             .config("spark.sql.catalog.spark_catalog.warehouse", 
catalogWarehouse())
++            .config("spark.plugins", "org.apache.spark.CometPlugin")
++            .config(
++                "spark.shuffle.manager",
++                
"org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
++            .config("spark.comet.explainFallback.enabled", "true")
++            .config("spark.sql.iceberg.parquet.reader-type", "COMET")
++            .config("spark.memory.offHeap.enabled", "true")
++            .config("spark.memory.offHeap.size", "10g")
++            .config("spark.comet.use.lazyMaterialization", "false")
++            .config("spark.comet.schemaEvolution.enabled", "true")
+             .master("local");
+     spark = builder.getOrCreate();
+   }
+diff --git 
a/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/action/IcebergSortCompactionBenchmark.java
 
b/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/action/IcebergSortCompactionBenchmark.java
+index 77b79384a6d..08f7de1c0de 100644
+--- 
a/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/action/IcebergSortCompactionBenchmark.java
++++ 
b/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/action/IcebergSortCompactionBenchmark.java
+@@ -392,6 +392,16 @@ public class IcebergSortCompactionBenchmark {
+                 "spark.sql.catalog.spark_catalog", 
"org.apache.iceberg.spark.SparkSessionCatalog")
+             .config("spark.sql.catalog.spark_catalog.type", "hadoop")
+             .config("spark.sql.catalog.spark_catalog.warehouse", 
getCatalogWarehouse())
++            .config("spark.plugins", "org.apache.spark.CometPlugin")
++            .config(
++                "spark.shuffle.manager",
++                
"org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
++            .config("spark.comet.explainFallback.enabled", "true")
++            .config("spark.sql.iceberg.parquet.reader-type", "COMET")
++            .config("spark.memory.offHeap.enabled", "true")
++            .config("spark.memory.offHeap.size", "10g")
++            .config("spark.comet.use.lazyMaterialization", "false")
++            .config("spark.comet.schemaEvolution.enabled", "true")
+             .master("local[*]");
+     spark = builder.getOrCreate();
+     Configuration sparkHadoopConf = spark.sessionState().newHadoopConf();
+diff --git 
a/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/source/DVReaderBenchmark.java
 
b/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/source/DVReaderBenchmark.java
+index c6794e43c63..f7359197407 100644
+--- 
a/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/source/DVReaderBenchmark.java
++++ 
b/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/source/DVReaderBenchmark.java
+@@ -239,6 +239,16 @@ public class DVReaderBenchmark {
+             .config("spark.sql.catalog.spark_catalog", 
SparkSessionCatalog.class.getName())
+             .config("spark.sql.catalog.spark_catalog.type", "hadoop")
+             .config("spark.sql.catalog.spark_catalog.warehouse", 
newWarehouseDir())
++            .config("spark.plugins", "org.apache.spark.CometPlugin")
++            .config(
++                "spark.shuffle.manager",
++                
"org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
++            .config("spark.comet.explainFallback.enabled", "true")
++            .config("spark.sql.iceberg.parquet.reader-type", "COMET")
++            .config("spark.memory.offHeap.enabled", "true")
++            .config("spark.memory.offHeap.size", "10g")
++            .config("spark.comet.use.lazyMaterialization", "false")
++            .config("spark.comet.schemaEvolution.enabled", "true")
+             .master("local[*]")
+             .getOrCreate();
+   }
+diff --git 
a/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/source/DVWriterBenchmark.java
 
b/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/source/DVWriterBenchmark.java
+index ac74fb5a109..e011b8b2510 100644
+--- 
a/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/source/DVWriterBenchmark.java
++++ 
b/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/source/DVWriterBenchmark.java
+@@ -223,6 +223,16 @@ public class DVWriterBenchmark {
+             .config("spark.sql.catalog.spark_catalog", 
SparkSessionCatalog.class.getName())
+             .config("spark.sql.catalog.spark_catalog.type", "hadoop")
+             .config("spark.sql.catalog.spark_catalog.warehouse", 
newWarehouseDir())
++            .config("spark.plugins", "org.apache.spark.CometPlugin")
++            .config(
++                "spark.shuffle.manager",
++                
"org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
++            .config("spark.comet.explainFallback.enabled", "true")
++            .config("spark.sql.iceberg.parquet.reader-type", "COMET")
++            .config("spark.memory.offHeap.enabled", "true")
++            .config("spark.memory.offHeap.size", "10g")
++            .config("spark.comet.use.lazyMaterialization", "false")
++            .config("spark.comet.schemaEvolution.enabled", "true")
+             .master("local[*]")
+             .getOrCreate();
+   }
+diff --git 
a/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceBenchmark.java
 
b/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceBenchmark.java
+index 68c537e34a4..f66be2f3896 100644
+--- 
a/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceBenchmark.java
++++ 
b/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceBenchmark.java
+@@ -94,7 +94,19 @@ public abstract class IcebergSourceBenchmark {
+   }
+ 
+   protected void setupSpark(boolean enableDictionaryEncoding) {
+-    SparkSession.Builder builder = 
SparkSession.builder().config("spark.ui.enabled", false);
++    SparkSession.Builder builder =
++        SparkSession.builder()
++            .config("spark.ui.enabled", false)
++            .config("spark.plugins", "org.apache.spark.CometPlugin")
++            .config(
++                "spark.shuffle.manager",
++                
"org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
++            .config("spark.comet.explainFallback.enabled", "true")
++            .config("spark.sql.iceberg.parquet.reader-type", "COMET")
++            .config("spark.memory.offHeap.enabled", "true")
++            .config("spark.memory.offHeap.size", "10g")
++            .config("spark.comet.use.lazyMaterialization", "false")
++            .config("spark.comet.schemaEvolution.enabled", "true");
+     if (!enableDictionaryEncoding) {
+       builder
+           .config("parquet.dictionary.page.size", "1")
+diff --git 
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometColumnReader.java
 
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometColumnReader.java
+index 81b7d83a707..eba1a2a0fb1 100644
+--- 
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometColumnReader.java
++++ 
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometColumnReader.java
+@@ -19,18 +19,22 @@
+ package org.apache.iceberg.spark.data.vectorized;
+ 
+ import java.io.IOException;
++import org.apache.comet.CometConf;
+ import org.apache.comet.CometSchemaImporter;
+ import org.apache.comet.parquet.AbstractColumnReader;
+ import org.apache.comet.parquet.ColumnReader;
++import org.apache.comet.parquet.ParquetColumnSpec;
++import org.apache.comet.parquet.RowGroupReader;
+ import org.apache.comet.parquet.TypeUtil;
+ import org.apache.comet.parquet.Utils;
+ import org.apache.comet.shaded.arrow.memory.RootAllocator;
++import org.apache.iceberg.parquet.CometTypeUtils;
+ import org.apache.iceberg.parquet.VectorizedReader;
+ import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+ import org.apache.iceberg.spark.SparkSchemaUtil;
+ import org.apache.iceberg.types.Types;
+ import org.apache.parquet.column.ColumnDescriptor;
+-import org.apache.parquet.column.page.PageReader;
++import org.apache.spark.sql.internal.SQLConf;
+ import org.apache.spark.sql.types.DataType;
+ import org.apache.spark.sql.types.Metadata;
+ import org.apache.spark.sql.types.StructField;
+@@ -42,23 +46,28 @@ class CometColumnReader implements 
VectorizedReader<ColumnVector> {
+ 
+   private final ColumnDescriptor descriptor;
+   private final DataType sparkType;
++  private final int fieldId;
+ 
+   // The delegated ColumnReader from Comet side
+   private AbstractColumnReader delegate;
+   private boolean initialized = false;
+   private int batchSize = DEFAULT_BATCH_SIZE;
+   private CometSchemaImporter importer;
++  private ParquetColumnSpec spec;
+ 
+-  CometColumnReader(DataType sparkType, ColumnDescriptor descriptor) {
++  CometColumnReader(DataType sparkType, ColumnDescriptor descriptor, int 
fieldId) {
+     this.sparkType = sparkType;
+     this.descriptor = descriptor;
++    this.fieldId = fieldId;
+   }
+ 
+   CometColumnReader(Types.NestedField field) {
+     DataType dataType = SparkSchemaUtil.convert(field.type());
+     StructField structField = new StructField(field.name(), dataType, false, 
Metadata.empty());
+     this.sparkType = dataType;
+-    this.descriptor = TypeUtil.convertToParquet(structField);
++    this.descriptor =
++        
CometTypeUtils.buildColumnDescriptor(TypeUtil.convertToParquetSpec(structField));
++    this.fieldId = field.fieldId();
+   }
+ 
+   public AbstractColumnReader delegate() {
+@@ -92,7 +101,26 @@ class CometColumnReader implements 
VectorizedReader<ColumnVector> {
+     }
+ 
+     this.importer = new CometSchemaImporter(new RootAllocator());
+-    this.delegate = Utils.getColumnReader(sparkType, descriptor, importer, 
batchSize, false, false);
++
++    spec = CometTypeUtils.descriptorToParquetColumnSpec(descriptor);
++
++    boolean useLegacyTime =
++        Boolean.parseBoolean(
++            SQLConf.get()
++                .getConfString(
++                    
CometConf.COMET_EXCEPTION_ON_LEGACY_DATE_TIMESTAMP().key(), "false"));
++    boolean useLazyMaterialization =
++        Boolean.parseBoolean(
++            
SQLConf.get().getConfString(CometConf.COMET_USE_LAZY_MATERIALIZATION().key(), 
"false"));
++    this.delegate =
++        Utils.getColumnReader(
++            sparkType,
++            spec,
++            importer,
++            batchSize,
++            true, // Comet sets this to true for native execution
++            useLazyMaterialization,
++            useLegacyTime);
+     this.initialized = true;
+   }
+ 
+@@ -111,9 +139,9 @@ class CometColumnReader implements 
VectorizedReader<ColumnVector> {
+    * <p>NOTE: this should be called before reading a new Parquet column 
chunk, and after {@link
+    * CometColumnReader#reset} is called.
+    */
+-  public void setPageReader(PageReader pageReader) throws IOException {
++  public void setPageReader(RowGroupReader pageStore) throws IOException {
+     Preconditions.checkState(initialized, "Invalid state: 'reset' should be 
called first");
+-    ((ColumnReader) delegate).setPageReader(pageReader);
++    ((ColumnReader) delegate).setRowGroupReader(pageStore, spec);
+   }
+ 
+   @Override
+diff --git 
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometColumnarBatchReader.java
 
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometColumnarBatchReader.java
+index 04ac69476ad..916face2bf2 100644
+--- 
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometColumnarBatchReader.java
++++ 
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometColumnarBatchReader.java
+@@ -22,8 +22,12 @@ import java.io.IOException;
+ import java.io.UncheckedIOException;
+ import java.util.List;
+ import java.util.Map;
++import org.apache.comet.CometRuntimeException;
+ import org.apache.comet.parquet.AbstractColumnReader;
+-import org.apache.comet.parquet.BatchReader;
++import org.apache.comet.parquet.IcebergCometBatchReader;
++import org.apache.comet.parquet.RowGroupReader;
++import org.apache.comet.vector.CometSelectionVector;
++import org.apache.comet.vector.CometVector;
+ import org.apache.iceberg.Schema;
+ import org.apache.iceberg.data.DeleteFilter;
+ import org.apache.iceberg.parquet.VectorizedReader;
+@@ -55,7 +59,7 @@ class CometColumnarBatchReader implements 
VectorizedReader<ColumnarBatch> {
+   // calling BatchReader.nextBatch, the isDeleted value is not yet available, 
so
+   // DeleteColumnReader.readBatch must be called explicitly later, after the 
isDeleted value is
+   // available.
+-  private final BatchReader delegate;
++  private final IcebergCometBatchReader delegate;
+   private DeleteFilter<InternalRow> deletes = null;
+   private long rowStartPosInBatch = 0;
+ 
+@@ -65,9 +69,7 @@ class CometColumnarBatchReader implements 
VectorizedReader<ColumnarBatch> {
+     this.hasIsDeletedColumn =
+         readers.stream().anyMatch(reader -> reader instanceof 
CometDeleteColumnReader);
+ 
+-    AbstractColumnReader[] abstractColumnReaders = new 
AbstractColumnReader[readers.size()];
+-    this.delegate = new BatchReader(abstractColumnReaders);
+-    delegate.setSparkSchema(SparkSchemaUtil.convert(schema));
++    this.delegate = new IcebergCometBatchReader(readers.size(), 
SparkSchemaUtil.convert(schema));
+   }
+ 
+   @Override
+@@ -79,19 +81,22 @@ class CometColumnarBatchReader implements 
VectorizedReader<ColumnarBatch> {
+             && !(readers[i] instanceof CometPositionColumnReader)
+             && !(readers[i] instanceof CometDeleteColumnReader)) {
+           readers[i].reset();
+-          
readers[i].setPageReader(pageStore.getPageReader(readers[i].descriptor()));
++          readers[i].setPageReader((RowGroupReader) pageStore);
+         }
+       } catch (IOException e) {
+         throw new UncheckedIOException("Failed to setRowGroupInfo for Comet 
vectorization", e);
+       }
+     }
+ 
++    AbstractColumnReader[] delegateReaders = new 
AbstractColumnReader[readers.length];
+     for (int i = 0; i < readers.length; i++) {
+-      delegate.getColumnReaders()[i] = this.readers[i].delegate();
++      delegateReaders[i] = readers[i].delegate();
+     }
+ 
++    delegate.init(delegateReaders);
++
+     this.rowStartPosInBatch =
+-        pageStore
++        ((RowGroupReader) pageStore)
+             .getRowIndexOffset()
+             .orElseThrow(
+                 () ->
+@@ -148,9 +153,17 @@ class CometColumnarBatchReader implements 
VectorizedReader<ColumnarBatch> {
+         Pair<int[], Integer> pair = buildRowIdMapping(vectors);
+         if (pair != null) {
+           int[] rowIdMapping = pair.first();
+-          numLiveRows = pair.second();
+-          for (int i = 0; i < vectors.length; i++) {
+-            vectors[i] = new ColumnVectorWithFilter(vectors[i], rowIdMapping);
++          if (pair.second() != null) {
++            numLiveRows = pair.second();
++            for (int i = 0; i < vectors.length; i++) {
++              if (vectors[i] instanceof CometVector) {
++                vectors[i] =
++                    new CometSelectionVector((CometVector) vectors[i], 
rowIdMapping, numLiveRows);
++              } else {
++                throw new CometRuntimeException(
++                    "Unsupported column vector type: " + 
vectors[i].getClass());
++              }
++            }
+           }
+         }
+       }
+diff --git 
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometConstantColumnReader.java
 
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometConstantColumnReader.java
+index 047c96314b1..88d691a607a 100644
+--- 
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometConstantColumnReader.java
++++ 
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometConstantColumnReader.java
+@@ -21,6 +21,7 @@ package org.apache.iceberg.spark.data.vectorized;
+ import java.math.BigDecimal;
+ import java.nio.ByteBuffer;
+ import org.apache.comet.parquet.ConstantColumnReader;
++import org.apache.iceberg.parquet.CometTypeUtils;
+ import org.apache.iceberg.types.Types;
+ import org.apache.spark.sql.types.DataType;
+ import org.apache.spark.sql.types.DataTypes;
+@@ -34,7 +35,11 @@ class CometConstantColumnReader<T> extends 
CometColumnReader {
+     super(field);
+     // use delegate to set constant value on the native side to be consumed 
by native execution.
+     setDelegate(
+-        new ConstantColumnReader(sparkType(), descriptor(), 
convertToSparkValue(value), false));
++        new ConstantColumnReader(
++            sparkType(),
++            CometTypeUtils.descriptorToParquetColumnSpec(descriptor()),
++            convertToSparkValue(value),
++            false));
+   }
+ 
+   @Override
+diff --git 
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometDeleteColumnReader.java
 
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometDeleteColumnReader.java
+index 6235bfe4865..cba108e4326 100644
+--- 
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometDeleteColumnReader.java
++++ 
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometDeleteColumnReader.java
+@@ -51,10 +51,10 @@ class CometDeleteColumnReader<T> extends CometColumnReader 
{
+     DeleteColumnReader() {
+       super(
+           DataTypes.BooleanType,
+-          TypeUtil.convertToParquet(
++          TypeUtil.convertToParquetSpec(
+               new StructField("_deleted", DataTypes.BooleanType, false, 
Metadata.empty())),
+           false /* useDecimal128 = false */,
+-          false /* isConstant = false */);
++          false /* isConstant */);
+       this.isDeleted = new boolean[0];
+     }
+ 
+diff --git 
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometPositionColumnReader.java
 
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometPositionColumnReader.java
+index bcc0e514c28..98e80068c51 100644
+--- 
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometPositionColumnReader.java
++++ 
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometPositionColumnReader.java
+@@ -20,6 +20,7 @@ package org.apache.iceberg.spark.data.vectorized;
+ 
+ import org.apache.comet.parquet.MetadataColumnReader;
+ import org.apache.comet.parquet.Native;
++import org.apache.iceberg.parquet.CometTypeUtils;
+ import org.apache.iceberg.types.Types;
+ import org.apache.parquet.column.ColumnDescriptor;
+ import org.apache.spark.sql.types.DataTypes;
+@@ -44,7 +45,7 @@ class CometPositionColumnReader extends CometColumnReader {
+     PositionColumnReader(ColumnDescriptor descriptor) {
+       super(
+           DataTypes.LongType,
+-          descriptor,
++          CometTypeUtils.descriptorToParquetColumnSpec(descriptor),
+           false /* useDecimal128 = false */,
+           false /* isConstant = false */);
+     }
+diff --git 
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometVectorizedReaderBuilder.java
 
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometVectorizedReaderBuilder.java
+index d36f1a72747..56f8c9bff93 100644
+--- 
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometVectorizedReaderBuilder.java
++++ 
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometVectorizedReaderBuilder.java
+@@ -142,6 +142,7 @@ class CometVectorizedReaderBuilder extends 
TypeWithSchemaVisitor<VectorizedReade
+       return null;
+     }
+ 
+-    return new 
CometColumnReader(SparkSchemaUtil.convert(icebergField.type()), desc);
++    return new CometColumnReader(
++        SparkSchemaUtil.convert(icebergField.type()), desc, parquetFieldId);
+   }
+ }
+diff --git 
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java
 
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java
+index a0f45e7610a..473b34bb0f3 100644
+--- 
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java
++++ 
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java
+@@ -111,6 +111,7 @@ abstract class BaseBatchReader<T extends ScanTask> extends 
BaseReader<ColumnarBa
+         // read performance as every batch read doesn't have to pay the cost 
of allocating memory.
+         .reuseContainers()
+         .withNameMapping(nameMapping())
++        .enableComet(parquetConf.readerType() == ParquetReaderType.COMET)
+         .build();
+   }
+ 
+diff --git 
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java
 
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java
+index 0626d0b4398..9ec8f534669 100644
+--- 
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java
++++ 
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java
+@@ -175,11 +175,11 @@ class SparkBatch implements Batch {
+     return field.type().isPrimitiveType() || 
MetadataColumns.isMetadataColumn(field.fieldId());
+   }
+ 
+-  private boolean useCometBatchReads() {
++  protected boolean useCometBatchReads() {
+     return readConf.parquetVectorizationEnabled()
+         && readConf.parquetReaderType() == ParquetReaderType.COMET
+         && 
expectedSchema.columns().stream().allMatch(this::supportsCometBatchReads)
+-        && taskGroups.stream().allMatch(this::supportsParquetBatchReads);
++        && taskGroups.stream().allMatch(this::supportsCometBatchReads);
+   }
+ 
+   private boolean supportsCometBatchReads(Types.NestedField field) {
+@@ -189,6 +189,21 @@ class SparkBatch implements Batch {
+         && field.fieldId() != 
MetadataColumns.LAST_UPDATED_SEQUENCE_NUMBER.fieldId();
+   }
+ 
++  private boolean supportsCometBatchReads(ScanTask task) {
++    if (task instanceof ScanTaskGroup) {
++      ScanTaskGroup<?> taskGroup = (ScanTaskGroup<?>) task;
++      return 
taskGroup.tasks().stream().allMatch(this::supportsCometBatchReads);
++
++    } else if (task.isFileScanTask() && !task.isDataTask()) {
++      FileScanTask fileScanTask = task.asFileScanTask();
++      // Comet can't handle delete files for now
++      return fileScanTask.file().format() == FileFormat.PARQUET;
++
++    } else {
++      return false;
++    }
++  }
++
+   // conditions for using ORC batch reads:
+   // - ORC vectorization is enabled
+   // - all tasks are of type FileScanTask and read only ORC files with no 
delete files
+diff --git 
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java 
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java
+index 106b296de09..967b0d41d08 100644
+--- 
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java
++++ 
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java
+@@ -24,6 +24,7 @@ import java.util.Map;
+ import java.util.Optional;
+ import java.util.function.Supplier;
+ import java.util.stream.Collectors;
++import org.apache.comet.parquet.SupportsComet;
+ import org.apache.iceberg.BlobMetadata;
+ import org.apache.iceberg.ScanTask;
+ import org.apache.iceberg.ScanTaskGroup;
+@@ -95,7 +96,7 @@ import org.apache.spark.sql.types.StructType;
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+ 
+-abstract class SparkScan implements Scan, SupportsReportStatistics {
++abstract class SparkScan implements Scan, SupportsReportStatistics, 
SupportsComet {
+   private static final Logger LOG = LoggerFactory.getLogger(SparkScan.class);
+   private static final String NDV_KEY = "ndv";
+ 
+@@ -351,4 +352,10 @@ abstract class SparkScan implements Scan, 
SupportsReportStatistics {
+       return splitSize;
+     }
+   }
++
++  @Override
++  public boolean isCometEnabled() {
++    SparkBatch batch = (SparkBatch) this.toBatch();
++    return batch.useCometBatchReads();
++  }
+ }
+diff --git 
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/SparkDistributedDataScanTestBase.java
 
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/SparkDistributedDataScanTestBase.java
+index 404ba728460..26d6f9b613f 100644
+--- 
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/SparkDistributedDataScanTestBase.java
++++ 
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/SparkDistributedDataScanTestBase.java
+@@ -90,6 +90,16 @@ public abstract class SparkDistributedDataScanTestBase
+         .master("local[2]")
+         .config("spark.serializer", serializer)
+         .config(SQLConf.SHUFFLE_PARTITIONS().key(), "4")
++        .config("spark.plugins", "org.apache.spark.CometPlugin")
++        .config(
++            "spark.shuffle.manager",
++            
"org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
++        .config("spark.comet.explainFallback.enabled", "true")
++        .config("spark.sql.iceberg.parquet.reader-type", "COMET")
++        .config("spark.memory.offHeap.enabled", "true")
++        .config("spark.memory.offHeap.size", "10g")
++        .config("spark.comet.use.lazyMaterialization", "false")
++        .config("spark.comet.schemaEvolution.enabled", "true")
+         .getOrCreate();
+   }
+ }
+diff --git 
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanDeletes.java
 
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanDeletes.java
+index 659507e4c5e..eb9cedc34c5 100644
+--- 
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanDeletes.java
++++ 
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanDeletes.java
+@@ -73,6 +73,16 @@ public class TestSparkDistributedDataScanDeletes
+             .master("local[2]")
+             .config("spark.serializer", 
"org.apache.spark.serializer.KryoSerializer")
+             .config(SQLConf.SHUFFLE_PARTITIONS().key(), "4")
++            .config("spark.plugins", "org.apache.spark.CometPlugin")
++            .config(
++                "spark.shuffle.manager",
++                
"org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
++            .config("spark.comet.explainFallback.enabled", "true")
++            .config("spark.sql.iceberg.parquet.reader-type", "COMET")
++            .config("spark.memory.offHeap.enabled", "true")
++            .config("spark.memory.offHeap.size", "10g")
++            .config("spark.comet.use.lazyMaterialization", "false")
++            .config("spark.comet.schemaEvolution.enabled", "true")
+             .getOrCreate();
+   }
+ 
+diff --git 
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanFilterFiles.java
 
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanFilterFiles.java
+index a218f965ea6..395c02441e7 100644
+--- 
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanFilterFiles.java
++++ 
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanFilterFiles.java
+@@ -62,6 +62,16 @@ public class TestSparkDistributedDataScanFilterFiles
+             .master("local[2]")
+             .config("spark.serializer", 
"org.apache.spark.serializer.KryoSerializer")
+             .config(SQLConf.SHUFFLE_PARTITIONS().key(), "4")
++            .config("spark.plugins", "org.apache.spark.CometPlugin")
++            .config(
++                "spark.shuffle.manager",
++                
"org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
++            .config("spark.comet.explainFallback.enabled", "true")
++            .config("spark.sql.iceberg.parquet.reader-type", "COMET")
++            .config("spark.memory.offHeap.enabled", "true")
++            .config("spark.memory.offHeap.size", "10g")
++            .config("spark.comet.use.lazyMaterialization", "false")
++            .config("spark.comet.schemaEvolution.enabled", "true")
+             .getOrCreate();
+   }
+ 
+diff --git 
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanReporting.java
 
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanReporting.java
+index 2665d7ba8d3..306e859ce1a 100644
+--- 
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanReporting.java
++++ 
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanReporting.java
+@@ -63,6 +63,16 @@ public class TestSparkDistributedDataScanReporting
+             .master("local[2]")
+             .config("spark.serializer", 
"org.apache.spark.serializer.KryoSerializer")
+             .config(SQLConf.SHUFFLE_PARTITIONS().key(), "4")
++            .config("spark.plugins", "org.apache.spark.CometPlugin")
++            .config(
++                "spark.shuffle.manager",
++                
"org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
++            .config("spark.comet.explainFallback.enabled", "true")
++            .config("spark.sql.iceberg.parquet.reader-type", "COMET")
++            .config("spark.memory.offHeap.enabled", "true")
++            .config("spark.memory.offHeap.size", "10g")
++            .config("spark.comet.use.lazyMaterialization", "false")
++            .config("spark.comet.schemaEvolution.enabled", "true")
+             .getOrCreate();
+   }
+ 
+diff --git 
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestBase.java 
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestBase.java
+index daf4e29ac07..fc9ee40c502 100644
+--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestBase.java
++++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestBase.java
+@@ -79,6 +79,17 @@ public abstract class TestBase extends SparkTestHelperBase {
+             .config(SQLConf.PARTITION_OVERWRITE_MODE().key(), "dynamic")
+             .config("spark.hadoop." + METASTOREURIS.varname, 
hiveConf.get(METASTOREURIS.varname))
+             
.config("spark.sql.legacy.respectNullabilityInTextDatasetConversion", "true")
++            .config("spark.plugins", "org.apache.spark.CometPlugin")
++            .config(
++                "spark.shuffle.manager",
++                
"org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
++            .config("spark.comet.explainFallback.enabled", "true")
++            .config("spark.sql.iceberg.parquet.reader-type", "COMET")
++            .config("spark.memory.offHeap.enabled", "true")
++            .config("spark.memory.offHeap.size", "10g")
++            .config("spark.comet.use.lazyMaterialization", "false")
++            .config("spark.comet.schemaEvolution.enabled", "true")
++            .config("spark.comet.exec.broadcastExchange.enabled", "false")
+             .enableHiveSupport()
+             .getOrCreate();
+ 
+diff --git 
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/vectorized/parquet/TestParquetDictionaryEncodedVectorizedReads.java
 
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/vectorized/parquet/TestParquetDictionaryEncodedVectorizedReads.java
+index 973a17c9a38..dd0fd5cc9aa 100644
+--- 
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/vectorized/parquet/TestParquetDictionaryEncodedVectorizedReads.java
++++ 
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/vectorized/parquet/TestParquetDictionaryEncodedVectorizedReads.java
+@@ -65,6 +65,16 @@ public class TestParquetDictionaryEncodedVectorizedReads 
extends TestParquetVect
+         SparkSession.builder()
+             .master("local[2]")
+             .config("spark.driver.host", 
InetAddress.getLoopbackAddress().getHostAddress())
++            .config("spark.plugins", "org.apache.spark.CometPlugin")
++            .config(
++                "spark.shuffle.manager",
++                
"org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
++            .config("spark.comet.explainFallback.enabled", "true")
++            .config("spark.sql.iceberg.parquet.reader-type", "COMET")
++            .config("spark.memory.offHeap.enabled", "true")
++            .config("spark.memory.offHeap.size", "10g")
++            .config("spark.comet.use.lazyMaterialization", "false")
++            .config("spark.comet.schemaEvolution.enabled", "true")
+             .getOrCreate();
+   }
+ 
+diff --git 
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/ScanTestBase.java
 
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/ScanTestBase.java
+index 1c5905744a7..6db62e1f90d 100644
+--- 
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/ScanTestBase.java
++++ 
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/ScanTestBase.java
+@@ -61,6 +61,16 @@ public abstract class ScanTestBase extends AvroDataTestBase 
{
+     ScanTestBase.spark =
+         SparkSession.builder()
+             .config("spark.driver.host", 
InetAddress.getLoopbackAddress().getHostAddress())
++            .config("spark.plugins", "org.apache.spark.CometPlugin")
++            .config(
++                "spark.shuffle.manager",
++                
"org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
++            .config("spark.comet.explainFallback.enabled", "true")
++            .config("spark.sql.iceberg.parquet.reader-type", "COMET")
++            .config("spark.memory.offHeap.enabled", "true")
++            .config("spark.memory.offHeap.size", "10g")
++            .config("spark.comet.use.lazyMaterialization", "false")
++            .config("spark.comet.schemaEvolution.enabled", "true")
+             .master("local[2]")
+             .getOrCreate();
+     ScanTestBase.sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
+diff --git 
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestCompressionSettings.java
 
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestCompressionSettings.java
+index 19ec6d13dd5..bf7c837cf38 100644
+--- 
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestCompressionSettings.java
++++ 
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestCompressionSettings.java
+@@ -144,7 +144,20 @@ public class TestCompressionSettings extends 
CatalogTestBase {
+ 
+   @BeforeAll
+   public static void startSpark() {
+-    TestCompressionSettings.spark = 
SparkSession.builder().master("local[2]").getOrCreate();
++    TestCompressionSettings.spark =
++        SparkSession.builder()
++            .master("local[2]")
++            .config("spark.plugins", "org.apache.spark.CometPlugin")
++            .config(
++                "spark.shuffle.manager",
++                
"org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
++            .config("spark.comet.explainFallback.enabled", "true")
++            .config("spark.sql.iceberg.parquet.reader-type", "COMET")
++            .config("spark.memory.offHeap.enabled", "true")
++            .config("spark.memory.offHeap.size", "10g")
++            .config("spark.comet.use.lazyMaterialization", "false")
++            .config("spark.comet.schemaEvolution.enabled", "true")
++            .getOrCreate();
+   }
+ 
+   @BeforeEach
+diff --git 
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
 
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
+index a7702b169a6..bbb85f7d5c6 100644
+--- 
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
++++ 
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
+@@ -74,7 +74,20 @@ public class TestDataSourceOptions extends 
TestBaseWithCatalog {
+ 
+   @BeforeAll
+   public static void startSpark() {
+-    TestDataSourceOptions.spark = 
SparkSession.builder().master("local[2]").getOrCreate();
++    TestDataSourceOptions.spark =
++        SparkSession.builder()
++            .master("local[2]")
++            .config("spark.plugins", "org.apache.spark.CometPlugin")
++            .config(
++                "spark.shuffle.manager",
++                
"org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
++            .config("spark.comet.explainFallback.enabled", "true")
++            .config("spark.sql.iceberg.parquet.reader-type", "COMET")
++            .config("spark.memory.offHeap.enabled", "true")
++            .config("spark.memory.offHeap.size", "10g")
++            .config("spark.comet.use.lazyMaterialization", "false")
++            .config("spark.comet.schemaEvolution.enabled", "true")
++            .getOrCreate();
+   }
+ 
+   @AfterAll
+diff --git 
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestFilteredScan.java
 
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestFilteredScan.java
+index fd7d52178f2..929ebd405c5 100644
+--- 
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestFilteredScan.java
++++ 
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestFilteredScan.java
+@@ -114,6 +114,16 @@ public class TestFilteredScan {
+         SparkSession.builder()
+             .master("local[2]")
+             .config("spark.driver.host", 
InetAddress.getLoopbackAddress().getHostAddress())
++            .config("spark.plugins", "org.apache.spark.CometPlugin")
++            .config(
++                "spark.shuffle.manager",
++                
"org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
++            .config("spark.comet.explainFallback.enabled", "true")
++            .config("spark.sql.iceberg.parquet.reader-type", "COMET")
++            .config("spark.memory.offHeap.enabled", "true")
++            .config("spark.memory.offHeap.size", "10g")
++            .config("spark.comet.use.lazyMaterialization", "false")
++            .config("spark.comet.schemaEvolution.enabled", "true")
+             .getOrCreate();
+   }
+ 
+diff --git 
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestForwardCompatibility.java
 
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestForwardCompatibility.java
+index 153564f7d12..761c20f5d80 100644
+--- 
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestForwardCompatibility.java
++++ 
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestForwardCompatibility.java
+@@ -98,6 +98,16 @@ public class TestForwardCompatibility {
+         SparkSession.builder()
+             .master("local[2]")
+             .config("spark.driver.host", 
InetAddress.getLoopbackAddress().getHostAddress())
++            .config("spark.plugins", "org.apache.spark.CometPlugin")
++            .config(
++                "spark.shuffle.manager",
++                
"org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
++            .config("spark.comet.explainFallback.enabled", "true")
++            .config("spark.sql.iceberg.parquet.reader-type", "COMET")
++            .config("spark.memory.offHeap.enabled", "true")
++            .config("spark.memory.offHeap.size", "10g")
++            .config("spark.comet.use.lazyMaterialization", "false")
++            .config("spark.comet.schemaEvolution.enabled", "true")
+             .getOrCreate();
+   }
+ 
+diff --git 
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSpark.java
 
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSpark.java
+index f4f57157e47..d1a7cc64179 100644
+--- 
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSpark.java
++++ 
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSpark.java
+@@ -51,6 +51,16 @@ public class TestIcebergSpark {
+         SparkSession.builder()
+             .master("local[2]")
+             .config("spark.driver.host", 
InetAddress.getLoopbackAddress().getHostAddress())
++            .config("spark.plugins", "org.apache.spark.CometPlugin")
++            .config(
++                "spark.shuffle.manager",
++                
"org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
++            .config("spark.comet.explainFallback.enabled", "true")
++            .config("spark.sql.iceberg.parquet.reader-type", "COMET")
++            .config("spark.memory.offHeap.enabled", "true")
++            .config("spark.memory.offHeap.size", "10g")
++            .config("spark.comet.use.lazyMaterialization", "false")
++            .config("spark.comet.schemaEvolution.enabled", "true")
+             .getOrCreate();
+   }
+ 
+diff --git 
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestPartitionPruning.java
 
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestPartitionPruning.java
+index e1402396fa7..ca4212f52e6 100644
+--- 
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestPartitionPruning.java
++++ 
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestPartitionPruning.java
+@@ -118,6 +118,16 @@ public class TestPartitionPruning {
+         SparkSession.builder()
+             .master("local[2]")
+             .config("spark.driver.host", 
InetAddress.getLoopbackAddress().getHostAddress())
++            .config("spark.plugins", "org.apache.spark.CometPlugin")
++            .config(
++                "spark.shuffle.manager",
++                
"org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
++            .config("spark.comet.explainFallback.enabled", "true")
++            .config("spark.sql.iceberg.parquet.reader-type", "COMET")
++            .config("spark.memory.offHeap.enabled", "true")
++            .config("spark.memory.offHeap.size", "10g")
++            .config("spark.comet.use.lazyMaterialization", "false")
++            .config("spark.comet.schemaEvolution.enabled", "true")
+             .getOrCreate();
+     TestPartitionPruning.sparkContext = 
JavaSparkContext.fromSparkContext(spark.sparkContext());
+ 
+diff --git 
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestPartitionValues.java
 
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestPartitionValues.java
+index 0b6ab2052b6..a8176332fb7 100644
+--- 
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestPartitionValues.java
++++ 
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestPartitionValues.java
+@@ -112,6 +112,16 @@ public class TestPartitionValues {
+         SparkSession.builder()
+             .master("local[2]")
+             .config("spark.driver.host", 
InetAddress.getLoopbackAddress().getHostAddress())
++            .config("spark.plugins", "org.apache.spark.CometPlugin")
++            .config(
++                "spark.shuffle.manager",
++                
"org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
++            .config("spark.comet.explainFallback.enabled", "true")
++            .config("spark.sql.iceberg.parquet.reader-type", "COMET")
++            .config("spark.memory.offHeap.enabled", "true")
++            .config("spark.memory.offHeap.size", "10g")
++            .config("spark.comet.use.lazyMaterialization", "false")
++            .config("spark.comet.schemaEvolution.enabled", "true")
+             .getOrCreate();
+   }
+ 
+diff --git 
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection.java
 
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection.java
+index 11865db7fce..8fe32e8300c 100644
+--- 
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection.java
++++ 
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection.java
+@@ -91,6 +91,16 @@ public class TestSnapshotSelection {
+         SparkSession.builder()
+             .master("local[2]")
+             .config("spark.driver.host", 
InetAddress.getLoopbackAddress().getHostAddress())
++            .config("spark.plugins", "org.apache.spark.CometPlugin")
++            .config(
++                "spark.shuffle.manager",
++                
"org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
++            .config("spark.comet.explainFallback.enabled", "true")
++            .config("spark.sql.iceberg.parquet.reader-type", "COMET")
++            .config("spark.memory.offHeap.enabled", "true")
++            .config("spark.memory.offHeap.size", "10g")
++            .config("spark.comet.use.lazyMaterialization", "false")
++            .config("spark.comet.schemaEvolution.enabled", "true")
+             .getOrCreate();
+   }
+ 
+diff --git 
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataFile.java
 
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataFile.java
+index 3051e27d720..6c39f76c286 100644
+--- 
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataFile.java
++++ 
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataFile.java
+@@ -125,6 +125,16 @@ public class TestSparkDataFile {
+         SparkSession.builder()
+             .master("local[2]")
+             .config("spark.driver.host", 
InetAddress.getLoopbackAddress().getHostAddress())
++            .config("spark.plugins", "org.apache.spark.CometPlugin")
++            .config(
++                "spark.shuffle.manager",
++                
"org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
++            .config("spark.comet.explainFallback.enabled", "true")
++            .config("spark.sql.iceberg.parquet.reader-type", "COMET")
++            .config("spark.memory.offHeap.enabled", "true")
++            .config("spark.memory.offHeap.size", "10g")
++            .config("spark.comet.use.lazyMaterialization", "false")
++            .config("spark.comet.schemaEvolution.enabled", "true")
+             .getOrCreate();
+     TestSparkDataFile.sparkContext = 
JavaSparkContext.fromSparkContext(spark.sparkContext());
+   }
+diff --git 
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java
 
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java
+index 4ccbf86f125..40cff1f69a7 100644
+--- 
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java
++++ 
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java
+@@ -100,6 +100,16 @@ public class TestSparkDataWrite {
+         SparkSession.builder()
+             .master("local[2]")
+             .config("spark.driver.host", 
InetAddress.getLoopbackAddress().getHostAddress())
++            .config("spark.plugins", "org.apache.spark.CometPlugin")
++            .config(
++                "spark.shuffle.manager",
++                
"org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
++            .config("spark.comet.explainFallback.enabled", "true")
++            .config("spark.sql.iceberg.parquet.reader-type", "COMET")
++            .config("spark.memory.offHeap.enabled", "true")
++            .config("spark.memory.offHeap.size", "10g")
++            .config("spark.comet.use.lazyMaterialization", "false")
++            .config("spark.comet.schemaEvolution.enabled", "true")
+             .getOrCreate();
+   }
+ 
+@@ -144,7 +154,7 @@ public class TestSparkDataWrite {
+     Dataset<Row> result = spark.read().format("iceberg").load(targetLocation);
+ 
+     List<SimpleRecord> actual =
+-        
result.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList();
++        result.orderBy("id", 
"data").as(Encoders.bean(SimpleRecord.class)).collectAsList();
+     assertThat(actual).hasSameSizeAs(expected).isEqualTo(expected);
+     for (ManifestFile manifest :
+         SnapshotUtil.latestSnapshot(table, branch).allManifests(table.io())) {
+@@ -213,7 +223,7 @@ public class TestSparkDataWrite {
+     Dataset<Row> result = spark.read().format("iceberg").load(targetLocation);
+ 
+     List<SimpleRecord> actual =
+-        
result.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList();
++        result.orderBy("id", 
"data").as(Encoders.bean(SimpleRecord.class)).collectAsList();
+     assertThat(actual).hasSameSizeAs(expected).isEqualTo(expected);
+   }
+ 
+@@ -258,7 +268,7 @@ public class TestSparkDataWrite {
+     Dataset<Row> result = spark.read().format("iceberg").load(targetLocation);
+ 
+     List<SimpleRecord> actual =
+-        
result.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList();
++        result.orderBy("id", 
"data").as(Encoders.bean(SimpleRecord.class)).collectAsList();
+     assertThat(actual).hasSameSizeAs(expected).isEqualTo(expected);
+   }
+ 
+@@ -310,7 +320,7 @@ public class TestSparkDataWrite {
+     Dataset<Row> result = spark.read().format("iceberg").load(targetLocation);
+ 
+     List<SimpleRecord> actual =
+-        
result.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList();
++        result.orderBy("id", 
"data").as(Encoders.bean(SimpleRecord.class)).collectAsList();
+     assertThat(actual).hasSameSizeAs(expected).isEqualTo(expected);
+   }
+ 
+@@ -352,7 +362,7 @@ public class TestSparkDataWrite {
+     Dataset<Row> result = spark.read().format("iceberg").load(targetLocation);
+ 
+     List<SimpleRecord> actual =
+-        
result.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList();
++        result.orderBy("id", 
"data").as(Encoders.bean(SimpleRecord.class)).collectAsList();
+     assertThat(actual).hasSameSizeAs(expected).isEqualTo(expected);
+   }
+ 
+@@ -391,7 +401,7 @@ public class TestSparkDataWrite {
+     Dataset<Row> result = spark.read().format("iceberg").load(targetLocation);
+ 
+     List<SimpleRecord> actual =
+-        
result.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList();
++        result.orderBy("id", 
"data").as(Encoders.bean(SimpleRecord.class)).collectAsList();
+     assertThat(actual).hasSameSizeAs(expected).isEqualTo(expected);
+ 
+     List<DataFile> files = Lists.newArrayList();
+@@ -459,7 +469,7 @@ public class TestSparkDataWrite {
+     Dataset<Row> result = spark.read().format("iceberg").load(targetLocation);
+ 
+     List<SimpleRecord> actual =
+-        
result.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList();
++        result.orderBy("id", 
"data").as(Encoders.bean(SimpleRecord.class)).collectAsList();
+     assertThat(actual).hasSameSizeAs(expected).isEqualTo(expected);
+   }
+ 
+@@ -706,7 +716,7 @@ public class TestSparkDataWrite {
+     // Since write and commit succeeded, the rows should be readable
+     Dataset<Row> result = spark.read().format("iceberg").load(targetLocation);
+     List<SimpleRecord> actual =
+-        
result.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList();
++        result.orderBy("id", 
"data").as(Encoders.bean(SimpleRecord.class)).collectAsList();
+     assertThat(actual)
+         .hasSize(records.size() + records2.size())
+         .containsExactlyInAnyOrder(
+diff --git 
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReadProjection.java
 
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReadProjection.java
+index 596d05d30b5..dc8563314c7 100644
+--- 
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReadProjection.java
++++ 
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReadProjection.java
+@@ -88,6 +88,16 @@ public class TestSparkReadProjection extends 
TestReadProjection {
+         SparkSession.builder()
+             .master("local[2]")
+             .config("spark.driver.host", 
InetAddress.getLoopbackAddress().getHostAddress())
++            .config("spark.plugins", "org.apache.spark.CometPlugin")
++            .config(
++                "spark.shuffle.manager",
++                
"org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
++            .config("spark.comet.explainFallback.enabled", "true")
++            .config("spark.sql.iceberg.parquet.reader-type", "COMET")
++            .config("spark.memory.offHeap.enabled", "true")
++            .config("spark.memory.offHeap.size", "10g")
++            .config("spark.comet.use.lazyMaterialization", "false")
++            .config("spark.comet.schemaEvolution.enabled", "true")
+             .getOrCreate();
+     ImmutableMap<String, String> config =
+         ImmutableMap.of(
+diff --git 
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java
 
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java
+index 42699f46623..058c2d79b62 100644
+--- 
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java
++++ 
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java
+@@ -138,6 +138,16 @@ public class TestSparkReaderDeletes extends 
DeleteReadTests {
+             .config("spark.ui.liveUpdate.period", 0)
+             .config(SQLConf.PARTITION_OVERWRITE_MODE().key(), "dynamic")
+             .config("spark.hadoop." + METASTOREURIS.varname, 
hiveConf.get(METASTOREURIS.varname))
++            .config("spark.plugins", "org.apache.spark.CometPlugin")
++            .config(
++                "spark.shuffle.manager",
++                
"org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
++            .config("spark.comet.explainFallback.enabled", "true")
++            .config("spark.sql.iceberg.parquet.reader-type", "COMET")
++            .config("spark.memory.offHeap.enabled", "true")
++            .config("spark.memory.offHeap.size", "10g")
++            .config("spark.comet.use.lazyMaterialization", "false")
++            .config("spark.comet.schemaEvolution.enabled", "true")
+             .enableHiveSupport()
+             .getOrCreate();
+ 
+diff --git 
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderWithBloomFilter.java
 
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderWithBloomFilter.java
+index baf7fa8f88a..509c5deba51 100644
+--- 
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderWithBloomFilter.java
++++ 
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderWithBloomFilter.java
+@@ -182,6 +182,16 @@ public class TestSparkReaderWithBloomFilter {
+         SparkSession.builder()
+             .master("local[2]")
+             .config("spark.hadoop." + METASTOREURIS.varname, 
hiveConf.get(METASTOREURIS.varname))
++            .config("spark.plugins", "org.apache.spark.CometPlugin")
++            .config(
++                "spark.shuffle.manager",
++                
"org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
++            .config("spark.comet.explainFallback.enabled", "true")
++            .config("spark.sql.iceberg.parquet.reader-type", "COMET")
++            .config("spark.memory.offHeap.enabled", "true")
++            .config("spark.memory.offHeap.size", "10g")
++            .config("spark.comet.use.lazyMaterialization", "false")
++            .config("spark.comet.schemaEvolution.enabled", "true")
+             .enableHiveSupport()
+             .getOrCreate();
+ 
+diff --git 
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreaming.java
 
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreaming.java
+index 54048bbf218..b1a2ca92098 100644
+--- 
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreaming.java
++++ 
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreaming.java
+@@ -69,6 +69,16 @@ public class TestStructuredStreaming {
+             .master("local[2]")
+             .config("spark.driver.host", 
InetAddress.getLoopbackAddress().getHostAddress())
+             .config("spark.sql.shuffle.partitions", 4)
++            .config("spark.plugins", "org.apache.spark.CometPlugin")
++            .config(
++                "spark.shuffle.manager",
++                
"org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
++            .config("spark.comet.explainFallback.enabled", "true")
++            .config("spark.sql.iceberg.parquet.reader-type", "COMET")
++            .config("spark.memory.offHeap.enabled", "true")
++            .config("spark.memory.offHeap.size", "10g")
++            .config("spark.comet.use.lazyMaterialization", "false")
++            .config("spark.comet.schemaEvolution.enabled", "true")
+             .getOrCreate();
+   }
+ 
+diff --git 
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestTimestampWithoutZone.java
 
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestTimestampWithoutZone.java
+index 8b1e3fbfc77..74936e2487e 100644
+--- 
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestTimestampWithoutZone.java
++++ 
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestTimestampWithoutZone.java
+@@ -75,7 +75,20 @@ public class TestTimestampWithoutZone extends TestBase {
+ 
+   @BeforeAll
+   public static void startSpark() {
+-    TestTimestampWithoutZone.spark = 
SparkSession.builder().master("local[2]").getOrCreate();
++    TestTimestampWithoutZone.spark =
++        SparkSession.builder()
++            .master("local[2]")
++            .config("spark.plugins", "org.apache.spark.CometPlugin")
++            .config(
++                "spark.shuffle.manager",
++                
"org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
++            .config("spark.comet.explainFallback.enabled", "true")
++            .config("spark.sql.iceberg.parquet.reader-type", "COMET")
++            .config("spark.memory.offHeap.enabled", "true")
++            .config("spark.memory.offHeap.size", "10g")
++            .config("spark.comet.use.lazyMaterialization", "false")
++            .config("spark.comet.schemaEvolution.enabled", "true")
++            .getOrCreate();
+   }
+ 
+   @AfterAll
+diff --git 
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestWriteMetricsConfig.java
 
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestWriteMetricsConfig.java
+index c3fac70dd3f..b7f2431c119 100644
+--- 
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestWriteMetricsConfig.java
++++ 
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestWriteMetricsConfig.java
+@@ -84,6 +84,16 @@ public class TestWriteMetricsConfig {
+         SparkSession.builder()
+             .master("local[2]")
+             .config("spark.driver.host", 
InetAddress.getLoopbackAddress().getHostAddress())
++            .config("spark.plugins", "org.apache.spark.CometPlugin")
++            .config(
++                "spark.shuffle.manager",
++                
"org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
++            .config("spark.comet.explainFallback.enabled", "true")
++            .config("spark.sql.iceberg.parquet.reader-type", "COMET")
++            .config("spark.memory.offHeap.enabled", "true")
++            .config("spark.memory.offHeap.size", "10g")
++            .config("spark.comet.use.lazyMaterialization", "false")
++            .config("spark.comet.schemaEvolution.enabled", "true")
+             .getOrCreate();
+     TestWriteMetricsConfig.sc = 
JavaSparkContext.fromSparkContext(spark.sparkContext());
+   }
+diff --git 
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/sql/TestAggregatePushDown.java
 
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/sql/TestAggregatePushDown.java
+index 5ce56b4feca..0def2a156d4 100644
+--- 
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/sql/TestAggregatePushDown.java
++++ 
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/sql/TestAggregatePushDown.java
+@@ -63,6 +63,16 @@ public class TestAggregatePushDown extends CatalogTestBase {
+         SparkSession.builder()
+             .master("local[2]")
+             .config("spark.sql.iceberg.aggregate_pushdown", "true")
++            .config("spark.plugins", "org.apache.spark.CometPlugin")
++            .config(
++                "spark.shuffle.manager",
++                
"org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
++            .config("spark.comet.explainFallback.enabled", "true")
++            .config("spark.sql.iceberg.parquet.reader-type", "COMET")
++            .config("spark.memory.offHeap.enabled", "true")
++            .config("spark.memory.offHeap.size", "10g")
++            .config("spark.comet.use.lazyMaterialization", "false")
++            .config("spark.comet.schemaEvolution.enabled", "true")
+             .enableHiveSupport()
+             .getOrCreate();
+ 
+diff --git 
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/sql/TestFilterPushDown.java
 
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/sql/TestFilterPushDown.java
+index 9d2ce2b388a..5e233688488 100644
+--- 
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/sql/TestFilterPushDown.java
++++ 
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/sql/TestFilterPushDown.java
+@@ -598,9 +598,7 @@ public class TestFilterPushDown extends 
TestBaseWithCatalog {
+     String planAsString = sparkPlan.toString().replaceAll("#(\\d+L?)", "");
+ 
+     if (sparkFilter != null) {
+-      assertThat(planAsString)
+-          .as("Post scan filter should match")
+-          .contains("Filter (" + sparkFilter + ")");
++      assertThat(planAsString).as("Post scan filter should match").contains("CometFilter");
+     } else {
+       assertThat(planAsString).as("Should be no post scan filter").doesNotContain("Filter (");
+     }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to