This is an automated email from the ASF dual-hosted git repository. vinoth pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git
The following commit(s) were added to refs/heads/master by this push: new 6d7ca2c [HUDI-727]: Copy default values of fields if not present when rewriting incoming record with new schema (#1427) 6d7ca2c is described below commit 6d7ca2cf7e441ad19d32d7a25739e454f39ed253 Author: Pratyaksh Sharma <pratyaks...@gmail.com> AuthorDate: Mon Apr 13 06:25:26 2020 +0530 [HUDI-727]: Copy default values of fields if not present when rewriting incoming record with new schema (#1427) --- .../java/org/apache/hudi/avro/HoodieAvroUtils.java | 43 +++++++++++------ .../org/apache/hudi/avro/TestHoodieAvroUtils.java | 54 +++++++++++++++++++++- .../realtime/AbstractRealtimeRecordReader.java | 4 +- 3 files changed, 84 insertions(+), 17 deletions(-) diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java index 2eec748..af44a31 100644 --- a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java @@ -32,7 +32,6 @@ import org.apache.avro.io.BinaryDecoder; import org.apache.avro.io.BinaryEncoder; import org.apache.avro.io.DecoderFactory; import org.apache.avro.io.EncoderFactory; -import org.codehaus.jackson.JsonNode; import org.codehaus.jackson.node.NullNode; import java.io.ByteArrayInputStream; @@ -44,6 +43,7 @@ import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.LinkedHashSet; import java.util.List; import java.util.Map; import java.util.stream.Collectors; @@ -104,15 +104,15 @@ public class HoodieAvroUtils { List<Schema.Field> parentFields = new ArrayList<>(); Schema.Field commitTimeField = - new Schema.Field(HoodieRecord.COMMIT_TIME_METADATA_FIELD, METADATA_FIELD_SCHEMA, "", NullNode.getInstance()); + new Schema.Field(HoodieRecord.COMMIT_TIME_METADATA_FIELD, METADATA_FIELD_SCHEMA, "", (Object) null); Schema.Field commitSeqnoField = - new Schema.Field(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD, METADATA_FIELD_SCHEMA, "", NullNode.getInstance()); + new Schema.Field(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD, METADATA_FIELD_SCHEMA, "", (Object) null); Schema.Field recordKeyField = - new Schema.Field(HoodieRecord.RECORD_KEY_METADATA_FIELD, METADATA_FIELD_SCHEMA, "", NullNode.getInstance()); + new Schema.Field(HoodieRecord.RECORD_KEY_METADATA_FIELD, METADATA_FIELD_SCHEMA, "", (Object) null); Schema.Field partitionPathField = - new Schema.Field(HoodieRecord.PARTITION_PATH_METADATA_FIELD, METADATA_FIELD_SCHEMA, "", NullNode.getInstance()); + new Schema.Field(HoodieRecord.PARTITION_PATH_METADATA_FIELD, METADATA_FIELD_SCHEMA, "", (Object) null); Schema.Field fileNameField = - new Schema.Field(HoodieRecord.FILENAME_METADATA_FIELD, METADATA_FIELD_SCHEMA, "", NullNode.getInstance()); + new Schema.Field(HoodieRecord.FILENAME_METADATA_FIELD, METADATA_FIELD_SCHEMA, "", (Object) null); parentFields.add(commitTimeField); parentFields.add(commitSeqnoField); @@ -121,8 +121,8 @@ public class HoodieAvroUtils { parentFields.add(fileNameField); for (Schema.Field field : schema.getFields()) { if (!isMetadataField(field.name())) { - Schema.Field newField = new Schema.Field(field.name(), field.schema(), field.doc(), field.defaultValue()); - for (Map.Entry<String, JsonNode> prop : field.getJsonProps().entrySet()) { + Schema.Field newField = new Schema.Field(field.name(), field.schema(), field.doc(), field.defaultVal()); + for (Map.Entry<String, Object> prop : field.getObjectProps().entrySet()) { newField.addProp(prop.getKey(), prop.getValue()); } parentFields.add(newField); @@ -191,7 +191,7 @@ public class HoodieAvroUtils { * schema. */ public static GenericRecord rewriteRecord(GenericRecord record, Schema newSchema) { - return rewrite(record, record.getSchema(), newSchema); + return rewrite(record, getCombinedFieldsToWrite(record.getSchema(), newSchema), newSchema); } /** @@ -199,13 +199,17 @@ public class HoodieAvroUtils { * schema. */ public static GenericRecord rewriteRecordWithOnlyNewSchemaFields(GenericRecord record, Schema newSchema) { - return rewrite(record, newSchema, newSchema); + return rewrite(record, new LinkedHashSet<>(newSchema.getFields()), newSchema); } - private static GenericRecord rewrite(GenericRecord record, Schema schemaWithFields, Schema newSchema) { + private static GenericRecord rewrite(GenericRecord record, LinkedHashSet<Field> fieldsToWrite, Schema newSchema) { GenericRecord newRecord = new GenericData.Record(newSchema); - for (Schema.Field f : schemaWithFields.getFields()) { - newRecord.put(f.name(), record.get(f.name())); + for (Schema.Field f : fieldsToWrite) { + if (record.get(f.name()) == null) { + newRecord.put(f.name(), f.defaultVal()); + } else { + newRecord.put(f.name(), record.get(f.name())); + } } if (!GenericData.get().validate(newSchema, newRecord)) { throw new SchemaCompatabilityException( @@ -214,6 +218,19 @@ public class HoodieAvroUtils { return newRecord; } + /** + * Generates a super set of fields from both old and new schema. + */ + private static LinkedHashSet<Field> getCombinedFieldsToWrite(Schema oldSchema, Schema newSchema) { + LinkedHashSet<Field> allFields = new LinkedHashSet<>(oldSchema.getFields()); + for (Schema.Field f : newSchema.getFields()) { + if (!allFields.contains(f) && !isMetadataField(f.name())) { + allFields.add(f); + } + } + return allFields; + } + public static byte[] compress(String text) { ByteArrayOutputStream baos = new ByteArrayOutputStream(); try { diff --git a/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java b/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java index c0be005..edcb37d 100644 --- a/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java +++ b/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java @@ -19,7 +19,8 @@ package org.apache.hudi.avro; import org.apache.avro.Schema; -import org.codehaus.jackson.JsonNode; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecord; import org.junit.Assert; import org.junit.Test; @@ -30,11 +31,25 @@ import java.util.Map; */ public class TestHoodieAvroUtils { + private static String EVOLVED_SCHEMA = "{\"type\": \"record\",\"name\": \"testrec1\",\"fields\": [ " + + "{\"name\": \"timestamp\",\"type\": \"double\"},{\"name\": \"_row_key\", \"type\": \"string\"}," + + "{\"name\": \"non_pii_col\", \"type\": \"string\"}," + + "{\"name\": \"pii_col\", \"type\": \"string\", \"column_category\": \"user_profile\"}," + + "{\"name\": \"new_col1\", \"type\": \"string\", \"default\": \"dummy_val\"}," + + "{\"name\": \"new_col2\", \"type\": [\"int\", \"null\"]}]}"; + private static String EXAMPLE_SCHEMA = "{\"type\": \"record\",\"name\": \"testrec\",\"fields\": [ " + "{\"name\": \"timestamp\",\"type\": \"double\"},{\"name\": \"_row_key\", \"type\": \"string\"}," + "{\"name\": \"non_pii_col\", \"type\": \"string\"}," + "{\"name\": \"pii_col\", \"type\": \"string\", \"column_category\": \"user_profile\"}]}"; + + private static String SCHEMA_WITH_METADATA_FIELD = "{\"type\": \"record\",\"name\": \"testrec2\",\"fields\": [ " + + "{\"name\": \"timestamp\",\"type\": \"double\"},{\"name\": \"_row_key\", \"type\": \"string\"}," + + "{\"name\": \"non_pii_col\", \"type\": \"string\"}," + + "{\"name\": \"pii_col\", \"type\": \"string\", \"column_category\": \"user_profile\"}," + + "{\"name\": \"_hoodie_commit_time\", \"type\": [\"null\", \"string\"]}]}"; + @Test public void testPropsPresent() { Schema schema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(EXAMPLE_SCHEMA)); @@ -45,7 +60,7 @@ public class TestHoodieAvroUtils { } Assert.assertNotNull("field name is null", field.name()); - Map<String, JsonNode> props = field.getJsonProps(); + Map<String, Object> props = field.getObjectProps(); Assert.assertNotNull("The property is null", props); if (field.name().equals("pii_col")) { @@ -57,4 +72,39 @@ public class TestHoodieAvroUtils { } Assert.assertTrue("column pii_col doesn't show up", piiPresent); } + + @Test + public void testDefaultValue() { + GenericRecord rec = new GenericData.Record(new Schema.Parser().parse(EVOLVED_SCHEMA)); + rec.put("_row_key", "key1"); + rec.put("non_pii_col", "val1"); + rec.put("pii_col", "val2"); + rec.put("timestamp", 3.5); + GenericRecord rec1 = HoodieAvroUtils.rewriteRecord(rec, new Schema.Parser().parse(EVOLVED_SCHEMA)); + Assert.assertEquals(rec1.get("new_col1"), "dummy_val"); + Assert.assertNull(rec1.get("new_col2")); + } + + @Test + public void testDefaultValueWithSchemaEvolution() { + GenericRecord rec = new GenericData.Record(new Schema.Parser().parse(EXAMPLE_SCHEMA)); + rec.put("_row_key", "key1"); + rec.put("non_pii_col", "val1"); + rec.put("pii_col", "val2"); + rec.put("timestamp", 3.5); + GenericRecord rec1 = HoodieAvroUtils.rewriteRecord(rec, new Schema.Parser().parse(EVOLVED_SCHEMA)); + Assert.assertEquals(rec1.get("new_col1"), "dummy_val"); + Assert.assertNull(rec1.get("new_col2")); + } + + @Test + public void testMetadataField() { + GenericRecord rec = new GenericData.Record(new Schema.Parser().parse(EXAMPLE_SCHEMA)); + rec.put("_row_key", "key1"); + rec.put("non_pii_col", "val1"); + rec.put("pii_col", "val2"); + rec.put("timestamp", 3.5); + GenericRecord rec1 = HoodieAvroUtils.rewriteRecord(rec, new Schema.Parser().parse(SCHEMA_WITH_METADATA_FIELD)); + Assert.assertNull(rec1.get("_hoodie_commit_time")); + } } diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java index 1660a00..1c484e8 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java @@ -212,7 +212,7 @@ public abstract class AbstractRealtimeRecordReader { throw new HoodieException("Field " + fn + " not found in log schema. Query cannot proceed! " + "Derived Schema Fields: " + new ArrayList<>(schemaFieldsMap.keySet())); } else { - projectedFields.add(new Schema.Field(field.name(), field.schema(), field.doc(), field.defaultValue())); + projectedFields.add(new Schema.Field(field.name(), field.schema(), field.doc(), field.defaultVal())); } } @@ -367,7 +367,7 @@ public abstract class AbstractRealtimeRecordReader { Field field = schemaFieldsMap.get(columnName.toLowerCase()); if (field != null) { - hiveSchemaFields.add(new Schema.Field(field.name(), field.schema(), field.doc(), field.defaultValue())); + hiveSchemaFields.add(new Schema.Field(field.name(), field.schema(), field.doc(), field.defaultVal())); } else { // Hive has some extra virtual columns like BLOCK__OFFSET__INSIDE__FILE which do not exist in table schema. // They will get skipped as they won't be found in the original schema.