This is an automated email from the ASF dual-hosted git repository.

parthc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git


The following commit(s) were added to refs/heads/main by this push:
     new 35a99e002 chore: various refactoring changes for iceberg [iceberg] 
(#2680)
35a99e002 is described below

commit 35a99e002ba80cdc30067562a07bbd3f260456d3
Author: Parth Chandra <[email protected]>
AuthorDate: Thu Nov 13 09:31:28 2025 -0800

    chore: various refactoring changes for iceberg [iceberg] (#2680)
    
    * chore: various refactoring changes for iceberg
---
 common/pom.xml                                     |   4 +
 .../apache/comet/parquet/AbstractColumnReader.java |   4 +
 .../parquet/IcebergCometNativeBatchReader.java     |  85 ++++++
 .../apache/comet/parquet/NativeBatchReader.java    | 311 +++++++++++++++++----
 .../comet/parquet/ParquetMetadataSerializer.java   |  74 +++++
 native/core/Cargo.toml                             |   2 +-
 pom.xml                                            |  10 +
 7 files changed, 434 insertions(+), 56 deletions(-)

diff --git a/common/pom.xml b/common/pom.xml
index 8ab0fe50f..9c0a7169a 100644
--- a/common/pom.xml
+++ b/common/pom.xml
@@ -51,6 +51,10 @@ under the License.
       <groupId>org.apache.parquet</groupId>
       <artifactId>parquet-hadoop</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.apache.parquet</groupId>
+      <artifactId>parquet-format-structures</artifactId>
+    </dependency>
     <dependency>
       <groupId>org.apache.arrow</groupId>
       <artifactId>arrow-vector</artifactId>
diff --git 
a/common/src/main/java/org/apache/comet/parquet/AbstractColumnReader.java 
b/common/src/main/java/org/apache/comet/parquet/AbstractColumnReader.java
index b9f1797cb..3768bff56 100644
--- a/common/src/main/java/org/apache/comet/parquet/AbstractColumnReader.java
+++ b/common/src/main/java/org/apache/comet/parquet/AbstractColumnReader.java
@@ -89,6 +89,10 @@ public abstract class AbstractColumnReader implements 
AutoCloseable {
     return descriptor;
   }
 
+  String getPath() {
+    return String.join(".", this.descriptor.getPath());
+  }
+
   /**
    * Set the batch size of this reader to be 'batchSize'. Also initializes the 
native column reader.
    */
diff --git 
a/common/src/main/java/org/apache/comet/parquet/IcebergCometNativeBatchReader.java
 
b/common/src/main/java/org/apache/comet/parquet/IcebergCometNativeBatchReader.java
new file mode 100644
index 000000000..7748fbbe2
--- /dev/null
+++ 
b/common/src/main/java/org/apache/comet/parquet/IcebergCometNativeBatchReader.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.comet.parquet;
+
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.execution.metric.SQLMetric;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * A specialized NativeBatchReader for Iceberg that accepts ParquetMetadata as 
a Thrift encoded byte
+ * array. This allows Iceberg to pass metadata in serialized form with a 
two-step initialization
+ * pattern.
+ */
+public class IcebergCometNativeBatchReader extends NativeBatchReader {
+
+  public IcebergCometNativeBatchReader(StructType requiredSchema) {
+    super();
+    this.sparkSchema = requiredSchema;
+  }
+
+  /** Initialize the reader using FileInfo instead of PartitionedFile. */
+  public void init(
+      Configuration conf,
+      FileInfo fileInfo,
+      byte[] parquetMetadataBytes,
+      byte[] nativeFilter,
+      int capacity,
+      StructType dataSchema,
+      boolean isCaseSensitive,
+      boolean useFieldId,
+      boolean ignoreMissingIds,
+      boolean useLegacyDateTimestamp,
+      StructType partitionSchema,
+      InternalRow partitionValues,
+      AbstractColumnReader[] preInitializedReaders,
+      Map<String, SQLMetric> metrics)
+      throws Throwable {
+
+    // Set parent fields
+    this.conf = conf;
+    this.fileInfo = fileInfo;
+    this.footer = new 
ParquetMetadataSerializer().deserialize(parquetMetadataBytes);
+    this.nativeFilter = nativeFilter;
+    this.capacity = capacity;
+    this.dataSchema = dataSchema;
+    this.isCaseSensitive = isCaseSensitive;
+    this.useFieldId = useFieldId;
+    this.ignoreMissingIds = ignoreMissingIds;
+    this.useLegacyDateTimestamp = useLegacyDateTimestamp;
+    this.partitionSchema = partitionSchema;
+    this.partitionValues = partitionValues;
+    this.preInitializedReaders = preInitializedReaders;
+    this.metrics.clear();
+    if (metrics != null) {
+      this.metrics.putAll(metrics);
+    }
+
+    // Call parent init method
+    super.init();
+  }
+
+  public StructType getSparkSchema() {
+    return this.sparkSchema;
+  }
+}
diff --git 
a/common/src/main/java/org/apache/comet/parquet/NativeBatchReader.java 
b/common/src/main/java/org/apache/comet/parquet/NativeBatchReader.java
index 698dc53c1..d10a8932b 100644
--- a/common/src/main/java/org/apache/comet/parquet/NativeBatchReader.java
+++ b/common/src/main/java/org/apache/comet/parquet/NativeBatchReader.java
@@ -25,6 +25,7 @@ import java.io.IOException;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.net.URI;
+import java.net.URISyntaxException;
 import java.nio.channels.Channels;
 import java.util.*;
 import java.util.stream.Collectors;
@@ -101,39 +102,95 @@ import static 
scala.jdk.javaapi.CollectionConverters.asJava;
  * </pre>
  */
 public class NativeBatchReader extends RecordReader<Void, ColumnarBatch> 
implements Closeable {
+
+  /**
+   * A class that contains the necessary file information for reading a 
Parquet file. This class
+   * provides an abstraction over PartitionedFile properties.
+   */
+  public static class FileInfo {
+    private final long start;
+    private final long length;
+    private final String filePath;
+    private final long fileSize;
+
+    public FileInfo(long start, long length, String filePath, long fileSize)
+        throws URISyntaxException {
+      this.start = start;
+      this.length = length;
+      URI uri = new Path(filePath).toUri();
+      if (uri.getScheme() == null) {
+        uri = new Path("file://" + filePath).toUri();
+      }
+      this.filePath = uri.toString();
+      this.fileSize = fileSize;
+    }
+
+    public static FileInfo fromPartitionedFile(PartitionedFile file) throws 
URISyntaxException {
+      return new FileInfo(file.start(), file.length(), 
file.filePath().toString(), file.fileSize());
+    }
+
+    public long start() {
+      return start;
+    }
+
+    public long length() {
+      return length;
+    }
+
+    public String filePath() {
+      return filePath;
+    }
+
+    public long fileSize() {
+      return fileSize;
+    }
+
+    public URI pathUri() throws URISyntaxException {
+      return new URI(filePath);
+    }
+  }
+
   private static final Logger LOG = 
LoggerFactory.getLogger(NativeBatchReader.class);
   protected static final BufferAllocator ALLOCATOR = new RootAllocator();
   private NativeUtil nativeUtil = new NativeUtil();
 
-  private Configuration conf;
-  private int capacity;
-  private boolean isCaseSensitive;
-  private boolean useFieldId;
-  private boolean ignoreMissingIds;
-  private StructType partitionSchema;
-  private InternalRow partitionValues;
-  private PartitionedFile file;
-  private final Map<String, SQLMetric> metrics;
+  protected Configuration conf;
+  protected int capacity;
+  protected boolean isCaseSensitive;
+  protected boolean useFieldId;
+  protected boolean ignoreMissingIds;
+  protected StructType partitionSchema;
+  protected InternalRow partitionValues;
+  protected PartitionedFile file;
+  protected FileInfo fileInfo;
+  protected final Map<String, SQLMetric> metrics;
   // Unfortunately CometMetricNode is from the "spark" package and cannot be 
used directly here
   // TODO: Move it to common package?
-  private Object metricsNode = null;
+  protected Object metricsNode = null;
 
-  private StructType sparkSchema;
-  private StructType dataSchema;
+  protected StructType sparkSchema;
+  protected StructType dataSchema;
   MessageType fileSchema;
-  private MessageType requestedSchema;
-  private CometVector[] vectors;
-  private AbstractColumnReader[] columnReaders;
-  private CometSchemaImporter importer;
-  private ColumnarBatch currentBatch;
+  protected MessageType requestedSchema;
+  protected CometVector[] vectors;
+  protected AbstractColumnReader[] columnReaders;
+  protected CometSchemaImporter importer;
+  protected ColumnarBatch currentBatch;
   //  private FileReader fileReader;
-  private boolean[] missingColumns;
-  private boolean isInitialized;
-  private ParquetMetadata footer;
-  private byte[] nativeFilter;
+  protected boolean[] missingColumns;
+  protected boolean isInitialized;
+  protected ParquetMetadata footer;
+  protected byte[] nativeFilter;
+  protected AbstractColumnReader[] preInitializedReaders;
 
   private ParquetColumn parquetColumn;
 
+  /**
+   * Map from field name to spark schema index for efficient lookups during 
batch loading. Built
+   * once during initialization and reused across all batch loads.
+   */
+  private Map<String, Integer> sparkFieldIndexMap;
+
   /**
    * Whether the native scan should always return decimal represented by 128 
bits, regardless of its
    * precision. Normally, this should be true if native execution is enabled, 
since Arrow compute
@@ -149,7 +206,7 @@ public class NativeBatchReader extends RecordReader<Void, 
ColumnarBatch> impleme
    * seeing these dates/timestamps.
    */
   // TODO: (ARROW NATIVE)
-  private boolean useLegacyDateTimestamp;
+  protected boolean useLegacyDateTimestamp;
 
   /** The TaskContext object for executing this task. */
   private final TaskContext taskContext;
@@ -157,6 +214,12 @@ public class NativeBatchReader extends RecordReader<Void, 
ColumnarBatch> impleme
   private long totalRowCount = 0;
   private long handle;
 
+  // Protected no-arg constructor for subclasses
+  protected NativeBatchReader() {
+    this.taskContext = TaskContext$.MODULE$.get();
+    this.metrics = new HashMap<>();
+  }
+
   // Only for testing
   public NativeBatchReader(String file, int capacity) {
     this(file, capacity, null, null);
@@ -237,6 +300,41 @@ public class NativeBatchReader extends RecordReader<Void, 
ColumnarBatch> impleme
     this.taskContext = TaskContext$.MODULE$.get();
   }
 
+  /** Alternate constructor that accepts FileInfo instead of PartitionedFile. 
*/
+  NativeBatchReader(
+      Configuration conf,
+      FileInfo fileInfo,
+      ParquetMetadata footer,
+      byte[] nativeFilter,
+      int capacity,
+      StructType sparkSchema,
+      StructType dataSchema,
+      boolean isCaseSensitive,
+      boolean useFieldId,
+      boolean ignoreMissingIds,
+      boolean useLegacyDateTimestamp,
+      StructType partitionSchema,
+      InternalRow partitionValues,
+      Map<String, SQLMetric> metrics,
+      Object metricsNode) {
+    this.conf = conf;
+    this.capacity = capacity;
+    this.sparkSchema = sparkSchema;
+    this.dataSchema = dataSchema;
+    this.isCaseSensitive = isCaseSensitive;
+    this.useFieldId = useFieldId;
+    this.ignoreMissingIds = ignoreMissingIds;
+    this.useLegacyDateTimestamp = useLegacyDateTimestamp;
+    this.partitionSchema = partitionSchema;
+    this.partitionValues = partitionValues;
+    this.fileInfo = fileInfo;
+    this.footer = footer;
+    this.nativeFilter = nativeFilter;
+    this.metrics = metrics;
+    this.metricsNode = metricsNode;
+    this.taskContext = TaskContext$.MODULE$.get();
+  }
+
   /**
    * Initialize this reader. The reason we don't do it in the constructor is 
that we want to close
    * any resource hold by this reader when error happens during the 
initialization.
@@ -248,10 +346,12 @@ public class NativeBatchReader extends RecordReader<Void, 
ColumnarBatch> impleme
             CometConf.COMET_USE_DECIMAL_128().key(),
             (Boolean) CometConf.COMET_USE_DECIMAL_128().defaultValue().get());
 
-    long start = file.start();
-    long length = file.length();
-    String filePath = file.filePath().toString();
-    long fileSize = file.fileSize();
+    // Use fileInfo if available, otherwise fall back to file
+    long start = fileInfo != null ? fileInfo.start() : file.start();
+    long length = fileInfo != null ? fileInfo.length() : file.length();
+    String filePath = fileInfo != null ? fileInfo.filePath() : 
file.filePath().toString();
+    long fileSize = fileInfo != null ? fileInfo.fileSize() : file.fileSize();
+    URI pathUri = fileInfo != null ? fileInfo.pathUri() : file.pathUri();
 
     ParquetReadOptions.Builder builder = HadoopReadOptions.builder(conf, new 
Path(filePath));
 
@@ -261,7 +361,7 @@ public class NativeBatchReader extends RecordReader<Void, 
ColumnarBatch> impleme
     ParquetReadOptions readOptions = builder.build();
 
     Map<String, String> objectStoreOptions =
-        asJava(NativeConfig.extractObjectStoreOptions(conf, file.pathUri()));
+        asJava(NativeConfig.extractObjectStoreOptions(conf, pathUri));
 
     // TODO: enable off-heap buffer when they are ready
     ReadOptions cometReadOptions = ReadOptions.builder(conf).build();
@@ -299,14 +399,8 @@ public class NativeBatchReader extends RecordReader<Void, 
ColumnarBatch> impleme
         sparkSchema =
             getSparkSchemaByFieldId(sparkSchema, 
requestedSchema.asGroupType(), caseSensitive);
       }
-      this.parquetColumn = getParquetColumn(requestedSchema, this.sparkSchema);
 
-      String timeZoneId = conf.get("spark.sql.session.timeZone");
-      // Native code uses "UTC" always as the timeZoneId when converting from 
spark to arrow schema.
-      Schema arrowSchema = Utils$.MODULE$.toArrowSchema(sparkSchema, "UTC");
-      byte[] serializedRequestedArrowSchema = 
serializeArrowSchema(arrowSchema);
-      Schema dataArrowSchema = Utils$.MODULE$.toArrowSchema(dataSchema, "UTC");
-      byte[] serializedDataArrowSchema = serializeArrowSchema(dataArrowSchema);
+      this.parquetColumn = getParquetColumn(requestedSchema, this.sparkSchema);
 
       // Create Column readers
       List<Type> fields = requestedSchema.getFields();
@@ -364,23 +458,30 @@ public class NativeBatchReader extends RecordReader<Void, 
ColumnarBatch> impleme
           checkColumn(parquetFields[i]);
           missingColumns[i] = false;
         } else {
-          if (field.getRepetition() == Type.Repetition.REQUIRED) {
-            throw new IOException(
-                "Required column '"
-                    + field.getName()
-                    + "' is missing"
-                    + " in data file "
-                    + filePath);
-          }
-          if (field.isPrimitive()) {
-            ConstantColumnReader reader =
-                new ConstantColumnReader(nonPartitionFields[i], capacity, 
useDecimal128);
-            columnReaders[i] = reader;
+          if (preInitializedReaders != null
+              && i < preInitializedReaders.length
+              && preInitializedReaders[i] != null) {
+            columnReaders[i] = preInitializedReaders[i];
             missingColumns[i] = true;
           } else {
-            // the column requested is not in the file, but the native reader 
can handle that
-            // and will return nulls for all rows requested
-            missingColumns[i] = false;
+            if (field.getRepetition() == Type.Repetition.REQUIRED) {
+              throw new IOException(
+                  "Required column '"
+                      + field.getName()
+                      + "' is missing"
+                      + " in data file "
+                      + filePath);
+            }
+            if (field.isPrimitive()) {
+              ConstantColumnReader reader =
+                  new ConstantColumnReader(nonPartitionFields[i], capacity, 
useDecimal128);
+              columnReaders[i] = reader;
+              missingColumns[i] = true;
+            } else {
+              // the column requested is not in the file, but the native 
reader can handle that
+              // and will return nulls for all rows requested
+              missingColumns[i] = false;
+            }
           }
         }
       }
@@ -421,9 +522,44 @@ public class NativeBatchReader extends RecordReader<Void, 
ColumnarBatch> impleme
       CometFileKeyUnwrapper keyUnwrapper = null;
       if (encryptionEnabled) {
         keyUnwrapper = new CometFileKeyUnwrapper();
-        keyUnwrapper.storeDecryptionKeyRetriever(file.filePath().toString(), 
conf);
+        keyUnwrapper.storeDecryptionKeyRetriever(filePath, conf);
+      }
+
+      // Filter out columns with preinitialized readers from sparkSchema 
before making the
+      // call to native
+      if (preInitializedReaders != null) {
+        StructType filteredSchema = new StructType();
+        StructField[] sparkFields = sparkSchema.fields();
+        // Build name map for efficient lookups
+        Map<String, Type> fileFieldNameMap =
+            caseSensitive
+                ? buildCaseSensitiveNameMap(fileFields)
+                : buildCaseInsensitiveNameMap(fileFields);
+        for (int i = 0; i < sparkFields.length; i++) {
+          // Keep the column if:
+          // 1. It doesn't have a preinitialized reader, OR
+          // 2. It has a preinitialized reader but exists in fileSchema
+          boolean hasPreInitializedReader =
+              i < preInitializedReaders.length && preInitializedReaders[i] != 
null;
+          String fieldName =
+              caseSensitive
+                  ? sparkFields[i].name()
+                  : sparkFields[i].name().toLowerCase(Locale.ROOT);
+          boolean existsInFileSchema = fileFieldNameMap.containsKey(fieldName);
+          if (!hasPreInitializedReader || existsInFileSchema) {
+            filteredSchema = filteredSchema.add(sparkFields[i]);
+          }
+        }
+        sparkSchema = filteredSchema;
       }
 
+      // Native code uses "UTC" always as the timeZoneId when converting from 
spark to arrow schema.
+      String timeZoneId = "UTC";
+      Schema arrowSchema = Utils$.MODULE$.toArrowSchema(sparkSchema, 
timeZoneId);
+      byte[] serializedRequestedArrowSchema = 
serializeArrowSchema(arrowSchema);
+      Schema dataArrowSchema = Utils$.MODULE$.toArrowSchema(dataSchema, 
timeZoneId);
+      byte[] serializedDataArrowSchema = serializeArrowSchema(dataArrowSchema);
+
       int batchSize =
           conf.getInt(
               CometConf.COMET_BATCH_SIZE().key(),
@@ -443,6 +579,15 @@ public class NativeBatchReader extends RecordReader<Void, 
ColumnarBatch> impleme
               objectStoreOptions,
               keyUnwrapper,
               metricsNode);
+
+      // Build spark field index map for efficient lookups during batch loading
+      StructField[] sparkFields = sparkSchema.fields();
+      sparkFieldIndexMap = new HashMap<>();
+      for (int j = 0; j < sparkFields.length; j++) {
+        String fieldName =
+            caseSensitive ? sparkFields[j].name() : 
sparkFields[j].name().toLowerCase(Locale.ROOT);
+        sparkFieldIndexMap.put(fieldName, j);
+      }
     }
     isInitialized = true;
   }
@@ -475,6 +620,15 @@ public class NativeBatchReader extends RecordReader<Void, 
ColumnarBatch> impleme
         .collect(Collectors.groupingBy(f -> 
f.getName().toLowerCase(Locale.ROOT)));
   }
 
+  private Map<String, Type> buildCaseSensitiveNameMap(List<Type> types) {
+    return types.stream().collect(Collectors.toMap(Type::getName, t -> t));
+  }
+
+  private Map<String, Type> buildCaseInsensitiveNameMap(List<Type> types) {
+    return types.stream()
+        .collect(Collectors.toMap(t -> t.getName().toLowerCase(Locale.ROOT), t 
-> t));
+  }
+
   private Type getMatchingParquetFieldById(
       StructField f,
       Map<Integer, List<Type>> idToParquetFieldMap,
@@ -648,11 +802,40 @@ public class NativeBatchReader extends RecordReader<Void, 
ColumnarBatch> impleme
       }
     } else { // A missing column which is either primitive or complex
       if (column.required()) {
-        // Column is missing in data but the required data is non-nullable. 
This file is invalid.
-        throw new IOException(
-            "Required column is missing in data file. Col: " + 
Arrays.toString(path));
+        // check if we have a preinitialized column reader for this column.
+        int columnIndex = getColumnIndexFromParquetColumn(column);
+        if (columnIndex == -1
+            || preInitializedReaders == null
+            || columnIndex >= preInitializedReaders.length
+            || preInitializedReaders[columnIndex] == null) {
+          // Column is missing in data but the required data is non-nullable. 
This file is invalid.
+          throw new IOException(
+              "Required column is missing in data file. Col: " + 
Arrays.toString(path));
+        }
+      }
+    }
+  }
+
+  /**
+   * Get the column index in the requested schema for a given ParquetColumn. 
Returns -1 if not
+   * found.
+   */
+  private int getColumnIndexFromParquetColumn(ParquetColumn column) {
+    String[] targetPath = asJava(column.path()).toArray(new String[0]);
+    if (targetPath.length == 0) {
+      return -1;
+    }
+
+    // For top-level columns, match by name
+    String columnName = targetPath[0];
+    ParquetColumn[] parquetFields = 
asJava(parquetColumn.children()).toArray(new ParquetColumn[0]);
+    for (int i = 0; i < parquetFields.length; i++) {
+      String[] fieldPath = asJava(parquetFields[i].path()).toArray(new 
String[0]);
+      if (fieldPath.length > 0 && fieldPath[0].equals(columnName)) {
+        return i;
       }
     }
+    return -1;
   }
 
   /**
@@ -834,16 +1017,34 @@ public class NativeBatchReader extends 
RecordReader<Void, ColumnarBatch> impleme
     importer = new CometSchemaImporter(ALLOCATOR);
 
     List<Type> fields = requestedSchema.getFields();
+    StructField[] sparkFields = sparkSchema.fields();
+
+    boolean caseSensitive =
+        conf.getBoolean(
+            SQLConf.CASE_SENSITIVE().key(),
+            (boolean) SQLConf.CASE_SENSITIVE().defaultValue().get());
+
     for (int i = 0; i < fields.size(); i++) {
       if (!missingColumns[i]) {
         if (columnReaders[i] != null) columnReaders[i].close();
         // TODO: (ARROW NATIVE) handle tz, datetime & int96 rebase
-        DataType dataType = sparkSchema.fields()[i].dataType();
         Type field = fields.get(i);
+
+        // Find the corresponding spark field by matching field names using 
the prebuilt map
+        String fieldName =
+            caseSensitive ? field.getName() : 
field.getName().toLowerCase(Locale.ROOT);
+        Integer sparkSchemaIndex = sparkFieldIndexMap.get(fieldName);
+
+        if (sparkSchemaIndex == null) {
+          throw new IOException(
+              "Could not find matching Spark field for Parquet field: " + 
field.getName());
+        }
+
+        DataType dataType = sparkFields[sparkSchemaIndex].dataType();
         NativeColumnReader reader =
             new NativeColumnReader(
                 this.handle,
-                i,
+                sparkSchemaIndex,
                 dataType,
                 field,
                 null,
diff --git 
a/common/src/main/java/org/apache/comet/parquet/ParquetMetadataSerializer.java 
b/common/src/main/java/org/apache/comet/parquet/ParquetMetadataSerializer.java
new file mode 100644
index 000000000..32b40940a
--- /dev/null
+++ 
b/common/src/main/java/org/apache/comet/parquet/ParquetMetadataSerializer.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.comet.parquet;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+
+import org.apache.parquet.format.FileMetaData;
+import org.apache.parquet.format.Util;
+import org.apache.parquet.format.converter.ParquetMetadataConverter;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+
+/**
+ * Utility class for serializing and deserializing ParquetMetadata instances 
to/from byte arrays.
+ * This uses the Parquet format's FileMetaData structure and the underlying 
Thrift compact protocol
+ * for serialization.
+ */
+public class ParquetMetadataSerializer {
+
+  private final ParquetMetadataConverter converter;
+
+  public ParquetMetadataSerializer() {
+    this.converter = new ParquetMetadataConverter();
+  }
+
+  public ParquetMetadataSerializer(ParquetMetadataConverter converter) {
+    this.converter = converter;
+  }
+
+  /**
+   * Serializes a ParquetMetadata instance to a byte array.
+   *
+   * @param metadata the ParquetMetadata to serialize
+   * @return the serialized byte array
+   * @throws IOException if an error occurs during serialization
+   */
+  public byte[] serialize(ParquetMetadata metadata) throws IOException {
+    FileMetaData fileMetaData = converter.toParquetMetadata(1, metadata);
+    ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+    Util.writeFileMetaData(fileMetaData, outputStream);
+    return outputStream.toByteArray();
+  }
+
+  /**
+   * Deserializes a byte array back into a ParquetMetadata instance.
+   *
+   * @param bytes the serialized byte array
+   * @return the deserialized ParquetMetadata
+   * @throws IOException if an error occurs during deserialization
+   */
+  public ParquetMetadata deserialize(byte[] bytes) throws IOException {
+    ByteArrayInputStream inputStream = new ByteArrayInputStream(bytes);
+    FileMetaData fileMetaData = Util.readFileMetaData(inputStream);
+    return converter.fromParquetMetadata(fileMetaData);
+  }
+}
diff --git a/native/core/Cargo.toml b/native/core/Cargo.toml
index 92af3e238..c9f95a1d8 100644
--- a/native/core/Cargo.toml
+++ b/native/core/Cargo.toml
@@ -77,7 +77,7 @@ reqwest = { version = "0.12", default-features = false, 
features = ["rustls-tls-
 object_store_opendal = {version = "0.54.0", optional = true}
 hdfs-sys = {version = "0.3", optional = true, features = ["hdfs_3_3"]}
 opendal = { version ="0.54.1", optional = true, features = ["services-hdfs"] }
-uuid = "1.0"
+uuid = "1.18.1"
 
 [target.'cfg(target_os = "linux")'.dependencies]
 procfs = "0.18.0"
diff --git a/pom.xml b/pom.xml
index 6a6254dfc..d15154fe6 100644
--- a/pom.xml
+++ b/pom.xml
@@ -137,6 +137,10 @@ under the License.
             <groupId>org.apache.parquet</groupId>
             <artifactId>parquet-column</artifactId>
           </exclusion>
+          <exclusion>
+            <groupId>org.apache.parquet</groupId>
+            <artifactId>parquet-format-structures</artifactId>
+          </exclusion>
           <exclusion>
             <groupId>com.google.guava</groupId>
             <artifactId>guava</artifactId>
@@ -218,6 +222,12 @@ under the License.
         <version>${parquet.version}</version>
         <scope>${parquet.maven.scope}</scope>
       </dependency>
+      <dependency>
+        <groupId>org.apache.parquet</groupId>
+        <artifactId>parquet-format-structures</artifactId>
+        <version>${parquet.version}</version>
+        <scope>${parquet.maven.scope}</scope>
+      </dependency>
       <dependency>
         <groupId>org.apache.parquet</groupId>
         <artifactId>parquet-hadoop</artifactId>


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to