nsivabalan commented on code in PR #9083:
URL: https://github.com/apache/hudi/pull/9083#discussion_r1249752335


##########
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala:
##########
@@ -125,18 +127,68 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Hoodie
    * expression involving [[source]] column(s), we will have to add "phony" column matching the
    * primary-key one of the target table.
    */
-  private lazy val primaryKeyAttributeToConditionExpression: Seq[(Attribute, Expression)] = {
+  private lazy val recordKeyAttributeToConditionExpression: Seq[(Attribute, Expression)] = {
+    val primaryKeyFields = hoodieCatalogTable.tableConfig.getRecordKeyFields
     val conditions = splitConjunctivePredicates(mergeInto.mergeCondition)
-    if (!conditions.forall(p => p.isInstanceOf[EqualTo])) {
-      throw new AnalysisException(s"Currently only equality predicates are supported in MERGE INTO statement " +
-        s"(provided ${mergeInto.mergeCondition.sql}")
+    if (primaryKeyFields.isPresent) {
+      //pkless tables can have more complex conditions
+      if (!conditions.forall(p => p.isInstanceOf[EqualTo])) {

Review Comment:
   Sounds good.



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/factory/HoodieSparkKeyGeneratorFactory.java:
##########
@@ -76,7 +76,7 @@ public class HoodieSparkKeyGeneratorFactory {
 
   public static KeyGenerator createKeyGenerator(TypedProperties props) throws IOException {
     String keyGeneratorClass = getKeyGeneratorClassName(props);
-    boolean autoRecordKeyGen = KeyGenUtils.enableAutoGenerateRecordKeys(props);
+    boolean autoRecordKeyGen = KeyGenUtils.enableAutoGenerateRecordKeys(props) && !props.getBoolean("hoodie.internal.sql.merge.into.writes",false);

Review Comment:
   I don't see any docs for this?
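
   For readers following the thread, here is a minimal sketch of what the changed line appears to do. It uses plain `java.util.Properties` in place of Hudi's `TypedProperties`, and it assumes that auto key generation is switched on by the absence of a record key field (the key name `hoodie.datasource.write.recordkey.field` and the body of `enableAutoGenerateRecordKeys` are assumptions here, not taken from the diff). The class name `AutoKeyGenSketch` is hypothetical.

```java
import java.util.Properties;

public class AutoKeyGenSketch {
  // Flag name as it appears in the diff above.
  static final String MERGE_INTO_WRITES = "hoodie.internal.sql.merge.into.writes";
  // Assumption: auto key generation is implied when no record key field is configured.
  static final String RECORD_KEY_FIELD = "hoodie.datasource.write.recordkey.field";

  // Stand-in for TypedProperties#getBoolean(key, default).
  static boolean getBoolean(Properties props, String key, boolean defaultValue) {
    String raw = props.getProperty(key);
    return raw == null ? defaultValue : Boolean.parseBoolean(raw);
  }

  // Stand-in for KeyGenUtils.enableAutoGenerateRecordKeys (assumed behavior).
  static boolean enableAutoGenerateRecordKeys(Properties props) {
    return !props.containsKey(RECORD_KEY_FIELD);
  }

  // Mirrors the "+" line: auto key generation is skipped for MERGE INTO writes.
  static boolean autoRecordKeyGen(Properties props) {
    return enableAutoGenerateRecordKeys(props)
        && !getBoolean(props, MERGE_INTO_WRITES, false);
  }

  public static void main(String[] args) {
    Properties props = new Properties();
    System.out.println(autoRecordKeyGen(props)); // no key field, flag unset
    props.setProperty(MERGE_INTO_WRITES, "true");
    System.out.println(autoRecordKeyGen(props)); // flag set by a MERGE INTO write
  }
}
```

   Since the flag silently changes which key generator gets created, a short comment or javadoc next to it would help.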



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/SparkHoodieIndexFactory.java:
##########
@@ -43,6 +43,10 @@
  */
 public final class SparkHoodieIndexFactory {
   public static HoodieIndex createIndex(HoodieWriteConfig config) {
+    Boolean mergeIntoWrites = config.getProps().getBoolean("hoodie.internal.sql.merge.into.writes",false);

Review Comment:
   I don't see this as resolved? 



##########
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala:
##########
@@ -301,15 +335,20 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Hoodie
   def sourceDataset: DataFrame = {
     val resolver = sparkSession.sessionState.analyzer.resolver
 
-    val sourceTablePlan = mergeInto.sourceTable
+    val tablemetacols = mergeInto.targetTable.output.filter(a => isMetaField(a.name))
+    val joinData = sparkAdapter.createMITJoin(mergeInto.sourceTable, mergeInto.targetTable, LeftOuter, Some(mergeInto.mergeCondition), "NONE")

Review Comment:
   Sounds good. Can we add docs on this as well?



##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable2.scala:
##########
@@ -884,10 +887,9 @@ class TestMergeIntoTable2 extends HoodieSparkSqlTestBase {
          """.stripMargin
           )
           checkAnswer(s"select id, name, price, ts, dt from $tableName")(
-            Seq(1, "a1", 10.1, 1000, "2021-03-21"),
             Seq(1, "a2", 10.2, 1002, "2021-03-21"),
-            Seq(3, "a3", 10.3, 1003, "2021-03-21"),
-            Seq(1, "a2", 10.2, 1002, "2021-03-21")

Review Comment:
   After the 2nd MIT (MERGE INTO), should we see just 2 records for id 1?
   After the 1st MIT command, storage has two records for id 1. In the 2nd MIT
command, id 1 matches, so we would have to do an update. So the same two
records on storage should get updated, right? Why do we see 3 records for id 1?
Can you shed some light on this, please?



##########
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala:
##########
@@ -348,7 +387,7 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Hoodie
    * expressions to the ExpressionPayload#getInsertValue.
    */
   private def executeUpsert(sourceDF: DataFrame, parameters: Map[String, String]): Unit = {
-    val operation = if (StringUtils.isNullOrEmpty(parameters.getOrElse(PRECOMBINE_FIELD.key, ""))) {
+    val operation = if (StringUtils.isNullOrEmpty(parameters.getOrElse(PRECOMBINE_FIELD.key, "")) && updatingActions.isEmpty) {

Review Comment:
   I see.
   By "The previous behavior was that only 1 additional copy would be
inserted.", do you mean that after the 2nd MIT command, as per master, we send
only 2 records (ids 1 and 3) to HoodieSparkSqlWriter? Can you confirm?
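
   To make the condition under discussion concrete, here is a rough sketch of the selection logic in the changed line. The hunk is cut off before the branches, so the `"insert"` and `"upsert"` return values are an assumption about what they pick; the class and method names are hypothetical.

```java
public class MergeIntoOperationSketch {
  // Mirrors the "+" line: the first branch is taken only when there is no
  // precombine field AND the statement has no updating actions.
  // Assumption: the first branch means a plain insert, the other an upsert.
  static String chooseOperation(String precombineField, int updatingActionCount) {
    boolean noPrecombine = precombineField == null || precombineField.isEmpty();
    return (noPrecombine && updatingActionCount == 0) ? "insert" : "upsert";
  }

  public static void main(String[] args) {
    System.out.println(chooseOperation("", 0));   // no precombine, no updating actions
    System.out.println(chooseOperation("", 1));   // no precombine, one updating action
    System.out.println(chooseOperation("ts", 0)); // precombine field set
  }
}
```

   Under that reading, before this change a table without a precombine field took the first branch even when the statement had an updating action, which would explain an extra copy being written for matched rows.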



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
