[ 
https://issues.apache.org/jira/browse/HUDI-2139?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17380982#comment-17380982
 ] 

ASF GitHub Bot commented on HUDI-2139:
--------------------------------------

pengzhiwei2018 commented on a change in pull request #3230:
URL: https://github.com/apache/hudi/pull/3230#discussion_r670098428



##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
##########
@@ -189,8 +191,14 @@ protected boolean isUpdateRecord(HoodieRecord<T> 
hoodieRecord) {
   private Option<IndexedRecord> getIndexedRecord(HoodieRecord<T> hoodieRecord) 
{
     Option<Map<String, String>> recordMetadata = 
hoodieRecord.getData().getMetadata();
     try {
-      Option<IndexedRecord> avroRecord = 
hoodieRecord.getData().getInsertValue(tableSchema,
-          config.getProps());
+      // Pass the isUpdateRecord to the props for HoodieRecordPayload to judge
+      // Whether it is a update or insert record.
+      boolean isUpdateRecord = isUpdateRecord(hoodieRecord);

Review comment:
       Here I just pass the `isUpdateRecord` flag to the `ExpressionPayload`,
so it can tell whether the current record is a matched or a not-matched record.
A matched record executes the matched clause of the MERGE INTO statement, while
a not-matched record executes the not-matched clause. Without this information,
the result of MERGE INTO would be incorrect.

##########
File path: 
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/payload/ExpressionPayload.scala
##########
@@ -126,48 +140,62 @@ class ExpressionPayload(record: GenericRecord,
     }
   }
 
+  /**
+   * Process the not-matched record. Test if the record matched any of 
insert-conditions,
+   * if matched then return the result of insert-assignment. Or else return a
+   * {@link HoodieWriteHandle.IGNORE_RECORD} which will be ignored by 
HoodieWriteHandle.
+   *
+   * @param inputRecord The input record to process.
+   * @param properties  The properties.
+   * @return The result of the record to insert.
+   */
+  private def processNotMatchedRecord(inputRecord: SqlTypedRecord, properties: 
Properties): HOption[IndexedRecord] = {
+    val insertConditionAndAssignmentsText =
+      
properties.get(ExpressionPayload.PAYLOAD_INSERT_CONDITION_AND_ASSIGNMENTS)
+    // Get the evaluator for each condition and insert assignment.
+    initWriteSchemaIfNeed(properties)
+    val insertConditionAndAssignments =
+      
ExpressionPayload.getEvaluator(insertConditionAndAssignmentsText.toString, 
writeSchema)
+    var resultRecordOpt: HOption[IndexedRecord] = null
+    for ((conditionEvaluator, assignmentEvaluator) <- 
insertConditionAndAssignments
+         if resultRecordOpt == null) {
+      val conditionVal = evaluate(conditionEvaluator, 
inputRecord).head.asInstanceOf[Boolean]
+      // If matched the insert condition then execute the assignment 
expressions to compute the
+      // result record. We will return the first matched record.
+      if (conditionVal) {
+        val results = evaluate(assignmentEvaluator, inputRecord)
+        resultRecordOpt = HOption.of(convertToRecord(results, writeSchema))
+      }
+    }
+    if (resultRecordOpt != null) {
+      resultRecordOpt
+    } else {
+      // If there is no condition matched, just filter this record.
+      // Here we return a IGNORE_RECORD, HoodieCreateHandle will not handle it.
+      HOption.of(HoodieWriteHandle.IGNORE_RECORD)
+    }
+  }
+
   override def getInsertValue(schema: Schema, properties: Properties): 
HOption[IndexedRecord] = {
     val incomingRecord = bytesToAvro(recordBytes, schema)
     if (isDeleteRecord(incomingRecord)) {
       HOption.empty[IndexedRecord]()
     } else {
-      val insertConditionAndAssignmentsText =
-        
properties.get(ExpressionPayload.PAYLOAD_INSERT_CONDITION_AND_ASSIGNMENTS)
-      // Process insert
       val sqlTypedRecord = new SqlTypedRecord(incomingRecord)
-      // Get the evaluator for each condition and insert assignment.
-      initWriteSchemaIfNeed(properties)
-      val insertConditionAndAssignments =
-        
ExpressionPayload.getEvaluator(insertConditionAndAssignmentsText.toString, 
writeSchema)
-      var resultRecordOpt: HOption[IndexedRecord] = null
-      for ((conditionEvaluator, assignmentEvaluator) <- 
insertConditionAndAssignments
-           if resultRecordOpt == null) {
-        val conditionVal = evaluate(conditionEvaluator, 
sqlTypedRecord).head.asInstanceOf[Boolean]
-        // If matched the insert condition then execute the assignment 
expressions to compute the
-        // result record. We will return the first matched record.
-        if (conditionVal) {
-          val results = evaluate(assignmentEvaluator, sqlTypedRecord)
-          resultRecordOpt = HOption.of(convertToRecord(results, writeSchema))
-        }
-      }
-
-      // Process delete for MOR
-      if (resultRecordOpt == null && isMORTable(properties)) {
-        val deleteConditionText = 
properties.get(ExpressionPayload.PAYLOAD_DELETE_CONDITION)
-        if (deleteConditionText != null) {
-          val deleteCondition = getEvaluator(deleteConditionText.toString, 
writeSchema).head._1
-          val deleteConditionVal = evaluate(deleteCondition, 
sqlTypedRecord).head.asInstanceOf[Boolean]
-          if (deleteConditionVal) {
-            resultRecordOpt = HOption.empty()
-          }
+      if (isMORTable(properties)) {

Review comment:
       I have moved this logic into `processMatchedRecord` and
`processNotMatchedRecord`. `processMatchedRecord` processes matched records,
while `processNotMatchedRecord` processes not-matched records. This makes the
flow clearer.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> MergeInto MOR Table May Result InCorrect Result
> -----------------------------------------------
>
>                 Key: HUDI-2139
>                 URL: https://issues.apache.org/jira/browse/HUDI-2139
>             Project: Apache Hudi
>          Issue Type: Bug
>          Components: Spark Integration
>            Reporter: pengzhiwei
>            Assignee: pengzhiwei
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 0.9.0
>
>
> Currently, for MOR tables, we process all the update actions and insert actions in
> ExpressionPayload#getInsertValue without knowing whether the record is matched
> or not matched. This may produce an incorrect merge result, e.g.:
> {code:java}
> Merge into h0
> using (select 2 as id, 'a1' as name, 10 as price from s) s0
> on h0.id = s0.id
> when matched and s0.id = 1 then update set id = s0.id, name = s0.name, price = 10
> when not matched and s0.id = 2 then insert (id, name, price) values (id, name, 20){code}
> If the record with id = 2 matches the target table h0 but does not satisfy the
> update condition (s0.id = 1), the table should not be updated. However, because
> we currently cannot tell whether the input record is matched, it falls through
> to the not-matched actions and ends up updating the price to 20. This is
> incorrect.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to