alexeykudinkin commented on code in PR #7528: URL: https://github.com/apache/hudi/pull/7528#discussion_r1067596208
########## hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieCatalystExpressionUtils.scala: ########## @@ -78,14 +80,52 @@ object HoodieCatalystExpressionUtils { * NOTE: Projection of the row from [[StructType]] A to [[StructType]] B is only possible, if * B is a subset of A */ - def generateUnsafeProjection(from: StructType, to: StructType): UnsafeProjection = { - val attrs = from.toAttributes - val attrsMap = attrs.map(attr => (attr.name, attr)).toMap - val targetExprs = to.fields.map(f => attrsMap(f.name)) + def generateUnsafeProjection(sourceStructType: StructType, targetStructType: StructType): UnsafeProjection = { + val resolver = SQLConf.get.resolver + val attrs = sourceStructType.toAttributes + val targetExprs = targetStructType.fields.map { targetField => + val attrRef = attrs.find(attr => resolver(attr.name, targetField.name)) + .getOrElse(throw new AnalysisException(s"Wasn't able to match target field `${targetField.name}` to any of the source attributes ($attrs)")) + + genProjectingExpression(attrRef, targetField.dataType) + } GenerateUnsafeProjection.generate(targetExprs, attrs) } + private def genProjectingExpression(sourceExpr: Expression, + targetDataType: DataType): Expression = { + checkState(sourceExpr.resolved) + + // TODO support array, map + (sourceExpr.dataType, targetDataType) match { + case (sdt, tdt) if sdt == tdt => + sourceExpr + + case (sourceType: StructType, targetType: StructType) => + val fieldValueExprs = targetType.fields.map { tf => Review Comment: > Looks like a subset of nested fields may be taken during the projection, e.g., if the source has a {a.b, a.c, a.d} and the target has a.b, we only keep a.b instead of the whole StructType a. Does this happen or the caller of this function always makes sure the targetStructType is properly constructed to preserve the root-level field instead of a subset of nested fields? It does happen. 
It's actually the reason for this PR -- previously it was only handling nested field projections, but `NestedSchemaPruning` could produce schemas w/ _nested_ fields being pruned as well. Therefore, we need to make sure we handle this appropriately when reading log-files (by projecting records into the new schema) ########## hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala: ########## @@ -106,18 +112,16 @@ class BaseFileOnlyRelation(sqlContext: SQLContext, } protected def collectFileSplits(partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[HoodieBaseFileSplit] = { - val partitions = listLatestBaseFiles(globPaths, partitionFilters, dataFilters) - val fileSplits = partitions.values.toSeq - .flatMap { files => - files.flatMap { file => - // TODO fix, currently assuming parquet as underlying format - HoodieDataSourceHelper.splitFiles( - sparkSession = sparkSession, - file = file, - partitionValues = getPartitionColumnsAsInternalRow(file) - ) - } - } + val fileSlices = listLatestFileSlices(globPaths, partitionFilters, dataFilters) Review Comment: We're only using the base-file from the file-slice, so there's no functional change. Were you referring to something else? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org