This is an automated email from the ASF dual-hosted git repository. yihua pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push: new 59f7d2806bf [HUDI-6562] Fixed issue for delete events for AWSDmsAvroPayload when CDC enabled (#9519) 59f7d2806bf is described below commit 59f7d2806bfc2d402dc8f5694dcb9d345e3d5a55 Author: Aditya Goenka <63430370+ad1happy...@users.noreply.github.com> AuthorDate: Fri Sep 1 04:47:48 2023 +0530 [HUDI-6562] Fixed issue for delete events for AWSDmsAvroPayload when CDC enabled (#9519) Co-authored-by: Y Ethan Guo <ethan.guoyi...@gmail.com> --- .../hudi/io/HoodieMergeHandleWithChangeLog.java | 2 +- .../functional/cdc/TestCDCDataFrameSuite.scala | 56 +++++++++++++++++++++- 2 files changed, 56 insertions(+), 2 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandleWithChangeLog.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandleWithChangeLog.java index d610891c2ca..f8669416f0c 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandleWithChangeLog.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandleWithChangeLog.java @@ -103,7 +103,7 @@ public class HoodieMergeHandleWithChangeLog<T, I, K, O> extends HoodieMergeHandl // TODO Remove these unnecessary newInstance invocations HoodieRecord<T> savedRecord = newRecord.newInstance(); super.writeInsertRecord(newRecord); - if (!HoodieOperation.isDelete(newRecord.getOperation())) { + if (!HoodieOperation.isDelete(newRecord.getOperation()) && !savedRecord.isDelete(schema, config.getPayloadConfig().getProps())) { cdcLogger.put(newRecord, null, savedRecord.toIndexedRecord(schema, config.getPayloadConfig().getProps()).map(HoodieAvroIndexedRecord::getData)); } } diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/cdc/TestCDCDataFrameSuite.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/cdc/TestCDCDataFrameSuite.scala index 
36629687106..aac836d8c3a 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/cdc/TestCDCDataFrameSuite.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/cdc/TestCDCDataFrameSuite.scala
@@ -26,7 +26,8 @@ import org.apache.hudi.common.table.cdc.{HoodieCDCOperation, HoodieCDCSupplement
 import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, TableSchemaResolver}
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator
 import org.apache.hudi.common.testutils.RawTripTestPayload.{deleteRecordsToStrings, recordsToStrings}
-import org.apache.spark.sql.SaveMode
+import org.apache.spark.sql.{Row, SaveMode}
+import org.apache.spark.sql.types.{StringType, StructField, StructType}
 import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.{CsvSource, EnumSource}
@@ -634,4 +635,57 @@ class TestCDCDataFrameSuite extends HoodieCDCTestBase {
     val cdcDataOnly2 = cdcDataFrame((commitTime2.toLong - 1).toString)
     assertCDCOpCnt(cdcDataOnly2, insertedCnt2, updatedCnt2, 0)
   }
+
+  /**
+   * Regression test for HUDI-6562: upserts three AWSDmsAvroPayload "I" rows and then a "D" row
+   * for key "3" with CDC enabled, once per logging mode, and expects two rows to remain.
+   */
+  @ParameterizedTest
+  @EnumSource(classOf[HoodieCDCSupplementalLoggingMode])
+  def testCDCWithAWSDMSPayload(loggingMode: HoodieCDCSupplementalLoggingMode): Unit = {
+    val options = Map(
+      "hoodie.table.name" -> "test",
+      "hoodie.datasource.write.recordkey.field" -> "id",
+      "hoodie.datasource.write.precombine.field" -> "replicadmstimestamp",
+      "hoodie.datasource.write.keygenerator.class" -> "org.apache.hudi.keygen.NonpartitionedKeyGenerator",
+      "hoodie.datasource.write.partitionpath.field" -> "",
+      "hoodie.datasource.write.payload.class" -> "org.apache.hudi.common.model.AWSDmsAvroPayload",
+      "hoodie.table.cdc.enabled" -> "true",
+      "hoodie.table.cdc.supplemental.logging.mode" -> loggingMode.name() // mode under test
+    )
+
+    val data: Seq[(String, String, String, String)] = Seq(
+      ("1", "I", "2023-06-14 15:46:06.953746", "A"),
+      ("2", "I", "2023-06-14 15:46:07.953746", "B"),
+      ("3", "I", "2023-06-14 15:46:08.953746", "C")
+    )
+
+    val schema: StructType = StructType(
+      Seq("id", "Op", "replicadmstimestamp", "code").map(name => StructField(name, StringType)))
+
+    val df = spark.createDataFrame(data.map(Row.fromTuple), schema)
+    df.write
+      .format("org.apache.hudi")
+      .option("hoodie.datasource.write.operation", "upsert")
+      .options(options)
+      .mode("append")
+      .save(basePath)
+
+    assertEquals(3L, spark.read.format("org.apache.hudi").load(basePath).count())
+
+    // A DMS delete ("D") event for key "3", timestamped later than that key's insert.
+    val newData: Seq[(String, String, String, String)] = Seq(
+      ("3", "D", "2023-06-14 15:47:09.953746", "B")
+    )
+
+    val newDf = spark.createDataFrame(newData.map(Row.fromTuple), schema)
+    newDf.write
+      .format("org.apache.hudi")
+      .option("hoodie.datasource.write.operation", "upsert")
+      .options(options)
+      .mode("append")
+      .save(basePath)
+
+    assertEquals(2L, spark.read.format("org.apache.hudi").load(basePath).count())
+  }
 }