This is an automated email from the ASF dual-hosted git repository. danny0405 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push: new f70678f435 [HUDI-4853] Get field by name for OverwriteNonDefaultsWithLatestAvroPayload to avoid schema mismatch (#6689) f70678f435 is described below commit f70678f4354c6264b6a1e38900dd7a11cb345b96 Author: Danny Chan <yuzhao....@gmail.com> AuthorDate: Fri Sep 16 13:56:23 2022 +0800 [HUDI-4853] Get field by name for OverwriteNonDefaultsWithLatestAvroPayload to avoid schema mismatch (#6689) --- .../OverwriteNonDefaultsWithLatestAvroPayload.java | 2 +- ...tOverwriteNonDefaultsWithLatestAvroPayload.java | 59 ++++++++++++++++++++-- 2 files changed, 57 insertions(+), 4 deletions(-) diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteNonDefaultsWithLatestAvroPayload.java b/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteNonDefaultsWithLatestAvroPayload.java index 6ce99aae21..9ce241bc78 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteNonDefaultsWithLatestAvroPayload.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteNonDefaultsWithLatestAvroPayload.java @@ -70,7 +70,7 @@ public class OverwriteNonDefaultsWithLatestAvroPayload extends OverwriteWithLate if (!overwriteField(value, defaultValue)) { builder.set(field, value); } else { - builder.set(field, currentRecord.get(field.pos())); + builder.set(field, currentRecord.get(field.name())); } }); return Option.of(builder.build()); diff --git a/hudi-common/src/test/java/org/apache/hudi/common/model/TestOverwriteNonDefaultsWithLatestAvroPayload.java b/hudi-common/src/test/java/org/apache/hudi/common/model/TestOverwriteNonDefaultsWithLatestAvroPayload.java index 9e3405b304..0807b41f61 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/model/TestOverwriteNonDefaultsWithLatestAvroPayload.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/model/TestOverwriteNonDefaultsWithLatestAvroPayload.java @@ -23,6 +23,7 @@ import org.apache.avro.Schema; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericRecord; import org.apache.avro.generic.IndexedRecord; +import org.apache.hudi.avro.HoodieAvroUtils; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -54,13 +55,15 @@ public class TestOverwriteNonDefaultsWithLatestAvroPayload { @Test public void testActiveRecords() throws IOException { + Schema writerSchema = HoodieAvroUtils.addMetadataFields(schema); + GenericRecord record1 = new GenericData.Record(schema); record1.put("id", "1"); record1.put("partition", "partition1"); record1.put("ts", 0L); record1.put("_hoodie_is_deleted", false); record1.put("city", "NY0"); - record1.put("child", Arrays.asList("A")); + record1.put("child", Collections.singletonList("A")); GenericRecord record2 = new GenericData.Record(schema); record2.put("id", "2"); @@ -76,11 +79,38 @@ public class TestOverwriteNonDefaultsWithLatestAvroPayload { record3.put("ts", 1L); record3.put("_hoodie_is_deleted", false); record3.put("city", "NY0"); - record3.put("child", Arrays.asList("A")); - + record3.put("child", Collections.singletonList("A")); + + // same content with record1 plus metadata fields + GenericRecord record4 = createRecordWithMetadataFields(writerSchema, "1", "partition1"); + record4.put("id", "1"); + record4.put("partition", "partition1"); + record4.put("ts", 0L); + record4.put("_hoodie_is_deleted", false); + record4.put("city", "NY0"); + record4.put("child", Collections.singletonList("A")); + + // same content with record2 plus metadata fields + GenericRecord record5 = createRecordWithMetadataFields(writerSchema, "2", ""); + record5.put("id", "2"); + record5.put("partition", ""); + record5.put("ts", 1L); + record5.put("_hoodie_is_deleted", false); + record5.put("city", "NY"); + record5.put("child", Collections.emptyList()); + + // same content with record3 plus metadata fields + GenericRecord record6 = createRecordWithMetadataFields(writerSchema, "2", ""); + record6.put("id", "2"); + record6.put("partition", "partition1"); + record6.put("ts", 1L); + record6.put("_hoodie_is_deleted", false); + record6.put("city", "NY0"); + record6.put("child", Collections.singletonList("A")); OverwriteNonDefaultsWithLatestAvroPayload payload1 = new OverwriteNonDefaultsWithLatestAvroPayload(record1, 1); OverwriteNonDefaultsWithLatestAvroPayload payload2 = new OverwriteNonDefaultsWithLatestAvroPayload(record2, 2); + OverwriteNonDefaultsWithLatestAvroPayload payload5 = new OverwriteNonDefaultsWithLatestAvroPayload(record5, 2); assertEquals(payload1.preCombine(payload2), payload2); assertEquals(payload2.preCombine(payload1), payload2); @@ -94,6 +124,19 @@ public class TestOverwriteNonDefaultsWithLatestAvroPayload { IndexedRecord combinedVal2 = payload2.combineAndGetUpdateValue(record1, schema).get(); assertEquals(combinedVal2, record3); assertNotSame(combinedVal2, record3); + + // the real case in production is: the current record to be combined includes the metadata fields, + // the payload record could include the metadata fields (for compaction) or not (for normal writer path). + + // case1: validate normal writer path + IndexedRecord combinedVal3 = payload2.combineAndGetUpdateValue(record4, schema).get(); + assertEquals(combinedVal3, record3); + assertNotSame(combinedVal3, record3); + + // case2: validate compaction path + IndexedRecord combinedVal4 = payload5.combineAndGetUpdateValue(record4, writerSchema).get(); + assertEquals(combinedVal4, record6); + assertNotSame(combinedVal4, record6); } @Test @@ -164,4 +207,14 @@ public class TestOverwriteNonDefaultsWithLatestAvroPayload { OverwriteNonDefaultsWithLatestAvroPayload payload2 = new OverwriteNonDefaultsWithLatestAvroPayload(record2, 1); assertEquals(payload2.combineAndGetUpdateValue(record1, avroSchema).get(), record3); } + + private static GenericRecord createRecordWithMetadataFields(Schema schema, String recordKey, String partitionPath) { + GenericRecord record = new GenericData.Record(schema); + record.put(HoodieRecord.COMMIT_TIME_METADATA_FIELD, "001"); + record.put(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD, "123"); + record.put(HoodieRecord.RECORD_KEY_METADATA_FIELD, recordKey); + record.put(HoodieRecord.PARTITION_PATH_METADATA_FIELD, partitionPath); + record.put(HoodieRecord.FILENAME_METADATA_FIELD, "file1"); + return record; + } }