yihua commented on code in PR #7528:
URL: https://github.com/apache/hudi/pull/7528#discussion_r1067307358


##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala:
##########
@@ -138,10 +137,16 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
   override protected def getPartitions: Array[Partition] =
     fileSplits.zipWithIndex.map(file => HoodieMergeOnReadPartition(file._2, 
file._1)).toArray
 
-  private def getConfig: Configuration = {
-    val conf = confBroadcast.value.value
-    CONFIG_INSTANTIATION_LOCK.synchronized {
-      new Configuration(conf)
-    }
+  private def getHadoopConf: Configuration = {
+    val conf = hadoopConfBroadcast.value.value
+    new Configuration(conf)

Review Comment:
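   Do we still need the lock here for concurrency? The previous implementation 
created the copy inside `CONFIG_INSTANTIATION_LOCK.synchronized`.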



##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieCatalystExpressionUtils.scala:
##########
@@ -78,14 +80,52 @@ object HoodieCatalystExpressionUtils {
    * NOTE: Projection of the row from [[StructType]] A to [[StructType]] B is 
only possible, if
    * B is a subset of A
    */
-  def generateUnsafeProjection(from: StructType, to: StructType): 
UnsafeProjection = {
-    val attrs = from.toAttributes
-    val attrsMap = attrs.map(attr => (attr.name, attr)).toMap
-    val targetExprs = to.fields.map(f => attrsMap(f.name))
+  def generateUnsafeProjection(sourceStructType: StructType, targetStructType: 
StructType): UnsafeProjection = {
+    val resolver = SQLConf.get.resolver
+    val attrs = sourceStructType.toAttributes
+    val targetExprs = targetStructType.fields.map { targetField =>
+      val attrRef = attrs.find(attr => resolver(attr.name, targetField.name))
+        .getOrElse(throw new AnalysisException(s"Wasn't able to match target 
field `${targetField.name}` to any of the source attributes ($attrs)"))
+
+      genProjectingExpression(attrRef, targetField.dataType)
+    }
 
     GenerateUnsafeProjection.generate(targetExprs, attrs)
   }
 
+  private def genProjectingExpression(sourceExpr: Expression,
+                                      targetDataType: DataType): Expression = {
+    checkState(sourceExpr.resolved)
+
+    // TODO support array, map
+    (sourceExpr.dataType, targetDataType) match {
+      case (sdt, tdt) if sdt == tdt =>
+        sourceExpr
+
+      case (sourceType: StructType, targetType: StructType) =>
+        val fieldValueExprs = targetType.fields.map { tf =>

Review Comment:
   It looks like a subset of nested fields may be taken during the projection, 
e.g., if the source has `a {a.b, a.c, a.d}` and the target has `a.b`, we only 
keep `a.b` instead of the whole `StructType a`.  Does this actually happen, or 
does the caller of this function always make sure that `targetStructType` is 
properly constructed to preserve the root-level field instead of a subset of 
nested fields?  Is this a problem for projection, where the parquet and log 
readers can read files with a schema containing a subset of nested fields?



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala:
##########
@@ -67,6 +70,9 @@ class BaseFileOnlyRelation(sqlContext: SQLContext,
 
   override lazy val mandatoryFields: Seq[String] = Seq.empty
 
+  override def updatePrunedDataSchema(prunedSchema: StructType): Relation =

Review Comment:
   Does copying this class incur any noticeable overhead?



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala:
##########
@@ -106,18 +112,16 @@ class BaseFileOnlyRelation(sqlContext: SQLContext,
   }
 
   protected def collectFileSplits(partitionFilters: Seq[Expression], 
dataFilters: Seq[Expression]): Seq[HoodieBaseFileSplit] = {
-    val partitions = listLatestBaseFiles(globPaths, partitionFilters, 
dataFilters)
-    val fileSplits = partitions.values.toSeq
-      .flatMap { files =>
-        files.flatMap { file =>
-          // TODO fix, currently assuming parquet as underlying format
-          HoodieDataSourceHelper.splitFiles(
-            sparkSession = sparkSession,
-            file = file,
-            partitionValues = getPartitionColumnsAsInternalRow(file)
-          )
-        }
-      }
+    val fileSlices = listLatestFileSlices(globPaths, partitionFilters, 
dataFilters)

Review Comment:
   This relation should still use `listLatestBaseFiles()` instead of 
`listLatestFileSlices()`.  For a MOR read-optimized query, only base files 
should be read, instead of the latest file slices.



##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieCatalystExpressionUtils.scala:
##########
@@ -78,14 +80,52 @@ object HoodieCatalystExpressionUtils {
    * NOTE: Projection of the row from [[StructType]] A to [[StructType]] B is 
only possible, if
    * B is a subset of A
    */
-  def generateUnsafeProjection(from: StructType, to: StructType): 
UnsafeProjection = {
-    val attrs = from.toAttributes
-    val attrsMap = attrs.map(attr => (attr.name, attr)).toMap
-    val targetExprs = to.fields.map(f => attrsMap(f.name))
+  def generateUnsafeProjection(sourceStructType: StructType, targetStructType: 
StructType): UnsafeProjection = {
+    val resolver = SQLConf.get.resolver
+    val attrs = sourceStructType.toAttributes
+    val targetExprs = targetStructType.fields.map { targetField =>
+      val attrRef = attrs.find(attr => resolver(attr.name, targetField.name))
+        .getOrElse(throw new AnalysisException(s"Wasn't able to match target 
field `${targetField.name}` to any of the source attributes ($attrs)"))
+
+      genProjectingExpression(attrRef, targetField.dataType)
+    }
 
     GenerateUnsafeProjection.generate(targetExprs, attrs)
   }
 
+  private def genProjectingExpression(sourceExpr: Expression,
+                                      targetDataType: DataType): Expression = {
+    checkState(sourceExpr.resolved)
+
+    // TODO support array, map
+    (sourceExpr.dataType, targetDataType) match {
+      case (sdt, tdt) if sdt == tdt =>
+        sourceExpr
+
+      case (sourceType: StructType, targetType: StructType) =>
+        val fieldValueExprs = targetType.fields.map { tf =>
+          val ord = sourceType.fieldIndex(tf.name)
+          val fieldValExpr = 
genProjectingExpression(GetStructField(sourceExpr, ord, Some(tf.name)), 
tf.dataType)
+          Alias(fieldValExpr, tf.name)()
+        }
+
+        CreateStruct(fieldValueExprs)
+
+      case _ => throw new 
UnsupportedOperationException(s"(${sourceExpr.dataType}, $targetDataType)")
+    }
+  }
+
+  // TODO scala-docs

Review Comment:
   nit: add scala-docs



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to