This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch branch-0.x
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/branch-0.x by this push:
     new 6da13c525b8c [HUDI-9671] Spark reads should skip record with 
UPDATE_BEFORE operation (#13658)
6da13c525b8c is described below

commit 6da13c525b8ca6d7366f60d299ae375f75769945
Author: Shuo Cheng <[email protected]>
AuthorDate: Fri Aug 1 11:54:46 2025 +0800

    [HUDI-9671] Spark reads should skip record with UPDATE_BEFORE operation 
(#13658)
---
 .../java/org/apache/hudi/io/HoodieMergeHandle.java | 12 +++++++++--
 .../apache/hudi/table/format/TestInputFormat.java  | 23 ++++++++++++++++++++++
 .../test/java/org/apache/hudi/utils/TestData.java  |  4 ++++
 .../src/main/scala/org/apache/hudi/Iterators.scala | 20 +++++++++----------
 .../apache/hudi/TestDataSourceReadWithDeletes.java | 14 +++++++------
 5 files changed, 55 insertions(+), 18 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
index e993bd6dc145..f19701f81f37 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
@@ -269,7 +269,7 @@ public class HoodieMergeHandle<T, I, K, O> extends 
HoodieWriteHandle<T, I, K, O>
     if (combineRecordOpt.isPresent()) {
       if (oldRecord.getData() != combineRecordOpt.get().getData()) {
         // the incoming record is chosen
-        isDelete = HoodieOperation.isDelete(newRecord.getOperation());
+        isDelete = isDeleteRecord(newRecord);
       } else {
         // the incoming record is dropped
         return false;
@@ -290,7 +290,7 @@ public class HoodieMergeHandle<T, I, K, O> extends 
HoodieWriteHandle<T, I, K, O>
 
   protected void writeInsertRecord(HoodieRecord<T> newRecord, Schema schema, 
Properties prop)
       throws IOException {
-    if (writeRecord(newRecord, Option.of(newRecord), schema, prop, 
HoodieOperation.isDelete(newRecord.getOperation()))) {
+    if (writeRecord(newRecord, Option.of(newRecord), schema, prop, 
isDeleteRecord(newRecord))) {
       insertRecordsWritten++;
     }
   }
@@ -299,6 +299,14 @@ public class HoodieMergeHandle<T, I, K, O> extends 
HoodieWriteHandle<T, I, K, O>
     return writeRecord(newRecord, combineRecord, schema, prop, false);
   }
 
+  /**
+   * Check if a record is a DELETE/UPDATE_BEFORE marked by the 
'_hoodie_operation' field.
+   */
+  private boolean isDeleteRecord(HoodieRecord<T> record) {
+    HoodieOperation operation = record.getOperation();
+    return HoodieOperation.isDelete(operation) || 
HoodieOperation.isUpdateBefore(operation);
+  }
+
   private boolean writeRecord(HoodieRecord<T> newRecord, Option<HoodieRecord> 
combineRecord, Schema schema, Properties prop, boolean isDelete) throws 
IOException {
     Option recordMetadata = newRecord.getMetadata();
     if (!partitionPath.equals(newRecord.getPartitionPath())) {
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
index fe6a53e0537d..855833ac696f 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
@@ -991,6 +991,29 @@ public class TestInputFormat {
     assertThat(baseResult, is(expected));
   }
 
+  @Test
+  void testMergeRecordWithUpdateBefore() throws Exception {
+    Map<String, String> options = new HashMap<>();
+    options.put(FlinkOptions.CHANGELOG_ENABLED.key(), "true");
+    beforeEach(HoodieTableType.COPY_ON_WRITE, options);
+
+    // write first batch with all insert data
+    TestData.writeData(TestData.DATA_SET_INSERT, conf);
+    // write second batch with UPDATE_BEFORE record, send UPDATE_BEFORE for 
`id1`
+    TestData.writeData(TestData.DATA_SET_UPDATE_BEFORE, conf);
+    InputFormat<RowData, ?> inputFormat = this.tableSource.getInputFormat();
+    final String baseResult = TestData.rowDataToString(readData(inputFormat));
+    String expected = "["
+        + "+I[id2, Stephen, 33, 1970-01-01T00:00:00.002, par1], "
+        + "+I[id3, Julian, 53, 1970-01-01T00:00:00.003, par2], "
+        + "+I[id4, Fabian, 31, 1970-01-01T00:00:00.004, par2], "
+        + "+I[id5, Sophia, 18, 1970-01-01T00:00:00.005, par3], "
+        + "+I[id6, Emma, 20, 1970-01-01T00:00:00.006, par3], "
+        + "+I[id7, Bob, 44, 1970-01-01T00:00:00.007, par4], "
+        + "+I[id8, Han, 56, 1970-01-01T00:00:00.008, par4]]";
+    assertThat(baseResult, is(expected));
+  }
+
   @Test
   void testReadArchivedCommitsIncrementally() throws Exception {
     Map<String, String> options = new HashMap<>();
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
index b582c6293a98..30fed4d960e2 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
@@ -167,6 +167,10 @@ public class TestData {
             TimestampData.fromEpochMillis(i), StringData.fromString("par1"))));
   }
 
+  public static List<RowData> DATA_SET_UPDATE_BEFORE = Arrays.asList(
+      updateBeforeRow(StringData.fromString("id1"), 
StringData.fromString("Danny"), 23,
+          TimestampData.fromEpochMillis(1), StringData.fromString("par1")));
+
   // data set of test_source.data
   public static List<RowData> DATA_SET_SOURCE_INSERT = Arrays.asList(
       insertRow(StringData.fromString("id1"), StringData.fromString("Danny"), 
23,
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/Iterators.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/Iterators.scala
index 4547569845cf..29de2a716cc2 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/Iterators.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/Iterators.scala
@@ -120,16 +120,16 @@ class LogFileIterator(logFiles: List[HoodieLogFile],
     }
   }
 
-  protected def isDeleteOperation(r: InternalRow): Boolean = if 
(hasOperationField) {
-    val operation = r.getString(operationFieldPos)
-    HoodieOperation.fromName(operation) == HoodieOperation.DELETE
+  protected def shouldSkip(r: InternalRow): Boolean = if (hasOperationField) {
+    val operation = HoodieOperation.fromName(r.getString(operationFieldPos))
+    HoodieOperation.isDelete(operation) || 
HoodieOperation.isUpdateBefore(operation)
   } else {
     false
   }
 
-  protected def isDeleteOperation(r: GenericRecord): Boolean = if 
(hasOperationField) {
-    val operation = r.get(operationFieldPos).toString
-    HoodieOperation.fromName(operation) == HoodieOperation.DELETE
+  protected def shouldSkip(r: GenericRecord): Boolean = if (hasOperationField) 
{
+    val operation = HoodieOperation.fromName(r.get(operationFieldPos).toString)
+    HoodieOperation.isDelete(operation) || 
HoodieOperation.isUpdateBefore(operation)
   } else {
     false
   }
@@ -161,7 +161,7 @@ class LogFileIterator(logFiles: List[HoodieLogFile],
       logRecordsIterator.next() match {
         case Some(r: HoodieAvroIndexedRecord) =>
           val data = r.getData.asInstanceOf[GenericRecord]
-          if (isDeleteOperation(data)) {
+          if (shouldSkip(data)) {
             this.hasNextInternal
           } else {
             val projectedAvroRecord = requiredSchemaAvroProjection(data)
@@ -170,7 +170,7 @@ class LogFileIterator(logFiles: List[HoodieLogFile],
           }
         case Some(r: HoodieSparkRecord) =>
           val data = r.getData
-          if (isDeleteOperation(data)) {
+          if (shouldSkip(data)) {
             this.hasNextInternal
           } else {
             nextRecord = requiredSchemaRowProjection(data)
@@ -310,7 +310,7 @@ class RecordMergingFileIterator(logFiles: 
List[HoodieLogFile],
         toScalaOption(result)
           .flatMap { r =>
             val data = r.getLeft.getData.asInstanceOf[InternalRow]
-            if (isDeleteOperation(data)) {
+            if (shouldSkip(data)) {
               None
             } else {
               val schema = HoodieInternalRowUtils.getCachedSchema(r.getRight)
@@ -324,7 +324,7 @@ class RecordMergingFileIterator(logFiles: 
List[HoodieLogFile],
         toScalaOption(result)
           .flatMap { r =>
             val avroRecord = r.getLeft.toIndexedRecord(r.getRight, 
payloadProps).get.getData.asInstanceOf[GenericRecord]
-            if (isDeleteOperation(avroRecord)) {
+            if (shouldSkip(avroRecord)) {
               None
             } else {
               Some(deserialize(requiredSchemaAvroProjection(avroRecord)))
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestDataSourceReadWithDeletes.java
 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestDataSourceReadWithDeletes.java
index 62dfdeaf118c..05d366863da9 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestDataSourceReadWithDeletes.java
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestDataSourceReadWithDeletes.java
@@ -47,7 +47,8 @@ import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.sql.Row;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Tag;
-import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
 
 import java.util.List;
 import java.util.Properties;
@@ -87,8 +88,9 @@ public class TestDataSourceReadWithDeletes extends 
SparkClientFunctionalTestHarn
     schema = new Schema.Parser().parse(jsonSchema);
   }
 
-  @Test
-  public void test() throws Exception {
+  @ParameterizedTest
+  @EnumSource(value = HoodieOperation.class, names = {"UPDATE_BEFORE", 
"DELETE"})
+  public void test(HoodieOperation hoodieOperation) throws Exception {
     HoodieWriteConfig config = createHoodieWriteConfig();
     metaClient = getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, 
config.getProps());
 
@@ -100,7 +102,7 @@ public class TestDataSourceReadWithDeletes extends 
SparkClientFunctionalTestHarn
 
     String[] dataset2 = new String[] {
         "I,id1,Danny,30,2,par1",
-        "D,id2,Tony,20,2,par1",
+        hoodieOperation.getName() + ",id2,Tony,20,2,par1",
         "I,id3,Julian,40,2,par1",
         "D,id4,Stephan,35,2,par1"};
     String insertTime2 = HoodieActiveTimeline.createNewInstantTime();
@@ -165,7 +167,7 @@ public class TestDataSourceReadWithDeletes extends 
SparkClientFunctionalTestHarn
   private List<HoodieRecord> str2HoodieRecord(String[] records) {
     return Stream.of(records).map(rawRecordStr -> {
       String[] parts = rawRecordStr.split(",");
-      boolean isDelete = parts[0].equalsIgnoreCase("D");
+      String hoodieOperationStr = parts[0];
       GenericRecord record = new GenericData.Record(schema);
       record.put("id", parts[1]);
       record.put("name", parts[2]);
@@ -176,7 +178,7 @@ public class TestDataSourceReadWithDeletes extends 
SparkClientFunctionalTestHarn
       return new HoodieAvroRecord<>(
           new HoodieKey((String) record.get("id"), (String) 
record.get("part")),
           payload,
-          isDelete ? HoodieOperation.DELETE : HoodieOperation.INSERT);
+          HoodieOperation.fromName(hoodieOperationStr));
     }).collect(Collectors.toList());
   }
 }

Reply via email to