This is an automated email from the ASF dual-hosted git repository.

parthc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push:
     new 530ba7e18 fix: map parquet field_id correctly (native_iceberg_compat) (#1815)
530ba7e18 is described below

commit 530ba7e18d6f4a56fa25dfba22dc3e908425e323
Author: Parth Chandra <par...@apache.org>
AuthorDate: Wed Jun 11 17:59:22 2025 -0700

    fix: map parquet field_id correctly (native_iceberg_compat) (#1815)

    * fix: map parquet field_id correctly (native_iceberg_compat)
---
 .../apache/comet/parquet/NativeBatchReader.java | 214 +++++++++++++++++++--
 1 file changed, 198 insertions(+), 16 deletions(-)

diff --git a/common/src/main/java/org/apache/comet/parquet/NativeBatchReader.java b/common/src/main/java/org/apache/comet/parquet/NativeBatchReader.java
index 51ba97279..4b4efd1f2 100644
--- a/common/src/main/java/org/apache/comet/parquet/NativeBatchReader.java
+++ b/common/src/main/java/org/apache/comet/parquet/NativeBatchReader.java
@@ -27,6 +27,7 @@ import java.lang.reflect.Method;
 import java.net.URI;
 import java.nio.channels.Channels;
 import java.util.*;
+import java.util.stream.Collectors;
 
 import scala.Option;
 import scala.collection.JavaConverters;
@@ -61,14 +62,14 @@ import org.apache.spark.executor.TaskMetrics;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.comet.parquet.CometParquetReadSupport;
 import org.apache.spark.sql.comet.util.Utils$;
+import org.apache.spark.sql.errors.QueryExecutionErrors;
 import org.apache.spark.sql.execution.datasources.PartitionedFile;
 import org.apache.spark.sql.execution.datasources.parquet.ParquetColumn;
 import org.apache.spark.sql.execution.datasources.parquet.ParquetToSparkSchemaConverter;
+import org.apache.spark.sql.execution.datasources.parquet.ParquetUtils;
 import org.apache.spark.sql.execution.metric.SQLMetric;
 import org.apache.spark.sql.internal.SQLConf;
-import org.apache.spark.sql.types.DataType;
-import org.apache.spark.sql.types.StructField;
-import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.types.*;
 import org.apache.spark.sql.vectorized.ColumnarBatch;
 import org.apache.spark.util.AccumulatorV2;
 
@@ -235,12 +236,6 @@ public class NativeBatchReader extends RecordReader<Void, ColumnarBatch> impleme
    */
   public void init() throws Throwable {
 
-    conf.set("spark.sql.parquet.binaryAsString", "false");
-    conf.set("spark.sql.parquet.int96AsTimestamp", "false");
-    conf.set("spark.sql.caseSensitive", "false");
-    conf.set("spark.sql.parquet.inferTimestampNTZ.enabled", "true");
-    conf.set("spark.sql.legacy.parquet.nanosAsLong", "false");
-
     useDecimal128 =
         conf.getBoolean(
             CometConf.COMET_USE_DECIMAL_128().key(),
@@ -268,9 +263,9 @@ public class NativeBatchReader extends RecordReader<Void, ColumnarBatch> impleme
       requestedSchema = footer.getFileMetaData().getSchema();
       fileSchema = requestedSchema;
 
-      ParquetToSparkSchemaConverter converter = new ParquetToSparkSchemaConverter(conf);
 
       if (sparkSchema == null) {
+        ParquetToSparkSchemaConverter converter = new ParquetToSparkSchemaConverter(conf);
         sparkSchema = converter.convert(requestedSchema);
       } else {
         requestedSchema =
@@ -283,8 +278,18 @@ public class NativeBatchReader extends RecordReader<Void, ColumnarBatch> impleme
                   sparkSchema.size(), requestedSchema.getFieldCount()));
         }
       }
-      this.parquetColumn =
-          converter.convertParquetColumn(requestedSchema, Option.apply(this.sparkSchema));
+
+      boolean caseSensitive =
+          conf.getBoolean(
+              SQLConf.CASE_SENSITIVE().key(),
+              (boolean) SQLConf.CASE_SENSITIVE().defaultValue().get());
+      // rename spark fields based on field_id so name of spark schema field matches the parquet
+      // field name
+      if (useFieldId && ParquetUtils.hasFieldIds(sparkSchema)) {
+        sparkSchema =
+            getSparkSchemaByFieldId(sparkSchema, requestedSchema.asGroupType(), caseSensitive);
+      }
+      this.parquetColumn = getParquetColumn(requestedSchema, this.sparkSchema);
 
       String timeZoneId = conf.get("spark.sql.session.timeZone");
       // Native code uses "UTC" always as the timeZoneId when converting from spark to arrow schema.
@@ -404,10 +409,6 @@ public class NativeBatchReader extends RecordReader<Void, ColumnarBatch> impleme
         conf.getInt(
             CometConf.COMET_BATCH_SIZE().key(),
             (Integer) CometConf.COMET_BATCH_SIZE().defaultValue().get());
-    boolean caseSensitive =
-        conf.getBoolean(
-            SQLConf.CASE_SENSITIVE().key(),
-            (boolean) SQLConf.CASE_SENSITIVE().defaultValue().get());
     this.handle =
         Native.initRecordBatchReader(
             filePath,
@@ -424,6 +425,187 @@ public class NativeBatchReader extends RecordReader<Void, ColumnarBatch> impleme
     isInitialized = true;
   }
 
+  private ParquetColumn getParquetColumn(MessageType schema, StructType sparkSchema) {
+    // We use a different config from the config that is passed in.
+    // This follows the setting used in Spark's SpecificParquetRecordReaderBase
+    Configuration config = new Configuration();
+    config.setBoolean(SQLConf.PARQUET_BINARY_AS_STRING().key(), false);
+    config.setBoolean(SQLConf.PARQUET_INT96_AS_TIMESTAMP().key(), false);
+    config.setBoolean(SQLConf.CASE_SENSITIVE().key(), false);
+    config.setBoolean(SQLConf.PARQUET_INFER_TIMESTAMP_NTZ_ENABLED().key(), false);
+    config.setBoolean(SQLConf.LEGACY_PARQUET_NANOS_AS_LONG().key(), false);
+    ParquetToSparkSchemaConverter converter = new ParquetToSparkSchemaConverter(config);
+    return converter.convertParquetColumn(schema, Option.apply(sparkSchema));
+  }
+
+  private Map<Integer, List<Type>> getIdToParquetFieldMap(GroupType type) {
+    return type.getFields().stream()
+        .filter(f -> f.getId() != null)
+        .collect(Collectors.groupingBy(f -> f.getId().intValue()));
+  }
+
+  private Map<String, List<Type>> getCaseSensitiveParquetFieldMap(GroupType schema) {
+    return schema.getFields().stream().collect(Collectors.toMap(Type::getName, Arrays::asList));
+  }
+
+  private Map<String, List<Type>> getCaseInsensitiveParquetFieldMap(GroupType schema) {
+    return schema.getFields().stream()
+        .collect(Collectors.groupingBy(f -> f.getName().toLowerCase(Locale.ROOT)));
+  }
+
+  private Type getMatchingParquetFieldById(
+      StructField f,
+      Map<Integer, List<Type>> idToParquetFieldMap,
+      Map<String, List<Type>> nameToParquetFieldMap,
+      boolean isCaseSensitive) {
+    List<Type> matched = null;
+    int fieldId = 0;
+    if (ParquetUtils.hasFieldId(f)) {
+      fieldId = ParquetUtils.getFieldId(f);
+      matched = idToParquetFieldMap.get(fieldId);
+    } else {
+      String fieldName = isCaseSensitive ? f.name() : f.name().toLowerCase(Locale.ROOT);
+      matched = nameToParquetFieldMap.get(fieldName);
+    }
+
+    if (matched == null || matched.isEmpty()) {
+      return null;
+    }
+    if (matched.size() > 1) {
+      // Need to fail if there is ambiguity, i.e. more than one field is matched
+      String parquetTypesString =
+          matched.stream().map(Type::getName).collect(Collectors.joining(", ", "[", "]"));
+      throw QueryExecutionErrors.foundDuplicateFieldInFieldIdLookupModeError(
+          fieldId, parquetTypesString);
+    } else {
+      return matched.get(0);
+    }
+  }
+
+  // Derived from CometParquetReadSupport.matchFieldId
+  private String getMatchingNameById(
+      StructField f,
+      Map<Integer, List<Type>> idToParquetFieldMap,
+      Map<String, List<Type>> nameToParquetFieldMap,
+      boolean isCaseSensitive) {
+    Type matched =
+        getMatchingParquetFieldById(f, idToParquetFieldMap, nameToParquetFieldMap, isCaseSensitive);
+
+    // When there is no ID match, we use a fake name to avoid a name match by accident
+    // We need this name to be unique as well, otherwise there will be type conflicts
+    if (matched == null) {
+      return CometParquetReadSupport.generateFakeColumnName();
+    } else {
+      return matched.getName();
+    }
+  }
+
+  // clip ParquetGroup Type
+  private StructType getSparkSchemaByFieldId(
+      StructType schema, GroupType parquetSchema, boolean caseSensitive) {
+    StructType newSchema = new StructType();
+    Map<Integer, List<Type>> idToParquetFieldMap = getIdToParquetFieldMap(parquetSchema);
+    Map<String, List<Type>> nameToParquetFieldMap =
+        caseSensitive
+            ? getCaseSensitiveParquetFieldMap(parquetSchema)
+            : getCaseInsensitiveParquetFieldMap(parquetSchema);
+    for (StructField f : schema.fields()) {
+      DataType newDataType;
+      String fieldName = caseSensitive ? f.name() : f.name().toLowerCase(Locale.ROOT);
+      List<Type> parquetFieldList = nameToParquetFieldMap.get(fieldName);
+      if (parquetFieldList == null) {
+        newDataType = f.dataType();
+      } else {
+        Type fieldType = parquetFieldList.get(0);
+        if (f.dataType() instanceof StructType) {
+          newDataType =
+              getSparkSchemaByFieldId(
+                  (StructType) f.dataType(), fieldType.asGroupType(), caseSensitive);
+        } else {
+          newDataType = getSparkTypeByFieldId(f.dataType(), fieldType, caseSensitive);
+        }
+      }
+      String matchedName =
+          getMatchingNameById(f, idToParquetFieldMap, nameToParquetFieldMap, caseSensitive);
+      StructField newField = f.copy(matchedName, newDataType, f.nullable(), f.metadata());
+      newSchema = newSchema.add(newField);
+    }
+    return newSchema;
+  }
+
+  private DataType getSparkTypeByFieldId(
+      DataType dataType, Type parquetType, boolean caseSensitive) {
+    DataType newDataType;
+    if (dataType instanceof StructType) {
+      newDataType =
+          getSparkSchemaByFieldId((StructType) dataType, parquetType.asGroupType(), caseSensitive);
+    } else if (dataType instanceof ArrayType) {
+      newDataType =
+          getSparkArrayTypeByFieldId(
+              (ArrayType) dataType, parquetType.asGroupType(), caseSensitive);
+    } else if (dataType instanceof MapType) {
+      MapType mapType = (MapType) dataType;
+      DataType keyType = mapType.keyType();
+      DataType valueType = mapType.valueType();
+      DataType newKeyType;
+      DataType newValueType;
+      Type parquetMapType = parquetType.asGroupType().getFields().get(0);
+      Type parquetKeyType = parquetMapType.asGroupType().getType("key");
+      Type parquetValueType = parquetMapType.asGroupType().getType("value");
+      if (keyType instanceof StructType) {
+        newKeyType =
+            getSparkSchemaByFieldId(
+                (StructType) keyType, parquetKeyType.asGroupType(), caseSensitive);
+      } else {
+        newKeyType = keyType;
+      }
+      if (valueType instanceof StructType) {
+        newValueType =
+            getSparkSchemaByFieldId(
+                (StructType) valueType, parquetValueType.asGroupType(), caseSensitive);
+      } else {
+        newValueType = valueType;
+      }
+      newDataType = new MapType(newKeyType, newValueType, mapType.valueContainsNull());
+    } else {
+      newDataType = dataType;
+    }
+    return newDataType;
+  }
+
+  private DataType getSparkArrayTypeByFieldId(
+      ArrayType arrayType, GroupType parquetType, boolean caseSensitive) {
+    DataType newDataType;
+    DataType elementType = arrayType.elementType();
+    DataType newElementType;
+    Type parquetList = parquetType.getFields().get(0);
+    Type parquetElementType;
+    if (parquetList.getLogicalTypeAnnotation() == null
+        && parquetList.isRepetition(Type.Repetition.REPEATED)) {
+      parquetElementType = parquetList;
+    } else {
+      // we expect only non-primitive types here (see clipParquetListTypes for related logic)
+      GroupType repeatedGroup = parquetList.asGroupType().getType(0).asGroupType();
+      if (repeatedGroup.getFieldCount() > 1
+          || Objects.equals(repeatedGroup.getName(), "array")
+          || Objects.equals(repeatedGroup.getName(), parquetList.getName() + "_tuple")) {
+        parquetElementType = repeatedGroup;
+      } else {
+        parquetElementType = repeatedGroup.getType(0);
+      }
+    }
+    if (elementType instanceof StructType) {
+      newElementType =
+          getSparkSchemaByFieldId(
+              (StructType) elementType, parquetElementType.asGroupType(), caseSensitive);
+    } else {
+      newElementType = getSparkTypeByFieldId(elementType, parquetElementType, caseSensitive);
+    }
+    newDataType = new ArrayType(newElementType, arrayType.containsNull());
+    return newDataType;
+  }
+
   private void checkParquetType(ParquetColumn column) throws IOException {
     String[] path = JavaConverters.seqAsJavaList(column.path()).toArray(new String[0]);
     if (containsPath(fileSchema, path)) {
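
For context on the fix above: when a table format such as Iceberg renames a column, existing Parquet files keep the old column name, and only the field_id ties the Spark schema field to the on-disk column. The sketch below is a minimal illustration of that id-based lookup; the class name, schemas, and data are hypothetical, and it assumes parquet-java and spark-sql on the classpath.

import java.util.HashMap;
import java.util.Map;

import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.MessageTypeParser;
import org.apache.parquet.schema.Type;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.MetadataBuilder;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

public class FieldIdMatchDemo {
  public static void main(String[] args) {
    // A Parquet file written before a rename: column "amount" carries field_id 1.
    MessageType fileSchema =
        MessageTypeParser.parseMessageType("message spark_schema { optional double amount = 1; }");

    // Iceberg has since renamed the column to "total"; Spark carries the field id
    // in the StructField metadata under the key "parquet.field.id".
    Metadata withId = new MetadataBuilder().putLong("parquet.field.id", 1L).build();
    StructType sparkSchema =
        new StructType().add(new StructField("total", DataTypes.DoubleType, true, withId));

    // Index the file's fields by id, as getIdToParquetFieldMap does in the commit.
    Map<Integer, Type> idToField = new HashMap<>();
    for (Type field : fileSchema.getFields()) {
      if (field.getId() != null) {
        idToField.put(field.getId().intValue(), field);
      }
    }

    // A name lookup for "total" would miss; the id lookup finds "amount", so the
    // Spark field can be renamed to match the file before the native scan starts.
    StructField f = sparkSchema.fields()[0];
    Type matched = idToField.get((int) f.metadata().getLong("parquet.field.id"));
    System.out.println(f.name() + " -> " + matched.getName()); // total -> amount
  }
}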
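
The size check in getMatchingParquetFieldById guards against files where more than one column carries the same field id; in id-lookup mode that is unresolvable and the reader must fail rather than guess. A minimal sketch of that condition (hypothetical demo class, same assumed dependencies):

import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.MessageTypeParser;
import org.apache.parquet.schema.Type;

public class DuplicateFieldIdDemo {
  public static void main(String[] args) {
    // Two columns claiming the same field id make an id lookup ambiguous.
    MessageType bad =
        MessageTypeParser.parseMessageType(
            "message m { optional int32 a = 7; optional int64 b = 7; }");
    Map<Integer, List<Type>> byId =
        bad.getFields().stream()
            .filter(f -> f.getId() != null)
            .collect(Collectors.groupingBy(f -> f.getId().intValue()));
    // byId.get(7) has size 2, which is the condition under which the reader throws
    // QueryExecutionErrors.foundDuplicateFieldInFieldIdLookupModeError.
    System.out.println(byId.get(7)); // prints both matching fields
  }
}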
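
getSparkArrayTypeByFieldId has to locate the list element type across more than one historical Parquet list encoding before it can recurse. For reference, a sketch of the two main layouts it distinguishes (hypothetical schemas):

import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.MessageTypeParser;

public class ListLayoutDemo {
  public static void main(String[] args) {
    // Standard three-level layout: a LIST-annotated group wraps a repeated group,
    // which in turn wraps the element field.
    MessageType standard =
        MessageTypeParser.parseMessageType(
            "message m { optional group xs (LIST) {"
                + " repeated group list { optional int32 element = 3; } } }");

    // Legacy two-level layout: an unannotated repeated field is itself the element.
    MessageType legacy =
        MessageTypeParser.parseMessageType("message m { repeated int32 xs = 3; }");

    // getSparkArrayTypeByFieldId inspects these shapes (annotation, repetition,
    // field count, and the "array"/"_tuple" naming conventions) to decide which
    // nested type is the element before recursing with getSparkTypeByFieldId.
    System.out.println(standard);
    System.out.println(legacy);
  }
}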