yihua commented on code in PR #9083:
URL: https://github.com/apache/hudi/pull/9083#discussion_r1254962228


##########
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala:
##########
@@ -43,23 +43,29 @@ object HoodieAnalysis extends SparkAdapterSupport {
     val rules: ListBuffer[RuleBuilder] = ListBuffer()
 
     // NOTE: This rule adjusts [[LogicalRelation]]s resolving into Hudi tables such that
-    //       meta-fields are not affecting the resolution of the target columns to be updated by Spark.
+    //       meta-fields are not affecting the resolution of the target columns to be updated by Spark (Except in the
+    //       case of MergeInto. We leave the meta columns on the target table, and use other means to ensure resolution)
     //       For more details please check out the scala-doc of the rule
-    // TODO limit adapters to only Spark < 3.2
     val adaptIngestionTargetLogicalRelations: RuleBuilder = session => AdaptIngestionTargetLogicalRelations(session)
 
-    if (HoodieSparkUtils.isSpark2) {
-      val spark2ResolveReferencesClass = "org.apache.spark.sql.catalyst.analysis.HoodieSpark2Analysis$ResolveReferences"
-      val spark2ResolveReferences: RuleBuilder =
-        session => ReflectionUtils.loadClass(spark2ResolveReferencesClass, session).asInstanceOf[Rule[LogicalPlan]]
-
+    if (!HoodieSparkUtils.gteqSpark3_2) {
+      //Add or correct resolution of MergeInto
+      val resolveReferencesClass = if (HoodieSparkUtils.isSpark2) {
+        "org.apache.spark.sql.catalyst.analysis.HoodieSpark2Analysis$ResolveReferences"
+      } else if (HoodieSparkUtils.isSpark3_0) {
+        "org.apache.spark.sql.catalyst.analysis.HoodieSpark30Analysis$ResolveReferences"
+      } else if (HoodieSparkUtils.isSpark3_1) {
+        "org.apache.spark.sql.catalyst.analysis.HoodieSpark31Analysis$ResolveReferences"
+      } else {
+        throw new IllegalStateException("Impossible to be here")
+      }
+      val sparkResolveReferences: RuleBuilder =
+        session => ReflectionUtils.loadClass(resolveReferencesClass, session).asInstanceOf[Rule[LogicalPlan]]

Review Comment:
   This only works for Spark 2.4. We should use the following instead for Spark 3.x:
   ```
   session => instantiateKlass(resolveReferencesClass, session)
   ```
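
   For context, the reviewer's suggestion amounts to branching the builder on the Spark version. A rough sketch is below; it assumes a session-aware helper named `instantiateKlass` is available in `HoodieAnalysis` (as the review comment implies), and the exact signatures may differ from the actual codebase:

   ```scala
   // Sketch only, not the actual patch: the class name was already selected
   // above based on the detected Spark version.
   val sparkResolveReferences: RuleBuilder =
     if (HoodieSparkUtils.isSpark2) {
       // Spark 2.4: reflective loading with the session as a constructor arg
       // matches the Spark 2 rule's constructor shape.
       session => ReflectionUtils.loadClass(resolveReferencesClass, session)
         .asInstanceOf[Rule[LogicalPlan]]
     } else {
       // Spark 3.0 / 3.1: per the review, use the session-aware helper instead,
       // since ReflectionUtils.loadClass only works for the Spark 2.4 rule.
       session => instantiateKlass(resolveReferencesClass, session)
     }
   ```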



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
