This is an automated email from the ASF dual-hosted git repository.

uditme pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 2fee087  [HUDI-1181] Fix decimal type display issue for record key field (#1953)
2fee087 is described below

commit 2fee087f0f167370570a88b6d9d28ddbe8402a3e
Author: wenningd <wenningdin...@gmail.com>
AuthorDate: Tue Sep 8 17:50:54 2020 -0700

    [HUDI-1181] Fix decimal type display issue for record key field (#1953)
    
    * [HUDI-1181] Fix decimal type display issue for record key field
    
    * Remove getNestedFieldVal method from DataSourceUtils
    
    * resolve comments
    
    Co-authored-by: Wenning Ding <wenni...@amazon.com>
---
 .../java/org/apache/hudi/avro/HoodieAvroUtils.java | 54 +++++++++++-----
 .../main/java/org/apache/hudi/DataSourceUtils.java | 74 ----------------------
 .../java/org/apache/hudi/TestDataSourceUtils.java  | 26 +++++++-
 .../hudi/utilities/deltastreamer/DeltaSync.java    |  3 +-
 4 files changed, 64 insertions(+), 93 deletions(-)

diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
index 57dde9f..422d75c 100644
--- a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
@@ -18,10 +18,6 @@
 
 package org.apache.hudi.avro;
 
-import org.apache.avro.JsonProperties;
-import java.time.LocalDate;
-import org.apache.avro.LogicalTypes;
-import org.apache.avro.generic.GenericData.Record;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.collection.Pair;
@@ -29,11 +25,17 @@ import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.exception.SchemaCompatabilityException;
 
+import org.apache.avro.Conversions.DecimalConversion;
+import org.apache.avro.JsonProperties;
+import org.apache.avro.LogicalTypes;
+import org.apache.avro.LogicalTypes.Decimal;
 import org.apache.avro.Schema;
 import org.apache.avro.Schema.Field;
 import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericData.Record;
 import org.apache.avro.generic.GenericDatumReader;
 import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericFixed;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.IndexedRecord;
 import org.apache.avro.io.BinaryDecoder;
@@ -50,7 +52,9 @@ import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
+import java.time.LocalDate;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -433,9 +437,6 @@ public class HoodieAvroUtils {
   /**
    * This method converts values for fields with certain Avro/Parquet data 
types that require special handling.
    *
-   * Logical Date Type is converted to actual Date value instead of Epoch 
Integer which is how it is
-   * represented/stored in parquet.
-   *
    * @param fieldSchema avro field schema
    * @param fieldValue avro field value
    * @return field value either converted (for certain data types) or as it is.
@@ -445,23 +446,44 @@ public class HoodieAvroUtils {
       return fieldValue;
     }
 
-    if (isLogicalTypeDate(fieldSchema)) {
-      return LocalDate.ofEpochDay(Long.parseLong(fieldValue.toString()));
+    if (fieldSchema.getType() == Schema.Type.UNION) {
+      for (Schema schema : fieldSchema.getTypes()) {
+        if (schema.getType() != Schema.Type.NULL) {
+          return convertValueForAvroLogicalTypes(schema, fieldValue);
+        }
+      }
     }
-    return fieldValue;
+    return convertValueForAvroLogicalTypes(fieldSchema, fieldValue);
   }
 
   /**
-   * Given an Avro field schema checks whether the field is of Logical Date 
Type or not.
+   * This method converts values for fields with certain Avro Logical data 
types that require special handling.
+   *
+   * Logical Date Type is converted to actual Date value instead of Epoch 
Integer which is how it is
+   * represented/stored in parquet.
+   *
+   * Decimal Data Type is converted to actual decimal value instead of 
bytes/fixed which is how it is
+   * represented/stored in parquet.
    *
    * @param fieldSchema avro field schema
-   * @return boolean indicating whether fieldSchema is of Avro's Date Logical 
Type
+   * @param fieldValue avro field value
+   * @return field value either converted (for certain data types) or as it is.
    */
-  private static boolean isLogicalTypeDate(Schema fieldSchema) {
-    if (fieldSchema.getType() == Schema.Type.UNION) {
-      return fieldSchema.getTypes().stream().anyMatch(schema -> 
schema.getLogicalType() == LogicalTypes.date());
+  private static Object convertValueForAvroLogicalTypes(Schema fieldSchema, 
Object fieldValue) {
+    if (fieldSchema.getLogicalType() == LogicalTypes.date()) {
+      return LocalDate.ofEpochDay(Long.parseLong(fieldValue.toString()));
+    } else if (fieldSchema.getLogicalType() instanceof LogicalTypes.Decimal) {
+      Decimal dc = (Decimal) fieldSchema.getLogicalType();
+      DecimalConversion decimalConversion = new DecimalConversion();
+      if (fieldSchema.getType() == Schema.Type.FIXED) {
+        return decimalConversion.fromFixed((GenericFixed) fieldValue, 
fieldSchema,
+            LogicalTypes.decimal(dc.getPrecision(), dc.getScale()));
+      } else if (fieldSchema.getType() == Schema.Type.BYTES) {
+        return decimalConversion.fromBytes((ByteBuffer) fieldValue, 
fieldSchema,
+            LogicalTypes.decimal(dc.getPrecision(), dc.getScale()));
+      }
     }
-    return fieldSchema.getLogicalType() == LogicalTypes.date();
+    return fieldValue;
   }
 
   public static Schema getNullSchema() {
diff --git a/hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java 
b/hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java
index 0115f22..1890450 100644
--- a/hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java
+++ b/hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java
@@ -43,9 +43,6 @@ import org.apache.hudi.keygen.KeyGenerator;
 import org.apache.hudi.keygen.parser.AbstractHoodieDateTimeParser;
 import org.apache.hudi.table.BulkInsertPartitioner;
 
-import org.apache.avro.LogicalTypes;
-import org.apache.avro.Schema;
-import org.apache.avro.Schema.Field;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -55,12 +52,10 @@ import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 
 import java.io.IOException;
-import java.time.LocalDate;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
-import java.util.stream.Collectors;
 
 /**
  * Utilities used throughout the data source.
@@ -69,42 +64,6 @@ public class DataSourceUtils {
 
   private static final Logger LOG = 
LogManager.getLogger(DataSourceUtils.class);
 
-  /**
-   * Obtain value of the provided field, denoted by dot notation. e.g: a.b.c
-   */
-  public static Object getNestedFieldVal(GenericRecord record, String 
fieldName, boolean returnNullIfNotFound) {
-    String[] parts = fieldName.split("\\.");
-    GenericRecord valueNode = record;
-    int i = 0;
-    for (; i < parts.length; i++) {
-      String part = parts[i];
-      Object val = valueNode.get(part);
-      if (val == null) {
-        break;
-      }
-
-      // return, if last part of name
-      if (i == parts.length - 1) {
-        Schema fieldSchema = valueNode.getSchema().getField(part).schema();
-        return convertValueForSpecificDataTypes(fieldSchema, val);
-      } else {
-        // VC: Need a test here
-        if (!(val instanceof GenericRecord)) {
-          throw new HoodieException("Cannot find a record at part value :" + 
part);
-        }
-        valueNode = (GenericRecord) val;
-      }
-    }
-
-    if (returnNullIfNotFound) {
-      return null;
-    } else {
-      throw new HoodieException(
-          fieldName + "(Part -" + parts[i] + ") field not found in record. 
Acceptable fields were :"
-              + 
valueNode.getSchema().getFields().stream().map(Field::name).collect(Collectors.toList()));
-    }
-  }
-
   public static String getTablePath(FileSystem fs, Path[] userProvidedPaths) 
throws IOException {
     LOG.info("Getting table path..");
     for (Path path : userProvidedPaths) {
@@ -122,39 +81,6 @@ public class DataSourceUtils {
   }
 
   /**
-   * This method converts values for fields with certain Avro/Parquet data 
types that require special handling.
-   *
-   * Logical Date Type is converted to actual Date value instead of Epoch 
Integer which is how it is represented/stored in parquet.
-   *
-   * @param fieldSchema avro field schema
-   * @param fieldValue avro field value
-   * @return field value either converted (for certain data types) or as it is.
-   */
-  private static Object convertValueForSpecificDataTypes(Schema fieldSchema, 
Object fieldValue) {
-    if (fieldSchema == null) {
-      return fieldValue;
-    }
-
-    if (isLogicalTypeDate(fieldSchema)) {
-      return LocalDate.ofEpochDay(Long.parseLong(fieldValue.toString()));
-    }
-    return fieldValue;
-  }
-
-  /**
-   * Given an Avro field schema checks whether the field is of Logical Date 
Type or not.
-   *
-   * @param fieldSchema avro field schema
-   * @return boolean indicating whether fieldSchema is of Avro's Date Logical 
Type
-   */
-  private static boolean isLogicalTypeDate(Schema fieldSchema) {
-    if (fieldSchema.getType() == Schema.Type.UNION) {
-      return fieldSchema.getTypes().stream().anyMatch(schema -> 
schema.getLogicalType() == LogicalTypes.date());
-    }
-    return fieldSchema.getLogicalType() == LogicalTypes.date();
-  }
-
-  /**
    * Create a key generator class via reflection, passing in any configs 
needed.
    * <p>
    * If the class name of key generator is configured through the properties 
file, i.e., {@code props}, use the corresponding key generator class; 
otherwise, use the default key generator class
diff --git a/hudi-spark/src/test/java/org/apache/hudi/TestDataSourceUtils.java 
b/hudi-spark/src/test/java/org/apache/hudi/TestDataSourceUtils.java
index 32071d6..9ff114e 100644
--- a/hudi-spark/src/test/java/org/apache/hudi/TestDataSourceUtils.java
+++ b/hudi-spark/src/test/java/org/apache/hudi/TestDataSourceUtils.java
@@ -28,8 +28,11 @@ import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.table.BulkInsertPartitioner;
 
+import org.apache.avro.Conversions;
+import org.apache.avro.LogicalTypes;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericFixed;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.spark.api.java.JavaRDD;
 import org.junit.jupiter.api.BeforeEach;
@@ -40,6 +43,7 @@ import org.mockito.Captor;
 import org.mockito.Mock;
 import org.mockito.junit.jupiter.MockitoExtension;
 
+import java.math.BigDecimal;
 import java.time.LocalDate;
 
 import static org.hamcrest.CoreMatchers.containsString;
@@ -77,13 +81,20 @@ public class TestDataSourceUtils {
   public void testAvroRecordsFieldConversion() {
     // There are fields event_date1, event_date2, event_date3 with logical 
type as Date. event_date1 & event_date3 are
     // of UNION schema type, which is a union of null and date type in 
different orders. event_date2 is non-union
-    // date type
+    // date type. event_cost1, event_cost2, event_cost3 are decimal logical types with UNION schema, which is similar to
+    // the event_date.
     String avroSchemaString = "{\"type\": \"record\"," + "\"name\": 
\"events\"," + "\"fields\": [ "
         + "{\"name\": \"event_date1\", \"type\" : [{\"type\" : \"int\", 
\"logicalType\" : \"date\"}, \"null\"]},"
         + "{\"name\": \"event_date2\", \"type\" : {\"type\": \"int\", 
\"logicalType\" : \"date\"}},"
         + "{\"name\": \"event_date3\", \"type\" : [\"null\", {\"type\" : 
\"int\", \"logicalType\" : \"date\"}]},"
         + "{\"name\": \"event_name\", \"type\": \"string\"},"
-        + "{\"name\": \"event_organizer\", \"type\": \"string\"}"
+        + "{\"name\": \"event_organizer\", \"type\": \"string\"},"
+        + "{\"name\": \"event_cost1\", \"type\": "
+        + "[{\"type\": \"fixed\", \"name\": \"dc\", \"size\": 5, 
\"logicalType\": \"decimal\", \"precision\": 10, \"scale\": 6}, \"null\"]},"
+        + "{\"name\": \"event_cost2\", \"type\": "
+        + "{\"type\": \"fixed\", \"name\": \"ef\", \"size\": 5, 
\"logicalType\": \"decimal\", \"precision\": 10, \"scale\": 6}},"
+        + "{\"name\": \"event_cost3\", \"type\": "
+        + "[\"null\", {\"type\": \"fixed\", \"name\": \"fg\", \"size\": 5, 
\"logicalType\": \"decimal\", \"precision\": 10, \"scale\": 6}]}"
         + "]}";
 
     Schema avroSchema = new Schema.Parser().parse(avroSchemaString);
@@ -94,6 +105,14 @@ public class TestDataSourceUtils {
     record.put("event_name", "Hudi Meetup");
     record.put("event_organizer", "Hudi PMC");
 
+    BigDecimal bigDecimal = new BigDecimal("123.184331");
+    Schema decimalSchema = 
avroSchema.getField("event_cost1").schema().getTypes().get(0);
+    Conversions.DecimalConversion decimalConversions = new 
Conversions.DecimalConversion();
+    GenericFixed genericFixed = decimalConversions.toFixed(bigDecimal, 
decimalSchema, LogicalTypes.decimal(10, 6));
+    record.put("event_cost1", genericFixed);
+    record.put("event_cost2", genericFixed);
+    record.put("event_cost3", genericFixed);
+
     assertEquals(LocalDate.ofEpochDay(18000).toString(), 
HoodieAvroUtils.getNestedFieldValAsString(record, "event_date1",
         true));
     assertEquals(LocalDate.ofEpochDay(18001).toString(), 
HoodieAvroUtils.getNestedFieldValAsString(record, "event_date2",
@@ -102,6 +121,9 @@ public class TestDataSourceUtils {
         true));
     assertEquals("Hudi Meetup", 
HoodieAvroUtils.getNestedFieldValAsString(record, "event_name", true));
     assertEquals("Hudi PMC", HoodieAvroUtils.getNestedFieldValAsString(record, 
"event_organizer", true));
+    assertEquals(bigDecimal.toString(), 
HoodieAvroUtils.getNestedFieldValAsString(record, "event_cost1", true));
+    assertEquals(bigDecimal.toString(), 
HoodieAvroUtils.getNestedFieldValAsString(record, "event_cost2", true));
+    assertEquals(bigDecimal.toString(), 
HoodieAvroUtils.getNestedFieldValAsString(record, "event_cost3", true));
   }
 
   @Test
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
index 9fc24b4..be98a62 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
@@ -20,6 +20,7 @@ package org.apache.hudi.utilities.deltastreamer;
 
 import org.apache.hudi.AvroConversionUtils;
 import org.apache.hudi.DataSourceUtils;
+import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.client.HoodieWriteClient;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.config.TypedProperties;
@@ -351,7 +352,7 @@ public class DeltaSync implements Serializable {
     JavaRDD<GenericRecord> avroRDD = avroRDDOptional.get();
     JavaRDD<HoodieRecord> records = avroRDD.map(gr -> {
       HoodieRecordPayload payload = 
DataSourceUtils.createPayload(cfg.payloadClassName, gr,
-          (Comparable) DataSourceUtils.getNestedFieldVal(gr, 
cfg.sourceOrderingField, false));
+          (Comparable) HoodieAvroUtils.getNestedFieldVal(gr, 
cfg.sourceOrderingField, false));
       return new HoodieRecord<>(keyGenerator.getKey(gr), payload);
     });
 

Reply via email to