This is an automated email from the ASF dual-hosted git repository. sivabalan pushed a commit to branch release-0.5.3 in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 4192cc65d02640f53db78922a1783103e7a46cfb Author: rolandjohann <rmmjoh...@gmail.com> AuthorDate: Mon May 18 13:28:27 2020 +0200 [HUDI-863] get decimal properties from derived spark DataType (#1596) --- .../org/apache/hudi/AvroConversionHelper.scala | 22 ++++++++++------------ .../org/apache/hudi/AvroConversionUtils.scala | 4 +--- 2 files changed, 11 insertions(+), 15 deletions(-) diff --git a/hudi-spark/src/main/scala/org/apache/hudi/AvroConversionHelper.scala b/hudi-spark/src/main/scala/org/apache/hudi/AvroConversionHelper.scala index 43225bc..69e6376 100644 --- a/hudi-spark/src/main/scala/org/apache/hudi/AvroConversionHelper.scala +++ b/hudi-spark/src/main/scala/org/apache/hudi/AvroConversionHelper.scala @@ -268,8 +268,7 @@ object AvroConversionHelper { createConverter(sourceAvroSchema, targetSqlType, List.empty[String]) } - def createConverterToAvro(avroSchema: Schema, - dataType: DataType, + def createConverterToAvro(dataType: DataType, structName: String, recordNamespace: String): Any => Any = { dataType match { @@ -284,13 +283,15 @@ object AvroConversionHelper { if (item == null) null else item.asInstanceOf[Byte].intValue case ShortType => (item: Any) => if (item == null) null else item.asInstanceOf[Short].intValue - case dec: DecimalType => (item: Any) => - Option(item).map { _ => - val bigDecimalValue = item.asInstanceOf[java.math.BigDecimal] - val decimalConversions = new DecimalConversion() - decimalConversions.toFixed(bigDecimalValue, avroSchema.getField(structName).schema().getTypes.get(0), - LogicalTypes.decimal(dec.precision, dec.scale)) - }.orNull + case dec: DecimalType => + val schema = SchemaConverters.toAvroType(dec, nullable = false, structName, recordNamespace) + (item: Any) => { + Option(item).map { _ => + val bigDecimalValue = item.asInstanceOf[java.math.BigDecimal] + val decimalConversions = new DecimalConversion() + decimalConversions.toFixed(bigDecimalValue, schema, LogicalTypes.decimal(dec.precision, dec.scale)) + }.orNull + } case TimestampType => (item: Any) => // Convert time to microseconds since spark-avro by default converts TimestampType to // Avro Logical TimestampMicros @@ -299,7 +300,6 @@ object AvroConversionHelper { Option(item).map(_.asInstanceOf[Date].toLocalDate.toEpochDay.toInt).orNull case ArrayType(elementType, _) => val elementConverter = createConverterToAvro( - avroSchema, elementType, structName, recordNamespace) @@ -320,7 +320,6 @@ object AvroConversionHelper { } case MapType(StringType, valueType, _) => val valueConverter = createConverterToAvro( - avroSchema, valueType, structName, recordNamespace) @@ -340,7 +339,6 @@ object AvroConversionHelper { val childNameSpace = if (recordNamespace != "") s"$recordNamespace.$structName" else structName val fieldConverters = structType.fields.map(field => createConverterToAvro( - avroSchema, field.dataType, field.name, childNameSpace)) diff --git a/hudi-spark/src/main/scala/org/apache/hudi/AvroConversionUtils.scala b/hudi-spark/src/main/scala/org/apache/hudi/AvroConversionUtils.scala index 04de1c7..bdb8955 100644 --- a/hudi-spark/src/main/scala/org/apache/hudi/AvroConversionUtils.scala +++ b/hudi-spark/src/main/scala/org/apache/hudi/AvroConversionUtils.scala @@ -38,14 +38,12 @@ object AvroConversionUtils { : RDD[GenericRecord] = { // Use the Avro schema to derive the StructType which has the correct nullability information val dataType = SchemaConverters.toSqlType(avroSchema).dataType.asInstanceOf[StructType] - val avroSchemaAsJsonString = avroSchema.toString val encoder = RowEncoder.apply(dataType).resolveAndBind() df.queryExecution.toRdd.map(encoder.fromRow) .mapPartitions { records => if (records.isEmpty) Iterator.empty else { - val avroSchema = new Schema.Parser().parse(avroSchemaAsJsonString) - val convertor = AvroConversionHelper.createConverterToAvro(avroSchema, dataType, structName, recordNamespace) + val convertor = AvroConversionHelper.createConverterToAvro(dataType, structName, recordNamespace) records.map { x => convertor(x).asInstanceOf[GenericRecord] } } }