This is an automated email from the ASF dual-hosted git repository. yihua pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push: new 21cbfce617 [HUDI-4864] Fix AWSDmsAvroPayload#combineAndGetUpdateValue when using MOR snapshot query after delete operations with test (#6688) 21cbfce617 is described below commit 21cbfce617689a1eff4d405bf5b19639d16e1c68 Author: Rahil C <32500120+rahi...@users.noreply.github.com> AuthorDate: Fri Sep 16 18:47:29 2022 -0700 [HUDI-4864] Fix AWSDmsAvroPayload#combineAndGetUpdateValue when using MOR snapshot query after delete operations with test (#6688) Co-authored-by: Rahil Chertara <rcher...@amazon.com> --- .../apache/hudi/common/model/AWSDmsAvroPayload.java | 9 ++++++--- .../hudi/common/model/TestAWSDmsAvroPayload.java | 21 +++++++++++++++++++++ 2 files changed, 27 insertions(+), 3 deletions(-) diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/AWSDmsAvroPayload.java b/hudi-common/src/main/java/org/apache/hudi/common/model/AWSDmsAvroPayload.java index 7153ea069d..fe044e0b43 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/AWSDmsAvroPayload.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/AWSDmsAvroPayload.java @@ -49,7 +49,7 @@ public class AWSDmsAvroPayload extends OverwriteWithLatestAvroPayload { } public AWSDmsAvroPayload(Option<GenericRecord> record) { - this(record.get(), 0); // natural order + this(record.isPresent() ? record.get() : null, 0); // natural order } /** @@ -87,7 +87,10 @@ public class AWSDmsAvroPayload extends OverwriteWithLatestAvroPayload { @Override public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema) throws IOException { - IndexedRecord insertValue = super.getInsertValue(schema).get(); - return handleDeleteOperation(insertValue); + Option<IndexedRecord> insertValue = super.getInsertValue(schema); + if (!insertValue.isPresent()) { + return Option.empty(); + } + return handleDeleteOperation(insertValue.get()); } } diff --git a/hudi-common/src/test/java/org/apache/hudi/common/model/TestAWSDmsAvroPayload.java b/hudi-common/src/test/java/org/apache/hudi/common/model/TestAWSDmsAvroPayload.java index a60f4ff6a7..07bc1d6f43 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/model/TestAWSDmsAvroPayload.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/model/TestAWSDmsAvroPayload.java @@ -108,6 +108,27 @@ public class TestAWSDmsAvroPayload { } + @Test + public void testDeleteWithEmptyPayLoad() { + Schema avroSchema = new Schema.Parser().parse(AVRO_SCHEMA_STRING); + Properties properties = new Properties(); + + GenericRecord oldRecord = new GenericData.Record(avroSchema); + oldRecord.put("field1", 2); + oldRecord.put("Op", "U"); + + AWSDmsAvroPayload payload = new AWSDmsAvroPayload(Option.empty()); + + try { + Option<IndexedRecord> outputPayload = payload.combineAndGetUpdateValue(oldRecord, avroSchema, properties); + // expect nothing to be committed to table + assertFalse(outputPayload.isPresent()); + } catch (Exception e) { + e.printStackTrace(); + fail("Unexpected exception"); + } + } + @Test public void testPreCombineWithDelete() { Schema avroSchema = new Schema.Parser().parse(AVRO_SCHEMA_STRING);