This is an automated email from the ASF dual-hosted git repository. uditme pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push: new 2fee087 [HUDI-1181] Fix decimal type display issue for record key field (#1953) 2fee087 is described below commit 2fee087f0f167370570a88b6d9d28ddbe8402a3e Author: wenningd <wenningdin...@gmail.com> AuthorDate: Tue Sep 8 17:50:54 2020 -0700 [HUDI-1181] Fix decimal type display issue for record key field (#1953) * [HUDI-1181] Fix decimal type display issue for record key field * Remove getNestedFieldVal method from DataSourceUtils * resolve comments Co-authored-by: Wenning Ding <wenni...@amazon.com> --- .../java/org/apache/hudi/avro/HoodieAvroUtils.java | 54 +++++++++++----- .../main/java/org/apache/hudi/DataSourceUtils.java | 74 ---------------------- .../java/org/apache/hudi/TestDataSourceUtils.java | 26 +++++++- .../hudi/utilities/deltastreamer/DeltaSync.java | 3 +- 4 files changed, 64 insertions(+), 93 deletions(-) diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java index 57dde9f..422d75c 100644 --- a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java @@ -18,10 +18,6 @@ package org.apache.hudi.avro; -import org.apache.avro.JsonProperties; -import java.time.LocalDate; -import org.apache.avro.LogicalTypes; -import org.apache.avro.generic.GenericData.Record; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.common.util.collection.Pair; @@ -29,11 +25,17 @@ import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.exception.SchemaCompatabilityException; +import org.apache.avro.Conversions.DecimalConversion; +import org.apache.avro.JsonProperties; +import org.apache.avro.LogicalTypes; +import org.apache.avro.LogicalTypes.Decimal; import org.apache.avro.Schema; import org.apache.avro.Schema.Field; import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericData.Record; import org.apache.avro.generic.GenericDatumReader; import org.apache.avro.generic.GenericDatumWriter; +import org.apache.avro.generic.GenericFixed; import org.apache.avro.generic.GenericRecord; import org.apache.avro.generic.IndexedRecord; import org.apache.avro.io.BinaryDecoder; @@ -50,7 +52,9 @@ import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; +import java.time.LocalDate; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -433,9 +437,6 @@ public class HoodieAvroUtils { /** * This method converts values for fields with certain Avro/Parquet data types that require special handling. * - * Logical Date Type is converted to actual Date value instead of Epoch Integer which is how it is - * represented/stored in parquet. - * * @param fieldSchema avro field schema * @param fieldValue avro field value * @return field value either converted (for certain data types) or as it is. @@ -445,23 +446,44 @@ public class HoodieAvroUtils { return fieldValue; } - if (isLogicalTypeDate(fieldSchema)) { - return LocalDate.ofEpochDay(Long.parseLong(fieldValue.toString())); + if (fieldSchema.getType() == Schema.Type.UNION) { + for (Schema schema : fieldSchema.getTypes()) { + if (schema.getType() != Schema.Type.NULL) { + return convertValueForAvroLogicalTypes(schema, fieldValue); + } + } } - return fieldValue; + return convertValueForAvroLogicalTypes(fieldSchema, fieldValue); } /** - * Given an Avro field schema checks whether the field is of Logical Date Type or not. + * This method converts values for fields with certain Avro Logical data types that require special handling. + * + * Logical Date Type is converted to actual Date value instead of Epoch Integer which is how it is + * represented/stored in parquet. + * + * Decimal Data Type is converted to actual decimal value instead of bytes/fixed which is how it is + * represented/stored in parquet. * * @param fieldSchema avro field schema - * @return boolean indicating whether fieldSchema is of Avro's Date Logical Type + * @param fieldValue avro field value + * @return field value either converted (for certain data types) or as it is. */ - private static boolean isLogicalTypeDate(Schema fieldSchema) { - if (fieldSchema.getType() == Schema.Type.UNION) { - return fieldSchema.getTypes().stream().anyMatch(schema -> schema.getLogicalType() == LogicalTypes.date()); + private static Object convertValueForAvroLogicalTypes(Schema fieldSchema, Object fieldValue) { + if (fieldSchema.getLogicalType() == LogicalTypes.date()) { + return LocalDate.ofEpochDay(Long.parseLong(fieldValue.toString())); + } else if (fieldSchema.getLogicalType() instanceof LogicalTypes.Decimal) { + Decimal dc = (Decimal) fieldSchema.getLogicalType(); + DecimalConversion decimalConversion = new DecimalConversion(); + if (fieldSchema.getType() == Schema.Type.FIXED) { + return decimalConversion.fromFixed((GenericFixed) fieldValue, fieldSchema, + LogicalTypes.decimal(dc.getPrecision(), dc.getScale())); + } else if (fieldSchema.getType() == Schema.Type.BYTES) { + return decimalConversion.fromBytes((ByteBuffer) fieldValue, fieldSchema, + LogicalTypes.decimal(dc.getPrecision(), dc.getScale())); + } } - return fieldSchema.getLogicalType() == LogicalTypes.date(); + return fieldValue; } public static Schema getNullSchema() { diff --git a/hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java b/hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java index 0115f22..1890450 100644 --- a/hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java +++ b/hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java @@ -43,9 +43,6 @@ import org.apache.hudi.keygen.KeyGenerator; import org.apache.hudi.keygen.parser.AbstractHoodieDateTimeParser; import org.apache.hudi.table.BulkInsertPartitioner; -import org.apache.avro.LogicalTypes; -import org.apache.avro.Schema; -import org.apache.avro.Schema.Field; import org.apache.avro.generic.GenericRecord; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -55,12 +52,10 @@ import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import java.io.IOException; -import java.time.LocalDate; import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Map; -import java.util.stream.Collectors; /** * Utilities used throughout the data source. @@ -69,42 +64,6 @@ public class DataSourceUtils { private static final Logger LOG = LogManager.getLogger(DataSourceUtils.class); - /** - * Obtain value of the provided field, denoted by dot notation. e.g: a.b.c - */ - public static Object getNestedFieldVal(GenericRecord record, String fieldName, boolean returnNullIfNotFound) { - String[] parts = fieldName.split("\\."); - GenericRecord valueNode = record; - int i = 0; - for (; i < parts.length; i++) { - String part = parts[i]; - Object val = valueNode.get(part); - if (val == null) { - break; - } - - // return, if last part of name - if (i == parts.length - 1) { - Schema fieldSchema = valueNode.getSchema().getField(part).schema(); - return convertValueForSpecificDataTypes(fieldSchema, val); - } else { - // VC: Need a test here - if (!(val instanceof GenericRecord)) { - throw new HoodieException("Cannot find a record at part value :" + part); - } - valueNode = (GenericRecord) val; - } - } - - if (returnNullIfNotFound) { - return null; - } else { - throw new HoodieException( - fieldName + "(Part -" + parts[i] + ") field not found in record. Acceptable fields were :" - + valueNode.getSchema().getFields().stream().map(Field::name).collect(Collectors.toList())); - } - } - public static String getTablePath(FileSystem fs, Path[] userProvidedPaths) throws IOException { LOG.info("Getting table path.."); for (Path path : userProvidedPaths) { @@ -122,39 +81,6 @@ public class DataSourceUtils { } /** - * This method converts values for fields with certain Avro/Parquet data types that require special handling. - * - * Logical Date Type is converted to actual Date value instead of Epoch Integer which is how it is represented/stored in parquet. - * - * @param fieldSchema avro field schema - * @param fieldValue avro field value - * @return field value either converted (for certain data types) or as it is. - */ - private static Object convertValueForSpecificDataTypes(Schema fieldSchema, Object fieldValue) { - if (fieldSchema == null) { - return fieldValue; - } - - if (isLogicalTypeDate(fieldSchema)) { - return LocalDate.ofEpochDay(Long.parseLong(fieldValue.toString())); - } - return fieldValue; - } - - /** - * Given an Avro field schema checks whether the field is of Logical Date Type or not. - * - * @param fieldSchema avro field schema - * @return boolean indicating whether fieldSchema is of Avro's Date Logical Type - */ - private static boolean isLogicalTypeDate(Schema fieldSchema) { - if (fieldSchema.getType() == Schema.Type.UNION) { - return fieldSchema.getTypes().stream().anyMatch(schema -> schema.getLogicalType() == LogicalTypes.date()); - } - return fieldSchema.getLogicalType() == LogicalTypes.date(); - } - - /** * Create a key generator class via reflection, passing in any configs needed. * <p> * If the class name of key generator is configured through the properties file, i.e., {@code props}, use the corresponding key generator class; otherwise, use the default key generator class diff --git a/hudi-spark/src/test/java/org/apache/hudi/TestDataSourceUtils.java b/hudi-spark/src/test/java/org/apache/hudi/TestDataSourceUtils.java index 32071d6..9ff114e 100644 --- a/hudi-spark/src/test/java/org/apache/hudi/TestDataSourceUtils.java +++ b/hudi-spark/src/test/java/org/apache/hudi/TestDataSourceUtils.java @@ -28,8 +28,11 @@ import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.table.BulkInsertPartitioner; +import org.apache.avro.Conversions; +import org.apache.avro.LogicalTypes; import org.apache.avro.Schema; import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericFixed; import org.apache.avro.generic.GenericRecord; import org.apache.spark.api.java.JavaRDD; import org.junit.jupiter.api.BeforeEach; @@ -40,6 +43,7 @@ import org.mockito.Captor; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; +import java.math.BigDecimal; import java.time.LocalDate; import static org.hamcrest.CoreMatchers.containsString; @@ -77,13 +81,20 @@ public class TestDataSourceUtils { public void testAvroRecordsFieldConversion() { // There are fields event_date1, event_date2, event_date3 with logical type as Date. event_date1 & event_date3 are // of UNION schema type, which is a union of null and date type in different orders. event_date2 is non-union - // date type + // date type. event_cost1, event_cost2, event3 are decimal logical types with UNION schema, which is similar to + // the event_date. String avroSchemaString = "{\"type\": \"record\"," + "\"name\": \"events\"," + "\"fields\": [ " + "{\"name\": \"event_date1\", \"type\" : [{\"type\" : \"int\", \"logicalType\" : \"date\"}, \"null\"]}," + "{\"name\": \"event_date2\", \"type\" : {\"type\": \"int\", \"logicalType\" : \"date\"}}," + "{\"name\": \"event_date3\", \"type\" : [\"null\", {\"type\" : \"int\", \"logicalType\" : \"date\"}]}," + "{\"name\": \"event_name\", \"type\": \"string\"}," - + "{\"name\": \"event_organizer\", \"type\": \"string\"}" + + "{\"name\": \"event_organizer\", \"type\": \"string\"}," + + "{\"name\": \"event_cost1\", \"type\": " + + "[{\"type\": \"fixed\", \"name\": \"dc\", \"size\": 5, \"logicalType\": \"decimal\", \"precision\": 10, \"scale\": 6}, \"null\"]}," + + "{\"name\": \"event_cost2\", \"type\": " + + "{\"type\": \"fixed\", \"name\": \"ef\", \"size\": 5, \"logicalType\": \"decimal\", \"precision\": 10, \"scale\": 6}}," + + "{\"name\": \"event_cost3\", \"type\": " + + "[\"null\", {\"type\": \"fixed\", \"name\": \"fg\", \"size\": 5, \"logicalType\": \"decimal\", \"precision\": 10, \"scale\": 6}]}" + "]}"; Schema avroSchema = new Schema.Parser().parse(avroSchemaString); @@ -94,6 +105,14 @@ public class TestDataSourceUtils { record.put("event_name", "Hudi Meetup"); record.put("event_organizer", "Hudi PMC"); + BigDecimal bigDecimal = new BigDecimal("123.184331"); + Schema decimalSchema = avroSchema.getField("event_cost1").schema().getTypes().get(0); + Conversions.DecimalConversion decimalConversions = new Conversions.DecimalConversion(); + GenericFixed genericFixed = decimalConversions.toFixed(bigDecimal, decimalSchema, LogicalTypes.decimal(10, 6)); + record.put("event_cost1", genericFixed); + record.put("event_cost2", genericFixed); + record.put("event_cost3", genericFixed); + assertEquals(LocalDate.ofEpochDay(18000).toString(), HoodieAvroUtils.getNestedFieldValAsString(record, "event_date1", true)); assertEquals(LocalDate.ofEpochDay(18001).toString(), HoodieAvroUtils.getNestedFieldValAsString(record, "event_date2", @@ -102,6 +121,9 @@ public class TestDataSourceUtils { true)); assertEquals("Hudi Meetup", HoodieAvroUtils.getNestedFieldValAsString(record, "event_name", true)); assertEquals("Hudi PMC", HoodieAvroUtils.getNestedFieldValAsString(record, "event_organizer", true)); + assertEquals(bigDecimal.toString(), HoodieAvroUtils.getNestedFieldValAsString(record, "event_cost1", true)); + assertEquals(bigDecimal.toString(), HoodieAvroUtils.getNestedFieldValAsString(record, "event_cost2", true)); + assertEquals(bigDecimal.toString(), HoodieAvroUtils.getNestedFieldValAsString(record, "event_cost3", true)); } @Test diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java index 9fc24b4..be98a62 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java @@ -20,6 +20,7 @@ package org.apache.hudi.utilities.deltastreamer; import org.apache.hudi.AvroConversionUtils; import org.apache.hudi.DataSourceUtils; +import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.client.HoodieWriteClient; import org.apache.hudi.client.WriteStatus; import org.apache.hudi.common.config.TypedProperties; @@ -351,7 +352,7 @@ public class DeltaSync implements Serializable { JavaRDD<GenericRecord> avroRDD = avroRDDOptional.get(); JavaRDD<HoodieRecord> records = avroRDD.map(gr -> { HoodieRecordPayload payload = DataSourceUtils.createPayload(cfg.payloadClassName, gr, - (Comparable) DataSourceUtils.getNestedFieldVal(gr, cfg.sourceOrderingField, false)); + (Comparable) HoodieAvroUtils.getNestedFieldVal(gr, cfg.sourceOrderingField, false)); return new HoodieRecord<>(keyGenerator.getKey(gr), payload); });