[ https://issues.apache.org/jira/browse/HUDI-2139?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17380982#comment-17380982 ]
ASF GitHub Bot commented on HUDI-2139:
--------------------------------------

pengzhiwei2018 commented on a change in pull request #3230:
URL: https://github.com/apache/hudi/pull/3230#discussion_r670098428

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
##########
@@ -189,8 +191,14 @@ protected boolean isUpdateRecord(HoodieRecord<T> hoodieRecord) {
   private Option<IndexedRecord> getIndexedRecord(HoodieRecord<T> hoodieRecord) {
     Option<Map<String, String>> recordMetadata = hoodieRecord.getData().getMetadata();
     try {
-      Option<IndexedRecord> avroRecord = hoodieRecord.getData().getInsertValue(tableSchema,
-          config.getProps());
+      // Pass the isUpdateRecord to the props for HoodieRecordPayload to judge
+      // Whether it is a update or insert record.
+      boolean isUpdateRecord = isUpdateRecord(hoodieRecord);

Review comment:
   Here I just pass the `isUpdateRecord` flag to the `ExpressionPayload` so that it knows whether the current record is a matched or a not-matched record. A matched record executes the WHEN MATCHED clauses of the MERGE INTO statement, while a not-matched record executes the WHEN NOT MATCHED clauses. Without this information, the result of MERGE INTO will be incorrect.

##########
File path: hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/payload/ExpressionPayload.scala
##########
@@ -126,48 +140,62 @@ class ExpressionPayload(record: GenericRecord,
       }
     }
+  /**
+   * Process the not-matched record. Test if the record matched any of insert-conditions,
+   * if matched then return the result of insert-assignment. Or else return a
+   * {@link HoodieWriteHandle.IGNORE_RECORD} which will be ignored by HoodieWriteHandle.
+   *
+   * @param inputRecord The input record to process.
+   * @param properties The properties.
+   * @return The result of the record to insert.
+   */
+  private def processNotMatchedRecord(inputRecord: SqlTypedRecord, properties: Properties): HOption[IndexedRecord] = {
+    val insertConditionAndAssignmentsText =
+      properties.get(ExpressionPayload.PAYLOAD_INSERT_CONDITION_AND_ASSIGNMENTS)
+    // Get the evaluator for each condition and insert assignment.
+    initWriteSchemaIfNeed(properties)
+    val insertConditionAndAssignments =
+      ExpressionPayload.getEvaluator(insertConditionAndAssignmentsText.toString, writeSchema)
+    var resultRecordOpt: HOption[IndexedRecord] = null
+    for ((conditionEvaluator, assignmentEvaluator) <- insertConditionAndAssignments
+         if resultRecordOpt == null) {
+      val conditionVal = evaluate(conditionEvaluator, inputRecord).head.asInstanceOf[Boolean]
+      // If matched the insert condition then execute the assignment expressions to compute the
+      // result record. We will return the first matched record.
+      if (conditionVal) {
+        val results = evaluate(assignmentEvaluator, inputRecord)
+        resultRecordOpt = HOption.of(convertToRecord(results, writeSchema))
+      }
+    }
+    if (resultRecordOpt != null) {
+      resultRecordOpt
+    } else {
+      // If there is no condition matched, just filter this record.
+      // Here we return a IGNORE_RECORD, HoodieCreateHandle will not handle it.
+      HOption.of(HoodieWriteHandle.IGNORE_RECORD)
+    }
+  }
+
   override def getInsertValue(schema: Schema, properties: Properties): HOption[IndexedRecord] = {
     val incomingRecord = bytesToAvro(recordBytes, schema)
     if (isDeleteRecord(incomingRecord)) {
       HOption.empty[IndexedRecord]()
     } else {
-      val insertConditionAndAssignmentsText =
-        properties.get(ExpressionPayload.PAYLOAD_INSERT_CONDITION_AND_ASSIGNMENTS)
-      // Process insert
       val sqlTypedRecord = new SqlTypedRecord(incomingRecord)
-      // Get the evaluator for each condition and insert assignment.
-      initWriteSchemaIfNeed(properties)
-      val insertConditionAndAssignments =
-        ExpressionPayload.getEvaluator(insertConditionAndAssignmentsText.toString, writeSchema)
-      var resultRecordOpt: HOption[IndexedRecord] = null
-      for ((conditionEvaluator, assignmentEvaluator) <- insertConditionAndAssignments
-           if resultRecordOpt == null) {
-        val conditionVal = evaluate(conditionEvaluator, sqlTypedRecord).head.asInstanceOf[Boolean]
-        // If matched the insert condition then execute the assignment expressions to compute the
-        // result record. We will return the first matched record.
-        if (conditionVal) {
-          val results = evaluate(assignmentEvaluator, sqlTypedRecord)
-          resultRecordOpt = HOption.of(convertToRecord(results, writeSchema))
-        }
-      }
-
-      // Process delete for MOR
-      if (resultRecordOpt == null && isMORTable(properties)) {
-        val deleteConditionText = properties.get(ExpressionPayload.PAYLOAD_DELETE_CONDITION)
-        if (deleteConditionText != null) {
-          val deleteCondition = getEvaluator(deleteConditionText.toString, writeSchema).head._1
-          val deleteConditionVal = evaluate(deleteCondition, sqlTypedRecord).head.asInstanceOf[Boolean]
-          if (deleteConditionVal) {
-            resultRecordOpt = HOption.empty()
-          }
+      if (isMORTable(properties)) {

Review comment:
   I have moved the logic into `processMatchedRecord` and `processNotMatchedRecord`: `processMatchedRecord` handles matched records, while `processNotMatchedRecord` handles not-matched records. This makes the code clearer.
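To make the matched/not-matched split concrete, here is a simplified Java sketch of the dispatch idea. It is not Hudi code: rows are plain `Map<String, Object>` values, and `MergeDispatchSketch`, `Clause`, `firstMatching` and `process` are hypothetical names. The boolean argument plays the role of the `isUpdateRecord` flag that the PR passes to `ExpressionPayload`; within the selected clause list the first clause whose condition holds wins, as in the `processNotMatchedRecord` loop, and `Optional.empty()` stands in for `IGNORE_RECORD`. Delete clauses are left out.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical, simplified model of MERGE INTO clause dispatch (not Hudi's API).
// Rows are plain maps from column name to value.
final class MergeDispatchSketch {

    // One "WHEN [NOT] MATCHED AND <condition> THEN <assignment>" clause.
    static final class Clause {
        final Predicate<Map<String, Object>> condition;
        final Function<Map<String, Object>, Map<String, Object>> assignment;

        Clause(Predicate<Map<String, Object>> condition,
               Function<Map<String, Object>, Map<String, Object>> assignment) {
            this.condition = condition;
            this.assignment = assignment;
        }
    }

    // Like the processNotMatchedRecord loop: the first clause whose condition holds
    // produces the result; Optional.empty() stands in for IGNORE_RECORD (no change).
    static Optional<Map<String, Object>> firstMatching(List<Clause> clauses,
                                                       Map<String, Object> source) {
        for (Clause clause : clauses) {
            if (clause.condition.test(source)) {
                return Optional.of(clause.assignment.apply(source));
            }
        }
        return Optional.empty();
    }

    // The per-record flag selects which clause list is consulted, so a matched
    // record can never fall through to a WHEN NOT MATCHED clause.
    static Optional<Map<String, Object>> process(boolean isUpdateRecord,
                                                 Map<String, Object> source,
                                                 List<Clause> whenMatched,
                                                 List<Clause> whenNotMatched) {
        return isUpdateRecord
            ? firstMatching(whenMatched, source)
            : firstMatching(whenNotMatched, source);
    }

    static Map<String, Object> row(int id, String name, int price) {
        Map<String, Object> r = new HashMap<>();
        r.put("id", id);
        r.put("name", name);
        r.put("price", price);
        return r;
    }

    // Clauses of the HUDI-2139 example:
    //   WHEN MATCHED AND s0.id = 1 THEN UPDATE SET ..., price = 10
    //   WHEN NOT MATCHED AND s0.id = 2 THEN INSERT ... VALUES (id, name, 20)
    static final List<Clause> WHEN_MATCHED = List.of(new Clause(
        s -> Objects.equals(s.get("id"), 1),
        s -> row((Integer) s.get("id"), (String) s.get("name"), 10)));
    static final List<Clause> WHEN_NOT_MATCHED = List.of(new Clause(
        s -> Objects.equals(s.get("id"), 2),
        s -> row((Integer) s.get("id"), (String) s.get("name"), 20)));

    public static void main(String[] args) {
        Map<String, Object> s0 = row(2, "a1", 10);
        // Matched record: no WHEN MATCHED condition holds, so it is ignored.
        System.out.println(process(true, s0, WHEN_MATCHED, WHEN_NOT_MATCHED));
        // Not-matched record: the insert clause applies.
        System.out.println(process(false, s0, WHEN_MATCHED, WHEN_NOT_MATCHED));
    }
}
```

With the clauses from the HUDI-2139 example, a source row with id = 2 that is flagged as matched yields `Optional.empty` (no change), while the same row flagged as not matched yields an inserted row with price 20.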
> MergeInto MOR Table May Result InCorrect Result
> -----------------------------------------------
>
> Key: HUDI-2139
> URL: https://issues.apache.org/jira/browse/HUDI-2139
> Project: Apache Hudi
> Issue Type: Bug
> Components: Spark Integration
> Reporter: pengzhiwei
> Assignee: pengzhiwei
> Priority: Major
> Labels: pull-request-available
> Fix For: 0.9.0
>
>
> Currently we process all the update actions and insert actions in
> ExpressionPayload#getInsertValue without knowing whether the record is matched or
> not matched for a MOR table. This may produce an incorrect merge result, e.g.:
> {code:java}
> Merge into h0
> using (select 2 as id, 'a1' as name, 10 as price from s) s0
> on h0.id = s0.id
> when matched and s0.id = 1 then update set id = s0.id, name = s0.name, price = 10
> when not matched and s0.id = 2 then insert (id, name, price) values (id, name, 20){code}
> If the record with id = 2 matches a row in the target table h0 but does not satisfy
> the update condition (s0.id = 1), the table should not be updated. However, because
> we currently cannot tell whether the input record is matched, it goes to the
> not-matched actions and finally updates the price to 20. This is incorrect.
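The scenario above can be reproduced with a small, self-contained Java model. This is a hypothetical sketch, not Hudi's implementation: the target table is a `HashMap` keyed by `id`, the starting price of 15 for the existing `id = 2` row is made up for illustration, and `MergeIntoBugSketch`, `matchedAction`, `notMatchedAction` and `merge` are invented names. `knowsMatchState = false` approximates the pre-fix behavior described in the issue (the match state is unknown, so the insert action is tried first), and `true` models the fix.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical reproduction of the HUDI-2139 scenario; not Hudi code.
// The target table is modelled as a map from record key (id) to row.
final class MergeIntoBugSketch {

    static Map<String, Object> row(int id, String name, int price) {
        Map<String, Object> r = new HashMap<>();
        r.put("id", id);
        r.put("name", name);
        r.put("price", price);
        return r;
    }

    // WHEN MATCHED AND s0.id = 1 THEN UPDATE SET id = s0.id, name = s0.name, price = 10
    // Returns null when the condition does not hold (no change).
    static Map<String, Object> matchedAction(Map<String, Object> s0) {
        return Objects.equals(s0.get("id"), 1)
            ? row((Integer) s0.get("id"), (String) s0.get("name"), 10) : null;
    }

    // WHEN NOT MATCHED AND s0.id = 2 THEN INSERT (id, name, price) VALUES (id, name, 20)
    static Map<String, Object> notMatchedAction(Map<String, Object> s0) {
        return Objects.equals(s0.get("id"), 2)
            ? row((Integer) s0.get("id"), (String) s0.get("name"), 20) : null;
    }

    // Applies one source row to the target. knowsMatchState = false is a simplified
    // model of the pre-fix path (all actions tried, insert actions first);
    // knowsMatchState = true dispatches on whether the key already exists.
    static void merge(Map<Integer, Map<String, Object>> target,
                      Map<String, Object> s0, boolean knowsMatchState) {
        int id = (Integer) s0.get("id");
        boolean matched = target.containsKey(id);
        Map<String, Object> result;
        if (knowsMatchState) {
            result = matched ? matchedAction(s0) : notMatchedAction(s0);
        } else {
            result = notMatchedAction(s0);
            if (result == null) {
                result = matchedAction(s0);
            }
        }
        if (result != null) {
            target.put(id, result); // upsert by record key
        }
    }

    // Hypothetical starting state: h0 already holds id = 2 with price 15.
    static Map<Integer, Map<String, Object>> initialTarget() {
        Map<Integer, Map<String, Object>> target = new HashMap<>();
        target.put(2, row(2, "a1", 15));
        return target;
    }

    public static void main(String[] args) {
        Map<String, Object> s0 = row(2, "a1", 10);

        Map<Integer, Map<String, Object>> preFix = initialTarget();
        merge(preFix, s0, false);
        System.out.println("pre-fix price:   " + preFix.get(2).get("price"));

        Map<Integer, Map<String, Object>> withFlag = initialTarget();
        merge(withFlag, s0, true);
        System.out.println("with flag price: " + withFlag.get(2).get("price"));
    }
}
```

Running `main` prints a price of 20 for the pre-fix model and 15 with the flag: the matched row that fails `s0.id = 1` is left unchanged instead of being overwritten by the insert action.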