This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch branch-0.x
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/branch-0.x by this push:
new 6da13c525b8c [HUDI-9671] Spark reads should skip record with UPDATE_BEFORE operation (#13658)
6da13c525b8c is described below
commit 6da13c525b8ca6d7366f60d299ae375f75769945
Author: Shuo Cheng <[email protected]>
AuthorDate: Fri Aug 1 11:54:46 2025 +0800
[HUDI-9671] Spark reads should skip record with UPDATE_BEFORE operation (#13658)
---
.../java/org/apache/hudi/io/HoodieMergeHandle.java | 12 +++++++++--
.../apache/hudi/table/format/TestInputFormat.java | 23 ++++++++++++++++++++++
.../test/java/org/apache/hudi/utils/TestData.java | 4 ++++
.../src/main/scala/org/apache/hudi/Iterators.scala | 20 +++++++++----------
.../apache/hudi/TestDataSourceReadWithDeletes.java | 14 +++++++------
5 files changed, 55 insertions(+), 18 deletions(-)
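
For context before the diffs: the fix treats the CDC retraction marker UPDATE_BEFORE the same way as DELETE when deciding whether a row should surface in a read, both in the merge handle (Java) and in the Spark read iterators (Scala). A minimal standalone sketch of that predicate, reusing the HoodieOperation helpers the patch calls; the class name and main driver below are illustrative only, and the import path is assumed to be the usual hudi-common one:

    import org.apache.hudi.common.model.HoodieOperation;

    // Illustrative sketch, not part of the commit: mirrors the skip check
    // introduced by this change.
    public final class OperationSkipExample {

      // A record is dropped from read results when its changelog operation
      // is a delete or the "before" image of an update (UPDATE_BEFORE).
      static boolean shouldSkip(HoodieOperation operation) {
        return HoodieOperation.isDelete(operation)
            || HoodieOperation.isUpdateBefore(operation);
      }

      public static void main(String[] args) {
        System.out.println(shouldSkip(HoodieOperation.UPDATE_BEFORE)); // true, the new behavior for Spark reads
        System.out.println(shouldSkip(HoodieOperation.DELETE));        // true, unchanged
        System.out.println(shouldSkip(HoodieOperation.INSERT));        // false
      }
    }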
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
index e993bd6dc145..f19701f81f37 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
@@ -269,7 +269,7 @@ public class HoodieMergeHandle<T, I, K, O> extends HoodieWriteHandle<T, I, K, O>
if (combineRecordOpt.isPresent()) {
if (oldRecord.getData() != combineRecordOpt.get().getData()) {
// the incoming record is chosen
- isDelete = HoodieOperation.isDelete(newRecord.getOperation());
+ isDelete = isDeleteRecord(newRecord);
} else {
// the incoming record is dropped
return false;
@@ -290,7 +290,7 @@ public class HoodieMergeHandle<T, I, K, O> extends HoodieWriteHandle<T, I, K, O>
protected void writeInsertRecord(HoodieRecord<T> newRecord, Schema schema, Properties prop)
throws IOException {
- if (writeRecord(newRecord, Option.of(newRecord), schema, prop, HoodieOperation.isDelete(newRecord.getOperation()))) {
+ if (writeRecord(newRecord, Option.of(newRecord), schema, prop, isDeleteRecord(newRecord))) {
insertRecordsWritten++;
}
}
@@ -299,6 +299,14 @@ public class HoodieMergeHandle<T, I, K, O> extends HoodieWriteHandle<T, I, K, O>
return writeRecord(newRecord, combineRecord, schema, prop, false);
}
+ /**
+ * Check if a record is a DELETE/UPDATE_BEFORE marked by the '_hoodie_operation' field.
+ */
+ private boolean isDeleteRecord(HoodieRecord<T> record) {
+ HoodieOperation operation = record.getOperation();
+ return HoodieOperation.isDelete(operation) || HoodieOperation.isUpdateBefore(operation);
+ }
+
private boolean writeRecord(HoodieRecord<T> newRecord, Option<HoodieRecord> combineRecord, Schema schema, Properties prop, boolean isDelete) throws IOException {
Option recordMetadata = newRecord.getMetadata();
if (!partitionPath.equals(newRecord.getPartitionPath())) {
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
index fe6a53e0537d..855833ac696f 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
@@ -991,6 +991,29 @@ public class TestInputFormat {
assertThat(baseResult, is(expected));
}
+ @Test
+ void testMergeRecordWithUpdateBefore() throws Exception {
+ Map<String, String> options = new HashMap<>();
+ options.put(FlinkOptions.CHANGELOG_ENABLED.key(), "true");
+ beforeEach(HoodieTableType.COPY_ON_WRITE, options);
+
+ // write first batch with all insert data
+ TestData.writeData(TestData.DATA_SET_INSERT, conf);
+ // write second batch with UPDATE_BEFORE record, send UPDATE_BEFORE for `id1`
+ TestData.writeData(TestData.DATA_SET_UPDATE_BEFORE, conf);
+ InputFormat<RowData, ?> inputFormat = this.tableSource.getInputFormat();
+ final String baseResult = TestData.rowDataToString(readData(inputFormat));
+ String expected = "["
+ + "+I[id2, Stephen, 33, 1970-01-01T00:00:00.002, par1], "
+ + "+I[id3, Julian, 53, 1970-01-01T00:00:00.003, par2], "
+ + "+I[id4, Fabian, 31, 1970-01-01T00:00:00.004, par2], "
+ + "+I[id5, Sophia, 18, 1970-01-01T00:00:00.005, par3], "
+ + "+I[id6, Emma, 20, 1970-01-01T00:00:00.006, par3], "
+ + "+I[id7, Bob, 44, 1970-01-01T00:00:00.007, par4], "
+ + "+I[id8, Han, 56, 1970-01-01T00:00:00.008, par4]]";
+ assertThat(baseResult, is(expected));
+ }
+
@Test
void testReadArchivedCommitsIncrementally() throws Exception {
Map<String, String> options = new HashMap<>();
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
index b582c6293a98..30fed4d960e2 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
@@ -167,6 +167,10 @@ public class TestData {
TimestampData.fromEpochMillis(i), StringData.fromString("par1"))));
}
+ public static List<RowData> DATA_SET_UPDATE_BEFORE = Arrays.asList(
+ updateBeforeRow(StringData.fromString("id1"), StringData.fromString("Danny"), 23,
+ TimestampData.fromEpochMillis(1), StringData.fromString("par1")));
+
// data set of test_source.data
public static List<RowData> DATA_SET_SOURCE_INSERT = Arrays.asList(
insertRow(StringData.fromString("id1"), StringData.fromString("Danny"),
23,
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/Iterators.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/Iterators.scala
index 4547569845cf..29de2a716cc2 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/Iterators.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/Iterators.scala
@@ -120,16 +120,16 @@ class LogFileIterator(logFiles: List[HoodieLogFile],
}
}
- protected def isDeleteOperation(r: InternalRow): Boolean = if (hasOperationField) {
- val operation = r.getString(operationFieldPos)
- HoodieOperation.fromName(operation) == HoodieOperation.DELETE
+ protected def shouldSkip(r: InternalRow): Boolean = if (hasOperationField) {
+ val operation = HoodieOperation.fromName(r.getString(operationFieldPos))
HoodieOperation.isDelete(operation) || HoodieOperation.isUpdateBefore(operation)
} else {
false
}
- protected def isDeleteOperation(r: GenericRecord): Boolean = if (hasOperationField) {
- val operation = r.get(operationFieldPos).toString
- HoodieOperation.fromName(operation) == HoodieOperation.DELETE
+ protected def shouldSkip(r: GenericRecord): Boolean = if (hasOperationField) {
+ val operation = HoodieOperation.fromName(r.get(operationFieldPos).toString)
HoodieOperation.isDelete(operation) || HoodieOperation.isUpdateBefore(operation)
} else {
false
}
@@ -161,7 +161,7 @@ class LogFileIterator(logFiles: List[HoodieLogFile],
logRecordsIterator.next() match {
case Some(r: HoodieAvroIndexedRecord) =>
val data = r.getData.asInstanceOf[GenericRecord]
- if (isDeleteOperation(data)) {
+ if (shouldSkip(data)) {
this.hasNextInternal
} else {
val projectedAvroRecord = requiredSchemaAvroProjection(data)
@@ -170,7 +170,7 @@ class LogFileIterator(logFiles: List[HoodieLogFile],
}
case Some(r: HoodieSparkRecord) =>
val data = r.getData
- if (isDeleteOperation(data)) {
+ if (shouldSkip(data)) {
this.hasNextInternal
} else {
nextRecord = requiredSchemaRowProjection(data)
@@ -310,7 +310,7 @@ class RecordMergingFileIterator(logFiles: List[HoodieLogFile],
toScalaOption(result)
.flatMap { r =>
val data = r.getLeft.getData.asInstanceOf[InternalRow]
- if (isDeleteOperation(data)) {
+ if (shouldSkip(data)) {
None
} else {
val schema = HoodieInternalRowUtils.getCachedSchema(r.getRight)
@@ -324,7 +324,7 @@ class RecordMergingFileIterator(logFiles: List[HoodieLogFile],
toScalaOption(result)
.flatMap { r =>
- val avroRecord = r.getLeft.toIndexedRecord(r.getRight, payloadProps).get.getData.asInstanceOf[GenericRecord]
- if (isDeleteOperation(avroRecord)) {
+ if (shouldSkip(avroRecord)) {
None
} else {
Some(deserialize(requiredSchemaAvroProjection(avroRecord)))
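
A note on the Iterators.scala hunks above: the Spark-side iterators read the operation back as the string stored in the '_hoodie_operation' meta field and convert it with HoodieOperation.fromName before applying the same skip rule. A rough Java-flavored sketch of that read-side variant; the class name is illustrative, and the round trip goes through getName() so no short operation names are hard-coded:

    import org.apache.hudi.common.model.HoodieOperation;

    // Illustrative sketch, not part of the commit: the read-side variant of
    // the skip check, starting from the persisted operation name.
    public final class ChangelogReadSkipExample {

      static boolean shouldSkip(String operationName) {
        HoodieOperation operation = HoodieOperation.fromName(operationName);
        return HoodieOperation.isDelete(operation)
            || HoodieOperation.isUpdateBefore(operation);
      }

      public static void main(String[] args) {
        System.out.println(shouldSkip(HoodieOperation.UPDATE_BEFORE.getName())); // true
        System.out.println(shouldSkip(HoodieOperation.INSERT.getName()));        // false
      }
    }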
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestDataSourceReadWithDeletes.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestDataSourceReadWithDeletes.java
index 62dfdeaf118c..05d366863da9 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestDataSourceReadWithDeletes.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestDataSourceReadWithDeletes.java
@@ -47,7 +47,8 @@ import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Row;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
-import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
import java.util.List;
import java.util.Properties;
@@ -87,8 +88,9 @@ public class TestDataSourceReadWithDeletes extends SparkClientFunctionalTestHarn
schema = new Schema.Parser().parse(jsonSchema);
}
- @Test
- public void test() throws Exception {
+ @ParameterizedTest
+ @EnumSource(value = HoodieOperation.class, names = {"UPDATE_BEFORE", "DELETE"})
+ public void test(HoodieOperation hoodieOperation) throws Exception {
HoodieWriteConfig config = createHoodieWriteConfig();
metaClient = getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, config.getProps());
@@ -100,7 +102,7 @@ public class TestDataSourceReadWithDeletes extends SparkClientFunctionalTestHarn
String[] dataset2 = new String[] {
"I,id1,Danny,30,2,par1",
- "D,id2,Tony,20,2,par1",
+ hoodieOperation.getName() + ",id2,Tony,20,2,par1",
"I,id3,Julian,40,2,par1",
"D,id4,Stephan,35,2,par1"};
String insertTime2 = HoodieActiveTimeline.createNewInstantTime();
@@ -165,7 +167,7 @@ public class TestDataSourceReadWithDeletes extends SparkClientFunctionalTestHarn
private List<HoodieRecord> str2HoodieRecord(String[] records) {
return Stream.of(records).map(rawRecordStr -> {
String[] parts = rawRecordStr.split(",");
- boolean isDelete = parts[0].equalsIgnoreCase("D");
+ String hoodieOperationStr = parts[0];
GenericRecord record = new GenericData.Record(schema);
record.put("id", parts[1]);
record.put("name", parts[2]);
@@ -176,7 +178,7 @@ public class TestDataSourceReadWithDeletes extends SparkClientFunctionalTestHarn
return new HoodieAvroRecord<>(
new HoodieKey((String) record.get("id"), (String) record.get("part")),
payload,
- isDelete ? HoodieOperation.DELETE : HoodieOperation.INSERT);
+ HoodieOperation.fromName(hoodieOperationStr));
}).collect(Collectors.toList());
}
}