alexeykudinkin commented on code in PR #7528:
URL: https://github.com/apache/hudi/pull/7528#discussion_r1067596208


##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieCatalystExpressionUtils.scala:
##########
@@ -78,14 +80,52 @@ object HoodieCatalystExpressionUtils {
    * NOTE: Projection of the row from [[StructType]] A to [[StructType]] B is only possible, if
    * B is a subset of A
    */
-  def generateUnsafeProjection(from: StructType, to: StructType): UnsafeProjection = {
-    val attrs = from.toAttributes
-    val attrsMap = attrs.map(attr => (attr.name, attr)).toMap
-    val targetExprs = to.fields.map(f => attrsMap(f.name))
+  def generateUnsafeProjection(sourceStructType: StructType, targetStructType: StructType): UnsafeProjection = {
+    val resolver = SQLConf.get.resolver
+    val attrs = sourceStructType.toAttributes
+    val targetExprs = targetStructType.fields.map { targetField =>
+      val attrRef = attrs.find(attr => resolver(attr.name, targetField.name))
+        .getOrElse(throw new AnalysisException(s"Wasn't able to match target field `${targetField.name}` to any of the source attributes ($attrs)"))
+
+      genProjectingExpression(attrRef, targetField.dataType)
+    }
 
     GenerateUnsafeProjection.generate(targetExprs, attrs)
   }
 
+  private def genProjectingExpression(sourceExpr: Expression,
+                                      targetDataType: DataType): Expression = {
+    checkState(sourceExpr.resolved)
+
+    // TODO support array, map
+    (sourceExpr.dataType, targetDataType) match {
+      case (sdt, tdt) if sdt == tdt =>
+        sourceExpr
+
+      case (sourceType: StructType, targetType: StructType) =>
+        val fieldValueExprs = targetType.fields.map { tf =>

Review Comment:
   > Looks like a subset of nested fields may be taken during the projection, e.g., if the source has {a.b, a.c, a.d} and the target has a.b, we only keep a.b instead of the whole StructType a. Does this happen, or does the caller of this function always make sure the targetStructType is properly constructed to preserve the root-level field instead of a subset of nested fields?
   
   It does happen; it's actually the reason for this PR -- previously it was only handling nested field projections, but `NestedSchemaPruning` could produce schemas with _nested_ fields being pruned as well. Therefore, we need to make sure we handle this appropriately when reading log files (by projecting records into the new schema).
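   
   For context, here's a minimal, hypothetical sketch of that scenario (field names and types are made up; only the new `generateUnsafeProjection(sourceStructType, targetStructType)` signature from the hunk above is assumed): a source struct `a` with nested fields `b`, `c`, `d` being projected into a pruned schema that keeps only `a.b`.
   
   ```scala
   import org.apache.spark.sql.HoodieCatalystExpressionUtils.generateUnsafeProjection
   import org.apache.spark.sql.catalyst.InternalRow
   import org.apache.spark.sql.types._
   import org.apache.spark.unsafe.types.UTF8String

   // Source schema: top-level struct `a` with nested fields b, c, d
   val sourceSchema = StructType(Seq(
     StructField("a", StructType(Seq(
       StructField("b", IntegerType),
       StructField("c", StringType),
       StructField("d", LongType)
     )))
   ))

   // Schema as produced by nested-schema pruning: only `a.b` survives,
   // i.e. a *nested* subset rather than a whole top-level field being dropped
   val prunedSchema = StructType(Seq(
     StructField("a", StructType(Seq(
       StructField("b", IntegerType)
     )))
   ))

   val projection = generateUnsafeProjection(sourceSchema, prunedSchema)

   // Projects {a: {b: 1, c: "x", d: 2L}} into {a: {b: 1}}
   val projected = projection(InternalRow(InternalRow(1, UTF8String.fromString("x"), 2L)))
   ```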



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala:
##########
@@ -106,18 +112,16 @@ class BaseFileOnlyRelation(sqlContext: SQLContext,
   }
 
   protected def collectFileSplits(partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[HoodieBaseFileSplit] = {
-    val partitions = listLatestBaseFiles(globPaths, partitionFilters, dataFilters)
-    val fileSplits = partitions.values.toSeq
-      .flatMap { files =>
-        files.flatMap { file =>
-          // TODO fix, currently assuming parquet as underlying format
-          HoodieDataSourceHelper.splitFiles(
-            sparkSession = sparkSession,
-            file = file,
-            partitionValues = getPartitionColumnsAsInternalRow(file)
-          )
-        }
-      }
+    val fileSlices = listLatestFileSlices(globPaths, partitionFilters, dataFilters)

Review Comment:
   We're only using the base file from the file slice, so there's no functional change.
   Were you referring to something else?
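   
   For reference, here's a rough sketch (not the exact PR code) of what the new body boils down to, assuming `FileSlice#getBaseFile`/`HoodieBaseFile#getFileStatus` and the helpers already visible in the removed hunk (`getPartitionColumnsAsInternalRow`, `HoodieDataSourceHelper.splitFiles`):
   
   ```scala
   val fileSlices = listLatestFileSlices(globPaths, partitionFilters, dataFilters)

   val fileSplits = fileSlices.flatMap { fileSlice =>
     // Only the base file of each file slice is consumed by this relation
     // (log files aren't read here), hence no functional change vs the
     // previous base-file listing
     val baseFile = fileSlice.getBaseFile.get()
     // TODO fix, currently assuming parquet as underlying format
     HoodieDataSourceHelper.splitFiles(
       sparkSession = sparkSession,
       file = baseFile.getFileStatus,
       partitionValues = getPartitionColumnsAsInternalRow(baseFile.getFileStatus)
     )
   }
   ```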


