This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 59f7d2806bf [HUDI-6562] Fixed issue with delete events for AWSDmsAvroPayload when CDC is enabled (#9519)
59f7d2806bf is described below

commit 59f7d2806bfc2d402dc8f5694dcb9d345e3d5a55
Author: Aditya Goenka <63430370+ad1happy...@users.noreply.github.com>
AuthorDate: Fri Sep 1 04:47:48 2023 +0530

    [HUDI-6562] Fixed issue with delete events for AWSDmsAvroPayload when CDC is enabled (#9519)
    
    Co-authored-by: Y Ethan Guo <ethan.guoyi...@gmail.com>
---
 .../hudi/io/HoodieMergeHandleWithChangeLog.java    |  2 +-
 .../functional/cdc/TestCDCDataFrameSuite.scala     | 56 +++++++++++++++++++++-
 2 files changed, 56 insertions(+), 2 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandleWithChangeLog.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandleWithChangeLog.java
index d610891c2ca..f8669416f0c 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandleWithChangeLog.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandleWithChangeLog.java
@@ -103,7 +103,7 @@ public class HoodieMergeHandleWithChangeLog<T, I, K, O> extends HoodieMergeHandl
     // TODO Remove these unnecessary newInstance invocations
     HoodieRecord<T> savedRecord = newRecord.newInstance();
     super.writeInsertRecord(newRecord);
-    if (!HoodieOperation.isDelete(newRecord.getOperation())) {
+    if (!HoodieOperation.isDelete(newRecord.getOperation()) && !savedRecord.isDelete(schema, config.getPayloadConfig().getProps())) {
       cdcLogger.put(newRecord, null, savedRecord.toIndexedRecord(schema, config.getPayloadConfig().getProps()).map(HoodieAvroIndexedRecord::getData));
     }
   }
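
Context for the fix above: AWS DMS does not signal deletes through Hudi's record-level operation; each replicated row instead carries an "Op" column ("I"/"U"/"D"), and AWSDmsAvroPayload treats Op = "D" as a delete. The added savedRecord.isDelete(...) check therefore also asks the payload itself, so a DMS delete no longer gets written into the CDC log as an insert. A minimal Scala sketch of that DMS-style check (illustrative only, not the Hudi source):

    import org.apache.avro.generic.GenericRecord

    // Sketch: AWS DMS marks each replicated row with an "Op" column;
    // "D" means the upstream row was deleted, even when the Hoodie
    // operation attached to the record is not DELETE.
    def isDmsDelete(record: GenericRecord): Boolean = {
      val op = record.get("Op")
      op != null && op.toString == "D"
    }
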
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/cdc/TestCDCDataFrameSuite.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/cdc/TestCDCDataFrameSuite.scala
index 36629687106..aac836d8c3a 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/cdc/TestCDCDataFrameSuite.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/cdc/TestCDCDataFrameSuite.scala
@@ -26,7 +26,8 @@ import org.apache.hudi.common.table.cdc.{HoodieCDCOperation, HoodieCDCSupplement
 import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, TableSchemaResolver}
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator
 import org.apache.hudi.common.testutils.RawTripTestPayload.{deleteRecordsToStrings, recordsToStrings}
-import org.apache.spark.sql.SaveMode
+import org.apache.spark.sql.{Row, SaveMode}
+import org.apache.spark.sql.types.{StringType, StructField, StructType}
 import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.{CsvSource, EnumSource}
@@ -634,4 +635,57 @@ class TestCDCDataFrameSuite extends HoodieCDCTestBase {
     val cdcDataOnly2 = cdcDataFrame((commitTime2.toLong - 1).toString)
     assertCDCOpCnt(cdcDataOnly2, insertedCnt2, updatedCnt2, 0)
   }
+
+  @ParameterizedTest
+  @EnumSource(classOf[HoodieCDCSupplementalLoggingMode])
+  def testCDCWithAWSDMSPayload(loggingMode: HoodieCDCSupplementalLoggingMode): Unit = {
+    val options = Map(
+      "hoodie.table.name" -> "test",
+      "hoodie.datasource.write.recordkey.field" -> "id",
+      "hoodie.datasource.write.precombine.field" -> "replicadmstimestamp",
+      "hoodie.datasource.write.keygenerator.class" -> 
"org.apache.hudi.keygen.NonpartitionedKeyGenerator",
+      "hoodie.datasource.write.partitionpath.field" -> "",
+      "hoodie.datasource.write.payload.class" -> 
"org.apache.hudi.common.model.AWSDmsAvroPayload",
+      "hoodie.table.cdc.enabled" -> "true",
+      "hoodie.table.cdc.supplemental.logging.mode" -> "data_before_after"
+    )
+
+    val data: Seq[(String, String, String, String)] = Seq(
+      ("1", "I", "2023-06-14 15:46:06.953746", "A"),
+      ("2", "I", "2023-06-14 15:46:07.953746", "B"),
+      ("3", "I", "2023-06-14 15:46:08.953746", "C")
+    )
+
+    val schema: StructType = StructType(Seq(
+      StructField("id", StringType),
+      StructField("Op", StringType),
+      StructField("replicadmstimestamp", StringType),
+      StructField("code", StringType)
+    ))
+
+    val df = spark.createDataFrame(data.map(Row.fromTuple), schema)
+    df.write
+      .format("org.apache.hudi")
+      .option("hoodie.datasource.write.operation", "upsert")
+      .options(options)
+      .mode("append")
+      .save(basePath)
+
+    assertEquals(spark.read.format("org.apache.hudi").load(basePath).count(), 3)
+
+    val newData: Seq[(String, String, String, String)] = Seq(
+      ("3", "D", "2023-06-14 15:47:09.953746", "B")
+    )
+
+    val newDf = spark.createDataFrame(newData.map(Row.fromTuple), schema)
+
+    newDf.write
+      .format("org.apache.hudi")
+      .option("hoodie.datasource.write.operation", "upsert")
+      .options(options)
+      .mode("append")
+      .save(basePath)
+
+    assertEquals(spark.read.format("org.apache.hudi").load(basePath).count(), 2)
+  }
 }
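
A hedged usage sketch (not part of this commit): with the fix in place, a CDC incremental query over the table should surface the id = "3" record as a delete event rather than an insert. The option keys below follow Hudi's CDC read path, and basePath is the table path used in the test above.

    // Read the change feed from the beginning of the timeline.
    val cdcDf = spark.read.format("hudi")
      .option("hoodie.datasource.query.type", "incremental")
      .option("hoodie.datasource.query.incremental.format", "cdc")
      .option("hoodie.datasource.read.begin.instanttime", "0")
      .load(basePath)
    // Each CDC row carries an "op" column ("i"/"u"/"d"); the second write
    // should now produce an op = "d" row for id "3".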
