parthchandra commented on code in PR #2680:
URL: https://github.com/apache/datafusion-comet/pull/2680#discussion_r2519488523


##########
common/src/main/java/org/apache/comet/parquet/NativeBatchReader.java:
##########
@@ -261,15 +356,15 @@ public void init() throws Throwable {
     ParquetReadOptions readOptions = builder.build();
 
     Map<String, String> objectStoreOptions =
-        asJava(NativeConfig.extractObjectStoreOptions(conf, file.pathUri()));
+        asJava(NativeConfig.extractObjectStoreOptions(conf, pathUri));
 
     // TODO: enable off-heap buffer when they are ready
     ReadOptions cometReadOptions = ReadOptions.builder(conf).build();
 
     Path path = new Path(new URI(filePath));
     try (FileReader fileReader =
         new FileReader(
-            CometInputFile.fromPath(path, conf), footer, readOptions, 
cometReadOptions, metrics)) {

Review Comment:
   Got removed accidentally. Thanks for catching this!



##########
common/src/main/java/org/apache/comet/parquet/NativeBatchReader.java:
##########
@@ -421,9 +515,40 @@ public void init() throws Throwable {
       CometFileKeyUnwrapper keyUnwrapper = null;
       if (encryptionEnabled) {
         keyUnwrapper = new CometFileKeyUnwrapper();
-        keyUnwrapper.storeDecryptionKeyRetriever(file.filePath().toString(), 
conf);
+        keyUnwrapper.storeDecryptionKeyRetriever(filePath, conf);
       }
 
+      // Filter out columns with preinitialized readers from sparkSchema 
before making the
+      // call to native
+      if (preInitializedReaders != null) {
+        StructType filteredSchema = new StructType();
+        StructField[] sparkFields = sparkSchema.fields();
+        // List<Type> fileFields = fileSchema.getFields();
+        for (int i = 0; i < sparkFields.length; i++) {
+          // Keep the column if:
+          // 1. It doesn't have a preinitialized reader, OR
+          // 2. It has a preinitialized reader but exists in fileSchema
+          boolean hasPreInitializedReader =
+              i < preInitializedReaders.length && preInitializedReaders[i] != 
null;
+          int finalI = i;
+          boolean existsInFileSchema =
+              fileFields.stream().anyMatch(f -> 
f.getName().equals(sparkFields[finalI].name()));

Review Comment:
   Yes, it should. Fixed.



##########
common/src/main/java/org/apache/comet/parquet/NativeBatchReader.java:
##########
@@ -834,16 +987,34 @@ private int loadNextBatch() throws Throwable {
     importer = new CometSchemaImporter(ALLOCATOR);
 
     List<Type> fields = requestedSchema.getFields();
+    StructField[] sparkFields = sparkSchema.fields();
+
     for (int i = 0; i < fields.size(); i++) {
       if (!missingColumns[i]) {
         if (columnReaders[i] != null) columnReaders[i].close();
         // TODO: (ARROW NATIVE) handle tz, datetime & int96 rebase
-        DataType dataType = sparkSchema.fields()[i].dataType();
         Type field = fields.get(i);
+
+        // Find the corresponding spark field by matching field names
+        DataType dataType = null;
+        int sparkSchemaIndex = -1;
+        for (int j = 0; j < sparkFields.length; j++) {
+          if (sparkFields[j].name().equals(field.getName())) {

Review Comment:
   Done



##########
common/src/main/java/org/apache/comet/parquet/NativeBatchReader.java:
##########
@@ -648,13 +773,41 @@ private void checkParquetType(ParquetColumn column) 
throws IOException {
       }
     } else { // A missing column which is either primitive or complex
       if (column.required()) {
-        // Column is missing in data but the required data is non-nullable. 
This file is invalid.
-        throw new IOException(
-            "Required column is missing in data file. Col: " + 
Arrays.toString(path));
+        // check if we have a preinitialized column reader for this column.
+        int columnIndex = getColumnIndexFromParquetColumn(column);
+        if (columnIndex == -1
+            || preInitializedReaders == null
+            || preInitializedReaders[columnIndex] == null) {

Review Comment:
   Done



##########
common/src/main/java/org/apache/comet/parquet/NativeBatchReader.java:
##########
@@ -101,36 +102,87 @@
  * </pre>
  */
 public class NativeBatchReader extends RecordReader<Void, ColumnarBatch> 
implements Closeable {
+
+  /**
+   * A class that contains the necessary file information for reading a 
Parquet file. This class
+   * provides an abstraction over PartitionedFile properties.
+   */
+  public static class FileInfo {
+    private final long start;
+    private final long length;
+    private final String filePath;
+    private final long fileSize;
+
+    public FileInfo(long start, long length, String filePath, long fileSize)
+        throws URISyntaxException {
+      this.start = start;
+      this.length = length;
+      URI uri = new Path(filePath).toUri();
+      if (uri.getScheme() == null) {
+        uri = new Path("file://" + filePath).toUri();
+      }
+      this.filePath = uri.toString();
+      this.fileSize = fileSize;
+    }
+
+    public static FileInfo fromPartitionedFile(PartitionedFile file) throws 
URISyntaxException {
+      return new FileInfo(file.start(), file.length(), 
file.filePath().toString(), file.fileSize());
+    }
+
+    public long start() {
+      return start;
+    }
+
+    public long length() {
+      return length;
+    }
+
+    public String filePath() {
+      return filePath;
+    }
+
+    public long fileSize() {
+      return fileSize;
+    }
+
+    public URI pathUri() throws Exception {

Review Comment:
   Done



##########
common/src/main/java/org/apache/comet/parquet/IcebergCometNativeBatchReader.java:
##########
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.comet.parquet;
+
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.execution.metric.SQLMetric;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * A specialized NativeBatchReader for Iceberg that accepts ParquetMetadata as 
a Thrift encoded byte
+ * array. This allows Iceberg to pass metadata in serialized form with a 
two-step initialization
+ * pattern.
+ */
+public class IcebergCometNativeBatchReader extends NativeBatchReader {
+
+  public IcebergCometNativeBatchReader(StructType requiredSchema) {
+    super();
+    this.sparkSchema = requiredSchema;
+  }
+
+  /** Initialize the reader using FileInfo instead of PartitionedFile. */

Review Comment:
   This is not an override. The parent init method has a different signature.



##########
common/src/main/java/org/apache/comet/parquet/NativeBatchReader.java:
##########
@@ -364,23 +453,28 @@ public void init() throws Throwable {
           checkColumn(parquetFields[i]);
           missingColumns[i] = false;
         } else {
-          if (field.getRepetition() == Type.Repetition.REQUIRED) {
-            throw new IOException(
-                "Required column '"
-                    + field.getName()
-                    + "' is missing"
-                    + " in data file "
-                    + filePath);
-          }
-          if (field.isPrimitive()) {
-            ConstantColumnReader reader =
-                new ConstantColumnReader(nonPartitionFields[i], capacity, 
useDecimal128);
-            columnReaders[i] = reader;
+          if (preInitializedReaders != null && preInitializedReaders[i] != 
null) {

Review Comment:
   Done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to