nsivabalan commented on code in PR #9083:
URL: https://github.com/apache/hudi/pull/9083#discussion_r1247403964


##########
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala:
##########
@@ -125,18 +127,68 @@ case class MergeIntoHoodieTableCommand(mergeInto: 
MergeIntoTable) extends Hoodie
    * expression involving [[source]] column(s), we will have to add "phony" 
column matching the
    * primary-key one of the target table.
    */
-  private lazy val primaryKeyAttributeToConditionExpression: Seq[(Attribute, 
Expression)] = {
+  private lazy val recordKeyAttributeToConditionExpression: Seq[(Attribute, 
Expression)] = {
+    val primaryKeyFields = hoodieCatalogTable.tableConfig.getRecordKeyFields
     val conditions = splitConjunctivePredicates(mergeInto.mergeCondition)
-    if (!conditions.forall(p => p.isInstanceOf[EqualTo])) {
-      throw new AnalysisException(s"Currently only equality predicates are 
supported in MERGE INTO statement " +
-        s"(provided ${mergeInto.mergeCondition.sql}")
+    if (primaryKeyFields.isPresent) {
+      //pkless tables can have more complex conditions
+      if (!conditions.forall(p => p.isInstanceOf[EqualTo])) {

Review Comment:
   Shouldn't we also check the partition path fields for equality if they are
   part of the join condition?



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieNonIndex.java:
##########
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.index;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIndexException;
+import org.apache.hudi.table.HoodieTable;
+
+public class HoodieNonIndex extends HoodieIndex<Object, Object> {

Review Comment:
   Let's add Javadoc.



##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable3.scala:
##########
@@ -0,0 +1,353 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi
+
+import org.apache.hudi.{HoodieSparkUtils, ScalaAssertionSupport}
+
+class TestMergeIntoTable3 extends HoodieSparkSqlTestBase with 
ScalaAssertionSupport {

Review Comment:
   What's the rationale behind adding new test classes
   (TestMergeIntoTable2, TestMergeIntoTable3, etc.)?
   Can we at least add some docs?



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieNonIndex.java:
##########
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.index;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIndexException;
+import org.apache.hudi.table.HoodieTable;
+
+public class HoodieNonIndex extends HoodieIndex<Object, Object> {
+
+  public HoodieNonIndex(HoodieWriteConfig config) {

Review Comment:
   Let's name this HoodieInternalProxyIndex. External users should not be able
   to use this index unless we want to support that flow.



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/SparkHoodieIndexFactory.java:
##########
@@ -43,6 +43,10 @@
  */
 public final class SparkHoodieIndexFactory {
   public static HoodieIndex createIndex(HoodieWriteConfig config) {
+    Boolean mergeIntoWrites = 
config.getProps().getBoolean("hoodie.internal.sql.merge.into.writes",false);

Review Comment:
   Let's define a constant for this internal config and reuse it instead of
   hard-coding the key everywhere.



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/factory/HoodieSparkKeyGeneratorFactory.java:
##########
@@ -76,7 +76,7 @@ public class HoodieSparkKeyGeneratorFactory {
 
   public static KeyGenerator createKeyGenerator(TypedProperties props) throws 
IOException {
     String keyGeneratorClass = getKeyGeneratorClassName(props);
-    boolean autoRecordKeyGen = KeyGenUtils.enableAutoGenerateRecordKeys(props);
+    boolean autoRecordKeyGen = KeyGenUtils.enableAutoGenerateRecordKeys(props) 
&& !props.getBoolean("hoodie.internal.sql.merge.into.writes",false);

Review Comment:
   Can we add Javadoc explaining why we need to do this?



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoKeyGenerator.scala:
##########
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.command
+
+import org.apache.avro.generic.GenericRecord
+import org.apache.hudi.common.config.TypedProperties
+import org.apache.hudi.common.model.HoodieRecord.{RECORD_KEY_META_FIELD_ORD, 
PARTITION_PATH_META_FIELD_ORD}
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.unsafe.types.UTF8String
+
+
+class MergeIntoKeyGenerator(props: TypedProperties) extends 
SqlKeyGenerator(props) {

Review Comment:
   Let's add doc comments describing what this key generator is meant for. We
   should call out that it is not meant to be used by external users.
   



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCreateRecordUtils.scala:
##########
@@ -243,27 +261,27 @@ object HoodieCreateRecordUtils {
 
   def getHoodieKeyAndMayBeLocationFromSparkRecord(sparkKeyGenerator: 
Option[SparkKeyGeneratorInterface],
                                                   sourceRow: InternalRow, 
schema: StructType,
-                                                  isPrepped: Boolean): 
(HoodieKey, Option[HoodieRecordLocation]) = {
+                                                  isPrepped: Boolean, 
mergeIntoWrites: Boolean): (HoodieKey, Option[HoodieRecordLocation]) = {
     val recordKey = if (isPrepped) {
-      
sourceRow.getString(HOODIE_META_COLUMNS_NAME_TO_POS.get(HoodieRecord.RECORD_KEY_METADATA_FIELD));
+      sourceRow.getString(HoodieRecord.RECORD_KEY_META_FIELD_ORD)

Review Comment:
   Should we not fetch the record key from the meta field for the
   mergeIntoWrites flow as well?
   Or are we going to rely on the new key generator we are adding to fetch it
   from the meta field?



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCreateRecordUtils.scala:
##########
@@ -46,16 +47,31 @@ import scala.collection.JavaConversions.mapAsJavaMap
 object HoodieCreateRecordUtils {
   private val log = LoggerFactory.getLogger(getClass)
 
-  def createHoodieRecordRdd(df: DataFrame,
-                            config: HoodieWriteConfig,
-                            parameters: Map[String, String],
-                            recordName: String,
-                            recordNameSpace: String,
-                            writerSchema: Schema,
-                            dataFileSchema: Schema,
-                            operation: WriteOperationType,
-                            instantTime: String,
-                            isPrepped: Boolean) = {
+  case class createHoodieRecordRddArgs(df: DataFrame,
+                                       config: HoodieWriteConfig,
+                                       parameters: Map[String, String],
+                                       recordName: String,
+                                       recordNameSpace: String,
+                                       writerSchema: Schema,
+                                       dataFileSchema: Schema,
+                                       operation: WriteOperationType,
+                                       instantTime: String,
+                                       isPrepped: Boolean,

Review Comment:
   If both isPrepped and mergeIntoWrites are used in conjunction everywhere,
   let's combine them into one boolean flag. isPrepped should suffice.



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCreateRecordUtils.scala:
##########
@@ -66,6 +82,8 @@ object HoodieCreateRecordUtils {
           HoodieWriteConfig.COMBINE_BEFORE_INSERT.key(),
           HoodieWriteConfig.COMBINE_BEFORE_INSERT.defaultValue()
         ).toBoolean
+    } else if (mergeIntoWrites && 
StringUtils.isNullOrEmpty(config.getString(PRECOMBINE_FIELD))) {

Review Comment:
   I thought we had agreed to explicitly keep this true for the MERGE INTO flow.



##########
hudi-spark-datasource/hudi-spark3.0.x/src/main/scala/org/apache/spark/sql/catalyst/analysis/HoodieSpark30Analysis.scala:
##########
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.analysis
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, 
ResolveLambdaVariables, UnresolvedAttribute, UnresolvedExtractValue, 
caseInsensitiveResolution, withPosition}
+import org.apache.spark.sql.catalyst.expressions.{Alias, CurrentDate, 
CurrentTimestamp, Expression, ExtractValue, GetStructField, LambdaFunction}
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.catalyst.util.toPrettySQL
+import org.apache.spark.sql.hudi.HoodieSqlCommonUtils
+
+/**
+ * NOTE: Taken from HoodieSpark2Analysis and modified to resolve source and 
target tables if not already resolved
+ *
+ *       PLEASE REFRAIN MAKING ANY CHANGES TO THIS CODE UNLESS ABSOLUTELY 
NECESSARY
+ */
+object HoodieSpark30Analysis {

Review Comment:
   Note to self: need to review all Analysis classes.



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoKeyGenerator.scala:
##########
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.command
+
+import org.apache.avro.generic.GenericRecord
+import org.apache.hudi.common.config.TypedProperties
+import org.apache.hudi.common.model.HoodieRecord.{RECORD_KEY_META_FIELD_ORD, 
PARTITION_PATH_META_FIELD_ORD}
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.unsafe.types.UTF8String
+
+
+class MergeIntoKeyGenerator(props: TypedProperties) extends 
SqlKeyGenerator(props) {
+
+  override def getRecordKey(record: GenericRecord): String = {
+    val recordKey = record.get(RECORD_KEY_META_FIELD_ORD)
+    if (recordKey != null) {
+      recordKey.toString
+    } else {
+      super.getRecordKey(record)
+    }
+  }
+
+  override def getRecordKey(row: Row): String = {
+    val recordKey = row.get(RECORD_KEY_META_FIELD_ORD)
+    if (recordKey != null) {
+      recordKey.toString
+    } else {
+      super.getRecordKey(row)

Review Comment:
   So in the case of pk-less tables, super.getRecordKey will generate new
   random keys, right?



##########
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala:
##########
@@ -48,18 +48,23 @@ object HoodieAnalysis extends SparkAdapterSupport {
     // TODO limit adapters to only Spark < 3.2
     val adaptIngestionTargetLogicalRelations: RuleBuilder = session => 
AdaptIngestionTargetLogicalRelations(session)
 
-    if (HoodieSparkUtils.isSpark2) {
-      val spark2ResolveReferencesClass = 
"org.apache.spark.sql.catalyst.analysis.HoodieSpark2Analysis$ResolveReferences"
-      val spark2ResolveReferences: RuleBuilder =
-        session => ReflectionUtils.loadClass(spark2ResolveReferencesClass, 
session).asInstanceOf[Rule[LogicalPlan]]
-
+    if (!HoodieSparkUtils.gteqSpark3_2) {

Review Comment:
   Do we need to fix the comment on line 45?
   Also, let's add more doc comments around all such changes. One year down
   the line, we ourselves might not remember why we had to make these changes.



##########
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala:
##########
@@ -301,15 +335,20 @@ case class MergeIntoHoodieTableCommand(mergeInto: 
MergeIntoTable) extends Hoodie
   def sourceDataset: DataFrame = {
     val resolver = sparkSession.sessionState.analyzer.resolver
 
-    val sourceTablePlan = mergeInto.sourceTable
+    val tablemetacols = mergeInto.targetTable.output.filter(a => 
isMetaField(a.name))
+    val joinData = sparkAdapter.createMITJoin(mergeInto.sourceTable, 
mergeInto.targetTable, LeftOuter, Some(mergeInto.mergeCondition), "NONE")

Review Comment:
   Can you help us understand where exactly this join was happening before this
   patch? Was it lazy before and we are making it eager now, or are we doing
   something additional?
   



##########
hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieSpark32PlusAnalysis.scala:
##########
@@ -128,9 +131,123 @@ case class HoodieSpark32PlusResolveReferences(spark: 
SparkSession) extends Rule[
           catalogTable.location.toString))
         LogicalRelation(relation, catalogTable)
       }
+    case mO@MatchMergeIntoTable(targetTableO, sourceTableO, _)
+      ////// don't want to go to the spark mit resolution so we resolve the 
source and target if they haven't been
+      if !mO.resolved =>
+      lazy val analyzer = spark.sessionState.analyzer
+      val targetTable = if (targetTableO.resolved) targetTableO else 
analyzer.execute(targetTableO)
+      val sourceTable = if (sourceTableO.resolved) sourceTableO else 
analyzer.execute(sourceTableO)
+      val m = mO.asInstanceOf[MergeIntoTable].copy(targetTable = targetTable, 
sourceTable = sourceTable)
+      ///////
+      EliminateSubqueryAliases(targetTable) match {
+        case r: NamedRelation if r.skipSchemaResolution =>
+          // Do not resolve the expression if the target table accepts any 
schema.
+          // This allows data sources to customize their own resolution logic 
using
+          // custom resolution rules.
+          m
+
+        case _ =>
+          val newMatchedActions = m.matchedActions.map {
+            case DeleteAction(deleteCondition) =>
+              val resolvedDeleteCondition = deleteCondition.map(
+                resolveExpressionByPlanChildren(_, m))
+              DeleteAction(resolvedDeleteCondition)
+            case UpdateAction(updateCondition, assignments) =>
+              val resolvedUpdateCondition = updateCondition.map(
+                resolveExpressionByPlanChildren(_, m))
+              UpdateAction(
+                resolvedUpdateCondition,
+                // The update value can access columns from both target and 
source tables.
+                resolveAssignments(assignments, m, resolveValuesWithSourceOnly 
= false))
+            case UpdateStarAction(updateCondition) =>
+              //Hudi change: filter out meta fields
+              val assignments = targetTable.output.filter(a => 
!isMetaField(a.name)).map { attr =>
+                Assignment(attr, UnresolvedAttribute(Seq(attr.name)))
+              }
+              UpdateAction(
+                updateCondition.map(resolveExpressionByPlanChildren(_, m)),
+                // For UPDATE *, the value must from source table.
+                resolveAssignments(assignments, m, resolveValuesWithSourceOnly 
= true))
+            case o => o
+          }
+          val newNotMatchedActions = m.notMatchedActions.map {
+            case InsertAction(insertCondition, assignments) =>
+              // The insert action is used when not matched, so its condition 
and value can only
+              // access columns from the source table.
+              val resolvedInsertCondition = insertCondition.map(
+                resolveExpressionByPlanChildren(_, Project(Nil, 
m.sourceTable)))
+              InsertAction(
+                resolvedInsertCondition,
+                resolveAssignments(assignments, m, resolveValuesWithSourceOnly 
= true))
+            case InsertStarAction(insertCondition) =>
+              // The insert action is used when not matched, so its condition 
and value can only
+              // access columns from the source table.
+              val resolvedInsertCondition = insertCondition.map(
+                resolveExpressionByPlanChildren(_, Project(Nil, 
m.sourceTable)))
+              //Hudi change: filter out meta fields
+              val assignments = targetTable.output.filter(a => 
!isMetaField(a.name)).map { attr =>
+                Assignment(attr, UnresolvedAttribute(Seq(attr.name)))
+              }
+              InsertAction(
+                resolvedInsertCondition,
+                resolveAssignments(assignments, m, resolveValuesWithSourceOnly 
= true))
+            case o => o
+          }
+          val resolvedMergeCondition = 
resolveExpressionByPlanChildren(m.mergeCondition, m)
+          m.copy(mergeCondition = resolvedMergeCondition,
+            matchedActions = newMatchedActions,
+            notMatchedActions = newNotMatchedActions)
+      }
+  }
+
+  def resolveAssignments(
+                          assignments: Seq[Assignment],
+                          mergeInto: MergeIntoTable,
+                          resolveValuesWithSourceOnly: Boolean): 
Seq[Assignment] = {
+    assignments.map { assign =>
+      val resolvedKey = assign.key match {
+        case c if !c.resolved =>
+          resolveMergeExprOrFail(c, Project(Nil, mergeInto.targetTable))
+        case o => o
+      }
+      val resolvedValue = assign.value match {
+        // The update values may contain target and/or source references.
+        case c if !c.resolved =>
+          if (resolveValuesWithSourceOnly) {
+            resolveMergeExprOrFail(c, Project(Nil, mergeInto.sourceTable))
+          } else {
+            resolveMergeExprOrFail(c, mergeInto)
+          }
+        case o => o
+      }
+      Assignment(resolvedKey, resolvedValue)
+    }
+  }
+
+  private def resolveMergeExprOrFail(e: Expression, p: LogicalPlan): 
Expression = {
+    try {
+      val resolved = resolveExpressionByPlanChildren(e, p)
+      resolved.references.filter(!_.resolved).foreach { a =>
+        // Note: This will throw error only on unresolved attribute issues,
+        // not other resolution errors like mismatched data types.
+        val cols = p.inputSet.toSeq.map(_.sql).mkString(", ")
+        sparkAdapter.failAnalysisForMIT(a, cols)

Review Comment:
   Do we have any test case covering this?
   



##########
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala:
##########
@@ -245,6 +275,10 @@ case class MergeIntoHoodieTableCommand(mergeInto: 
MergeIntoTable) extends Hoodie
     // TODO move to analysis phase
     validate
 
+    if (HoodieSparkUtils.isSpark2) {
+      sparkSession.conf.set("spark.sql.crossJoin.enabled","true")

Review Comment:
   Can we have some doc comments on this, including why it applies specifically to Spark 2?



##########
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala:
##########
@@ -301,15 +335,20 @@ case class MergeIntoHoodieTableCommand(mergeInto: 
MergeIntoTable) extends Hoodie
   def sourceDataset: DataFrame = {
     val resolver = sparkSession.sessionState.analyzer.resolver
 
-    val sourceTablePlan = mergeInto.sourceTable
+    val tablemetacols = mergeInto.targetTable.output.filter(a => 
isMetaField(a.name))
+    val joinData = sparkAdapter.createMITJoin(mergeInto.sourceTable, 
mergeInto.targetTable, LeftOuter, Some(mergeInto.mergeCondition), "NONE")
+    val incomingDataCols = 
joinData.output.filterNot(mergeInto.targetTable.outputSet.contains)
+    val sourceTablePlan = Project(tablemetacols ++ incomingDataCols, joinData)

Review Comment:
   Can we rename this variable, since it contains meta fields as well?



##########
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala:
##########
@@ -348,7 +387,7 @@ case class MergeIntoHoodieTableCommand(mergeInto: 
MergeIntoTable) extends Hoodie
    * expressions to the ExpressionPayload#getInsertValue.
    */
   private def executeUpsert(sourceDF: DataFrame, parameters: Map[String, 
String]): Unit = {
-    val operation = if 
(StringUtils.isNullOrEmpty(parameters.getOrElse(PRECOMBINE_FIELD.key, ""))) {
+    val operation = if 
(StringUtils.isNullOrEmpty(parameters.getOrElse(PRECOMBINE_FIELD.key, "")) && 
updatingActions.isEmpty) {

Review Comment:
   Does this mean that we can have a MIT command with only non-matching
   conditions, and that we will then use the insert operation?
   But this may not go through the prepped code path, right?
   Only upsert will go through the prepped code path.
   It should not matter even if we go through the insert code path (since
   every record is new). I just wanted to clarify.
   



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCreateRecordUtils.scala:
##########
@@ -243,27 +261,27 @@ object HoodieCreateRecordUtils {
 
   def getHoodieKeyAndMayBeLocationFromSparkRecord(sparkKeyGenerator: 
Option[SparkKeyGeneratorInterface],
                                                   sourceRow: InternalRow, 
schema: StructType,
-                                                  isPrepped: Boolean): 
(HoodieKey, Option[HoodieRecordLocation]) = {
+                                                  isPrepped: Boolean, 
mergeIntoWrites: Boolean): (HoodieKey, Option[HoodieRecordLocation]) = {
     val recordKey = if (isPrepped) {
-      
sourceRow.getString(HOODIE_META_COLUMNS_NAME_TO_POS.get(HoodieRecord.RECORD_KEY_METADATA_FIELD));
+      sourceRow.getString(HoodieRecord.RECORD_KEY_META_FIELD_ORD)

Review Comment:
   Can we add some docs explaining why MIT is not using the meta field code path here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to