This is an automated email from the ASF dual-hosted git repository.

parthc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git


The following commit(s) were added to refs/heads/main by this push:
     new 530ba7e18 fix: map parquet field_id correctly (native_iceberg_compat) (#1815)
530ba7e18 is described below

commit 530ba7e18d6f4a56fa25dfba22dc3e908425e323
Author: Parth Chandra <par...@apache.org>
AuthorDate: Wed Jun 11 17:59:22 2025 -0700

    fix: map parquet field_id correctly (native_iceberg_compat) (#1815)
    
    * fix: map parquet field_id correctly (native_iceberg_compat)
---
 .../apache/comet/parquet/NativeBatchReader.java    | 214 +++++++++++++++++++--
 1 file changed, 198 insertions(+), 16 deletions(-)

diff --git a/common/src/main/java/org/apache/comet/parquet/NativeBatchReader.java b/common/src/main/java/org/apache/comet/parquet/NativeBatchReader.java
index 51ba97279..4b4efd1f2 100644
--- a/common/src/main/java/org/apache/comet/parquet/NativeBatchReader.java
+++ b/common/src/main/java/org/apache/comet/parquet/NativeBatchReader.java
@@ -27,6 +27,7 @@ import java.lang.reflect.Method;
 import java.net.URI;
 import java.nio.channels.Channels;
 import java.util.*;
+import java.util.stream.Collectors;
 
 import scala.Option;
 import scala.collection.JavaConverters;
@@ -61,14 +62,14 @@ import org.apache.spark.executor.TaskMetrics;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.comet.parquet.CometParquetReadSupport;
 import org.apache.spark.sql.comet.util.Utils$;
+import org.apache.spark.sql.errors.QueryExecutionErrors;
 import org.apache.spark.sql.execution.datasources.PartitionedFile;
 import org.apache.spark.sql.execution.datasources.parquet.ParquetColumn;
 import org.apache.spark.sql.execution.datasources.parquet.ParquetToSparkSchemaConverter;
+import org.apache.spark.sql.execution.datasources.parquet.ParquetUtils;
 import org.apache.spark.sql.execution.metric.SQLMetric;
 import org.apache.spark.sql.internal.SQLConf;
-import org.apache.spark.sql.types.DataType;
-import org.apache.spark.sql.types.StructField;
-import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.types.*;
 import org.apache.spark.sql.vectorized.ColumnarBatch;
 import org.apache.spark.util.AccumulatorV2;
 
@@ -235,12 +236,6 @@ public class NativeBatchReader extends RecordReader<Void, ColumnarBatch> impleme
    */
   public void init() throws Throwable {
 
-    conf.set("spark.sql.parquet.binaryAsString", "false");
-    conf.set("spark.sql.parquet.int96AsTimestamp", "false");
-    conf.set("spark.sql.caseSensitive", "false");
-    conf.set("spark.sql.parquet.inferTimestampNTZ.enabled", "true");
-    conf.set("spark.sql.legacy.parquet.nanosAsLong", "false");
-
     useDecimal128 =
         conf.getBoolean(
             CometConf.COMET_USE_DECIMAL_128().key(),
@@ -268,9 +263,9 @@ public class NativeBatchReader extends RecordReader<Void, ColumnarBatch> impleme
 
       requestedSchema = footer.getFileMetaData().getSchema();
       fileSchema = requestedSchema;
-      ParquetToSparkSchemaConverter converter = new ParquetToSparkSchemaConverter(conf);
 
       if (sparkSchema == null) {
+        ParquetToSparkSchemaConverter converter = new ParquetToSparkSchemaConverter(conf);
         sparkSchema = converter.convert(requestedSchema);
       } else {
         requestedSchema =
@@ -283,8 +278,18 @@ public class NativeBatchReader extends RecordReader<Void, ColumnarBatch> impleme
                   sparkSchema.size(), requestedSchema.getFieldCount()));
         }
       }
-      this.parquetColumn =
-          converter.convertParquetColumn(requestedSchema, Option.apply(this.sparkSchema));
+
+      boolean caseSensitive =
+          conf.getBoolean(
+              SQLConf.CASE_SENSITIVE().key(),
+              (boolean) SQLConf.CASE_SENSITIVE().defaultValue().get());
+      // Rename Spark fields based on field_id so that the name of each Spark schema field
+      // matches the corresponding Parquet field name.
+      if (useFieldId && ParquetUtils.hasFieldIds(sparkSchema)) {
+        sparkSchema =
+            getSparkSchemaByFieldId(sparkSchema, requestedSchema.asGroupType(), caseSensitive);
+      }
+      this.parquetColumn = getParquetColumn(requestedSchema, this.sparkSchema);
 
       String timeZoneId = conf.get("spark.sql.session.timeZone");
       // Native code uses "UTC" always as the timeZoneId when converting from spark to arrow schema.
@@ -404,10 +409,6 @@ public class NativeBatchReader extends RecordReader<Void, ColumnarBatch> impleme
           conf.getInt(
               CometConf.COMET_BATCH_SIZE().key(),
               (Integer) CometConf.COMET_BATCH_SIZE().defaultValue().get());
-      boolean caseSensitive =
-          conf.getBoolean(
-              SQLConf.CASE_SENSITIVE().key(),
-              (boolean) SQLConf.CASE_SENSITIVE().defaultValue().get());
       this.handle =
           Native.initRecordBatchReader(
               filePath,
@@ -424,6 +425,187 @@ public class NativeBatchReader extends RecordReader<Void, ColumnarBatch> impleme
     isInitialized = true;
   }
 
+  private ParquetColumn getParquetColumn(MessageType schema, StructType sparkSchema) {
+    // We use a different config from the one that is passed in.
+    // This follows the settings used in Spark's SpecificParquetRecordReaderBase.
+    Configuration config = new Configuration();
+    config.setBoolean(SQLConf.PARQUET_BINARY_AS_STRING().key(), false);
+    config.setBoolean(SQLConf.PARQUET_INT96_AS_TIMESTAMP().key(), false);
+    config.setBoolean(SQLConf.CASE_SENSITIVE().key(), false);
+    config.setBoolean(SQLConf.PARQUET_INFER_TIMESTAMP_NTZ_ENABLED().key(), false);
+    config.setBoolean(SQLConf.LEGACY_PARQUET_NANOS_AS_LONG().key(), false);
+    ParquetToSparkSchemaConverter converter = new ParquetToSparkSchemaConverter(config);
+    return converter.convertParquetColumn(schema, Option.apply(sparkSchema));
+  }
+
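+  // Groups the fields of a Parquet GroupType by their field id, skipping fields without an id.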
+  private Map<Integer, List<Type>> getIdToParquetFieldMap(GroupType type) {
+    return type.getFields().stream()
+        .filter(f -> f.getId() != null)
+        .collect(Collectors.groupingBy(f -> f.getId().intValue()));
+  }
+
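+  // Maps each Parquet field name (case preserved) to a singleton list containing that field.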
+  private Map<String, List<Type>> getCaseSensitiveParquetFieldMap(GroupType schema) {
+    return schema.getFields().stream().collect(Collectors.toMap(Type::getName, Arrays::asList));
+  }
+
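+  // Groups Parquet fields by lower-cased name; a key may map to several fields that differ
+  // only by case.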
+  private Map<String, List<Type>> getCaseInsensitiveParquetFieldMap(GroupType schema) {
+    return schema.getFields().stream()
+        .collect(Collectors.groupingBy(f -> f.getName().toLowerCase(Locale.ROOT)));
+  }
+
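+  // Finds the Parquet field matching a Spark StructField: by field id when the Spark field
+  // carries one, otherwise by (optionally case-insensitive) name. Returns null when nothing
+  // matches and fails when more than one field matches.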
+  private Type getMatchingParquetFieldById(
+      StructField f,
+      Map<Integer, List<Type>> idToParquetFieldMap,
+      Map<String, List<Type>> nameToParquetFieldMap,
+      boolean isCaseSensitive) {
+    List<Type> matched = null;
+    int fieldId = 0;
+    if (ParquetUtils.hasFieldId(f)) {
+      fieldId = ParquetUtils.getFieldId(f);
+      matched = idToParquetFieldMap.get(fieldId);
+    } else {
+      String fieldName = isCaseSensitive ? f.name() : f.name().toLowerCase(Locale.ROOT);
+      matched = nameToParquetFieldMap.get(fieldName);
+    }
+
+    if (matched == null || matched.isEmpty()) {
+      return null;
+    }
+    if (matched.size() > 1) {
+      // Need to fail if there is ambiguity, i.e. more than one field is matched
+      String parquetTypesString =
+          matched.stream().map(Type::getName).collect(Collectors.joining(", ", "[", "]"));
+      throw QueryExecutionErrors.foundDuplicateFieldInFieldIdLookupModeError(
+          fieldId, parquetTypesString);
+    } else {
+      return matched.get(0);
+    }
+  }
+
+  // Derived from CometParquetReadSupport.matchFieldId
+  private String getMatchingNameById(
+      StructField f,
+      Map<Integer, List<Type>> idToParquetFieldMap,
+      Map<String, List<Type>> nameToParquetFieldMap,
+      boolean isCaseSensitive) {
+    Type matched =
+        getMatchingParquetFieldById(f, idToParquetFieldMap, nameToParquetFieldMap, isCaseSensitive);
+
+    // When there is no ID match, we use a fake name to avoid a name match by accident.
+    // We need this name to be unique as well, otherwise there will be type conflicts.
+    if (matched == null) {
+      return CometParquetReadSupport.generateFakeColumnName();
+    } else {
+      return matched.getName();
+    }
+  }
+
+  // Builds a Spark schema whose field names match the Parquet schema, resolving fields by id.
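+  // For each StructField we find the matching Parquet field (see getMatchingNameById), recurse
+  // into nested types, and copy the field under the matched Parquet name.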
+  private StructType getSparkSchemaByFieldId(
+      StructType schema, GroupType parquetSchema, boolean caseSensitive) {
+    StructType newSchema = new StructType();
+    Map<Integer, List<Type>> idToParquetFieldMap = getIdToParquetFieldMap(parquetSchema);
+    Map<String, List<Type>> nameToParquetFieldMap =
+        caseSensitive
+            ? getCaseSensitiveParquetFieldMap(parquetSchema)
+            : getCaseInsensitiveParquetFieldMap(parquetSchema);
+    for (StructField f : schema.fields()) {
+      DataType newDataType;
+      String fieldName = caseSensitive ? f.name() : f.name().toLowerCase(Locale.ROOT);
+      List<Type> parquetFieldList = nameToParquetFieldMap.get(fieldName);
+      if (parquetFieldList == null) {
+        newDataType = f.dataType();
+      } else {
+        Type fieldType = parquetFieldList.get(0);
+        if (f.dataType() instanceof StructType) {
+          newDataType =
+              getSparkSchemaByFieldId(
+                  (StructType) f.dataType(), fieldType.asGroupType(), caseSensitive);
+        } else {
+          newDataType = getSparkTypeByFieldId(f.dataType(), fieldType, caseSensitive);
+        }
+      }
+      String matchedName =
+          getMatchingNameById(f, idToParquetFieldMap, nameToParquetFieldMap, caseSensitive);
+      StructField newField = f.copy(matchedName, newDataType, f.nullable(), f.metadata());
+      newSchema = newSchema.add(newField);
+    }
+    return newSchema;
+  }
+
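+  // Recursively rewrites a Spark DataType so that nested struct fields are renamed to match
+  // the corresponding Parquet fields; primitive types are returned unchanged.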
+  private DataType getSparkTypeByFieldId(
+      DataType dataType, Type parquetType, boolean caseSensitive) {
+    DataType newDataType;
+    if (dataType instanceof StructType) {
+      newDataType =
+          getSparkSchemaByFieldId((StructType) dataType, parquetType.asGroupType(), caseSensitive);
+    } else if (dataType instanceof ArrayType) {
+      newDataType =
+          getSparkArrayTypeByFieldId(
+              (ArrayType) dataType, parquetType.asGroupType(), caseSensitive);
+    } else if (dataType instanceof MapType) {
+      MapType mapType = (MapType) dataType;
+      DataType keyType = mapType.keyType();
+      DataType valueType = mapType.valueType();
+      DataType newKeyType;
+      DataType newValueType;
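+      // Parquet encodes a map as a repeated group (commonly named "key_value") holding a
+      // "key" field and a "value" field; unwrap it to reach the key and value types.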
+      Type parquetMapType = parquetType.asGroupType().getFields().get(0);
+      Type parquetKeyType = parquetMapType.asGroupType().getType("key");
+      Type parquetValueType = parquetMapType.asGroupType().getType("value");
+      if (keyType instanceof StructType) {
+        newKeyType =
+            getSparkSchemaByFieldId(
+                (StructType) keyType, parquetKeyType.asGroupType(), caseSensitive);
+      } else {
+        newKeyType = keyType;
+      }
+      if (valueType instanceof StructType) {
+        newValueType =
+            getSparkSchemaByFieldId(
+                (StructType) valueType, parquetValueType.asGroupType(), caseSensitive);
+      } else {
+        newValueType = valueType;
+      }
+      newDataType = new MapType(newKeyType, newValueType, mapType.valueContainsNull());
+    } else {
+      newDataType = dataType;
+    }
+    return newDataType;
+  }
+
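+  // Resolves the element type of a Spark ArrayType against the Parquet list encoding and
+  // recurses into it, renaming nested fields by field id.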
+  private DataType getSparkArrayTypeByFieldId(
+      ArrayType arrayType, GroupType parquetType, boolean caseSensitive) {
+    DataType newDataType;
+    DataType elementType = arrayType.elementType();
+    DataType newElementType;
+    Type parquetList = parquetType.getFields().get(0);
+    Type parquetElementType;
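+    // A standard 3-level Parquet list is group -> repeated group -> element; legacy 2-level
+    // lists use a repeated field directly. Mirror Spark's rules to find the element type.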
+    if (parquetList.getLogicalTypeAnnotation() == null
+        && parquetList.isRepetition(Type.Repetition.REPEATED)) {
+      parquetElementType = parquetList;
+    } else {
+      // we expect only non-primitive types here (see clipParquetListTypes for related logic)
+      GroupType repeatedGroup = parquetList.asGroupType().getType(0).asGroupType();
+      if (repeatedGroup.getFieldCount() > 1
+          || Objects.equals(repeatedGroup.getName(), "array")
+          || Objects.equals(repeatedGroup.getName(), parquetList.getName() + "_tuple")) {
+        parquetElementType = repeatedGroup;
+      } else {
+        parquetElementType = repeatedGroup.getType(0);
+      }
+    }
+    if (elementType instanceof StructType) {
+      newElementType =
+          getSparkSchemaByFieldId(
+              (StructType) elementType, parquetElementType.asGroupType(), caseSensitive);
+    } else {
+      newElementType = getSparkTypeByFieldId(elementType, parquetElementType, caseSensitive);
+    }
+    newDataType = new ArrayType(newElementType, arrayType.containsNull());
+    return newDataType;
+  }
+
   private void checkParquetType(ParquetColumn column) throws IOException {
     String[] path = JavaConverters.seqAsJavaList(column.path()).toArray(new String[0]);
     if (containsPath(fileSchema, path)) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@datafusion.apache.org
For additional commands, e-mail: commits-h...@datafusion.apache.org
