[ 
https://issues.apache.org/jira/browse/HUDI-6033?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

voon updated HUDI-6033:
-----------------------
    Description: 
This issue only exists in MOR tables.

 

When casting DECIMAL/FLOAT to DECIMAL(p, s), the error below is thrown if a 
row's value has more decimal places than the target scale (s), i.e. if scale 
is lost when casting from the source type to the destination type. 

 

For example, casting a float with 3 decimal places (dp), e.g. 3.123, to 
DECIMAL(3, 2) throws the error below when the row/column is read.

 
{code:java}
Caused by: org.apache.hudi.exception.HoodieException: Exception when reading log file
    at org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.scanInternalV1(AbstractHoodieLogRecordReader.java:375)
    at org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.scanInternal(AbstractHoodieLogRecordReader.java:222)
    at org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner.performScan(HoodieMergedLogRecordScanner.java:199)
    at org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner.<init>(HoodieMergedLogRecordScanner.java:115)
    at org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner.<init>(HoodieMergedLogRecordScanner.java:74)
    at org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner$Builder.build(HoodieMergedLogRecordScanner.java:465)
    at org.apache.hudi.LogFileIterator$.scanLog(Iterators.scala:326)
    at org.apache.hudi.LogFileIterator.<init>(Iterators.scala:92)
    at org.apache.hudi.HoodieMergeOnReadRDD.compute(HoodieMergeOnReadRDD.scala:90)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:131)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:498)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:501)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.ArithmeticException: Rounding necessary
    at java.math.BigDecimal.commonNeedIncrement(BigDecimal.java:4179)
    at java.math.BigDecimal.needIncrement(BigDecimal.java:4235)
    at java.math.BigDecimal.divideAndRound(BigDecimal.java:4143)
    at java.math.BigDecimal.setScale(BigDecimal.java:2455)
    at java.math.BigDecimal.setScale(BigDecimal.java:2515)
    at org.apache.hudi.avro.HoodieAvroUtils.rewritePrimaryTypeWithDiffSchemaType(HoodieAvroUtils.java:1032)
    at org.apache.hudi.avro.HoodieAvroUtils.rewritePrimaryType(HoodieAvroUtils.java:954)
    at org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordWithNewSchemaInternal(HoodieAvroUtils.java:899)
    at org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordWithNewSchema(HoodieAvroUtils.java:834)
    at org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordWithNewSchemaInternal(HoodieAvroUtils.java:897)
    at org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordWithNewSchema(HoodieAvroUtils.java:834)
    at org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordWithNewSchemaInternal(HoodieAvroUtils.java:855)
    at org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordWithNewSchema(HoodieAvroUtils.java:834)
    at org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordWithNewSchema(HoodieAvroUtils.java:804)
    at org.apache.hudi.common.model.HoodieAvroIndexedRecord.rewriteRecordWithNewSchema(HoodieAvroIndexedRecord.java:123)
    at org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.lambda$composeEvolvedSchemaTransformer$5(AbstractHoodieLogRecordReader.java:848)
    at org.apache.hudi.common.util.collection.MappingIterator.next(MappingIterator.java:44)
    at org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.processDataBlock(AbstractHoodieLogRecordReader.java:634)
    at org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.processQueuedBlocksForInstant(AbstractHoodieLogRecordReader.java:674)
    at org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.scanInternalV1(AbstractHoodieLogRecordReader.java:366)
    ... 27 more {code}
This can be fixed by specifying the RoundingMode {*}HALF_EVEN{*}, which is 
what we use internally when performing an unsafe projection.
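
The failure mode in the stack trace can be reproduced outside Hudi with plain java.math.BigDecimal. The following is a minimal sketch, not the HoodieAvroUtils code itself: the class and method names (SetScaleDemo, rescaleStrict, rescaleRounded) are hypothetical and exist only for illustration. It shows that setScale without a rounding mode throws when digits would be dropped, while passing an explicit RoundingMode does not.

```java
import java.math.BigDecimal;
import java.math.RoundingMode;

public class SetScaleDemo {

    // Hypothetical helper: rescale without a rounding mode. This behaves like
    // RoundingMode.UNNECESSARY and throws ArithmeticException if digits are lost.
    static BigDecimal rescaleStrict(BigDecimal value, int scale) {
        return value.setScale(scale);
    }

    // Hypothetical helper: rescale with an explicit rounding mode, so that
    // dropped digits are rounded instead of causing an exception.
    static BigDecimal rescaleRounded(BigDecimal value, int scale, RoundingMode mode) {
        return value.setScale(scale, mode);
    }

    public static void main(String[] args) {
        BigDecimal value = new BigDecimal("3.123");
        try {
            rescaleStrict(value, 2);
        } catch (ArithmeticException e) {
            System.out.println("strict rescale failed: " + e.getMessage());
        }
        // Rounds 3.123 to two decimal places, giving 3.12.
        System.out.println("rounded rescale: " + rescaleRounded(value, 2, RoundingMode.HALF_EVEN));
    }
}
```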

 

{*}Reference{*}:

[https://github.com/apache/hudi/blob/master/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieInternalRowUtils.scala#L289]

 

*NOTE 1:*

The results of casting a FLOAT to a DECIMAL type differ depending on the 
table type used when reading on Spark.

 

COW tables will rely on COW's unsafe projection, and hence, Spark's casting. 

MOR tables will rely on MOR's HoodieAvroUtils to perform the 
{*}rewriteWithNewSchema{*}.

 

Floating point errors are hard to control given that different execution code 
paths are used between COW and MOR, causing a discrepancy in the results.

 

Hence, for the test to verify this fix, no verification on the correctness of 
results will be performed. As long as the table can be read without issue 
(after performing schema evolution), the fix is deemed to be valid.
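
As a rough illustration of why two code paths can disagree on a FLOAT value, the sketch below converts the same float to a BigDecimal in two different ways. It is an assumption-laden example: neither method is claimed to be what Spark or HoodieAvroUtils actually does, and the names (FloatToDecimalDemo, viaExactBinary, viaString) are hypothetical.

```java
import java.math.BigDecimal;

public class FloatToDecimalDemo {

    // Path A (hypothetical): widen the float to double and take the exact
    // binary expansion, which exposes the float's representation error.
    static BigDecimal viaExactBinary(float f) {
        return new BigDecimal((double) f);
    }

    // Path B (hypothetical): go through the shortest decimal string that
    // uniquely identifies the float, which hides the representation error.
    static BigDecimal viaString(float f) {
        return new BigDecimal(Float.toString(f));
    }

    public static void main(String[] args) {
        float f = 3.123f;
        System.out.println("exact binary: " + viaExactBinary(f));
        System.out.println("via string:   " + viaString(f));
    }
}
```

The two results are different decimal values before any rounding is applied, so a value that sits near a rounding boundary may round differently depending on the path taken.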

 

*NOTE 2:*

When performing a DOUBLE to DECIMAL casting, the result for COW and MOR tables 
should be consistent given that fixed scale types are not susceptible to the 
floating point rounding errors as described in {*}NOTE 1{*}. 

 

Since Spark uses *HALF_UP* rounding when a FIX_SCALE_TYPE to DECIMAL cast loses 
scale, MOR's HoodieAvroUtils should follow the same heuristics.

 
{code:java}
-- test HALF_UP rounding (verify that it does not use HALF_EVEN)
> SELECT CAST(CAST("10.024" AS DOUBLE) AS DECIMAL(4, 2));
10.02

> SELECT CAST(CAST("10.025" AS DOUBLE) AS DECIMAL(4, 2));
10.03

> SELECT CAST(CAST("10.026" AS DOUBLE) AS DECIMAL(4, 2));
10.03


-- test negative HALF_UP rounding (verify that it does not use HALF_EVEN)
> SELECT CAST(CAST("-10.024" AS DOUBLE) AS DECIMAL(4, 2));
-10.02

> SELECT CAST(CAST("-10.025" AS DOUBLE) AS DECIMAL(4, 2));
-10.03

> SELECT CAST(CAST("-10.026" AS DOUBLE) AS DECIMAL(4, 2));
-10.03


-- test HALF_UP rounding (will return same result as HALF_EVEN)
> SELECT CAST(CAST("10.034" AS DOUBLE) AS DECIMAL(4, 2));
10.03

> SELECT CAST(CAST("10.035" AS DOUBLE) AS DECIMAL(4, 2));
10.04

> SELECT CAST(CAST("10.036" AS DOUBLE) AS DECIMAL(4, 2));
10.04


-- test negative HALF_UP rounding (will return same result as HALF_EVEN)
> SELECT CAST(CAST("-10.034" AS DOUBLE) AS DECIMAL(4, 2));
-10.03

> SELECT CAST(CAST("-10.035" AS DOUBLE) AS DECIMAL(4, 2));
-10.04

> SELECT CAST(CAST("-10.036" AS DOUBLE) AS DECIMAL(4, 2));
-10.04{code}
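
The difference between the two rounding modes on the values above can be checked with java.math.BigDecimal. This is a sketch under a stated assumption: the inputs are parsed as exact decimal literals rather than going through DOUBLE, so it illustrates the rounding modes only and does not reproduce Spark's cast. The class and method names (RoundingModeDemo, round) are hypothetical.

```java
import java.math.BigDecimal;
import java.math.RoundingMode;

public class RoundingModeDemo {

    // Hypothetical helper: parse the decimal literal exactly, then rescale
    // with the given rounding mode.
    static BigDecimal round(String literal, int scale, RoundingMode mode) {
        return new BigDecimal(literal).setScale(scale, mode);
    }

    public static void main(String[] args) {
        String[] inputs = {"10.024", "10.025", "10.026", "-10.025", "10.035", "-10.035"};
        for (String in : inputs) {
            // Ties such as 10.025 differ between the modes; ties such as
            // 10.035 do not, because the preceding digit is odd.
            System.out.println(in
                + " HALF_UP=" + round(in, 2, RoundingMode.HALF_UP)
                + " HALF_EVEN=" + round(in, 2, RoundingMode.HALF_EVEN));
        }
    }
}
```

Only exact ties whose preceding digit is even (10.025, -10.025) distinguish HALF_UP from HALF_EVEN, which is why the first two test groups can verify the mode while the last two cannot.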

  was:
This issue only exists in MOR tables.

 

When casting DECIMAL/FLOAT to DECIMAL(p, s), the error below is thrown if a 
row's value has more decimal places than the target scale (s), i.e. if scale 
is lost when casting from the source type to the destination type. 

 

For example, casting a float with 3 decimal places (dp), e.g. 3.123, to 
DECIMAL(3, 2) throws the error below when the row/column is read.

 
{code:java}
Caused by: org.apache.hudi.exception.HoodieException: Exception when reading log file
    at org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.scanInternalV1(AbstractHoodieLogRecordReader.java:375)
    at org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.scanInternal(AbstractHoodieLogRecordReader.java:222)
    at org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner.performScan(HoodieMergedLogRecordScanner.java:199)
    at org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner.<init>(HoodieMergedLogRecordScanner.java:115)
    at org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner.<init>(HoodieMergedLogRecordScanner.java:74)
    at org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner$Builder.build(HoodieMergedLogRecordScanner.java:465)
    at org.apache.hudi.LogFileIterator$.scanLog(Iterators.scala:326)
    at org.apache.hudi.LogFileIterator.<init>(Iterators.scala:92)
    at org.apache.hudi.HoodieMergeOnReadRDD.compute(HoodieMergeOnReadRDD.scala:90)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:131)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:498)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:501)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.ArithmeticException: Rounding necessary
    at java.math.BigDecimal.commonNeedIncrement(BigDecimal.java:4179)
    at java.math.BigDecimal.needIncrement(BigDecimal.java:4235)
    at java.math.BigDecimal.divideAndRound(BigDecimal.java:4143)
    at java.math.BigDecimal.setScale(BigDecimal.java:2455)
    at java.math.BigDecimal.setScale(BigDecimal.java:2515)
    at org.apache.hudi.avro.HoodieAvroUtils.rewritePrimaryTypeWithDiffSchemaType(HoodieAvroUtils.java:1032)
    at org.apache.hudi.avro.HoodieAvroUtils.rewritePrimaryType(HoodieAvroUtils.java:954)
    at org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordWithNewSchemaInternal(HoodieAvroUtils.java:899)
    at org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordWithNewSchema(HoodieAvroUtils.java:834)
    at org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordWithNewSchemaInternal(HoodieAvroUtils.java:897)
    at org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordWithNewSchema(HoodieAvroUtils.java:834)
    at org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordWithNewSchemaInternal(HoodieAvroUtils.java:855)
    at org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordWithNewSchema(HoodieAvroUtils.java:834)
    at org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordWithNewSchema(HoodieAvroUtils.java:804)
    at org.apache.hudi.common.model.HoodieAvroIndexedRecord.rewriteRecordWithNewSchema(HoodieAvroIndexedRecord.java:123)
    at org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.lambda$composeEvolvedSchemaTransformer$5(AbstractHoodieLogRecordReader.java:848)
    at org.apache.hudi.common.util.collection.MappingIterator.next(MappingIterator.java:44)
    at org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.processDataBlock(AbstractHoodieLogRecordReader.java:634)
    at org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.processQueuedBlocksForInstant(AbstractHoodieLogRecordReader.java:674)
    at org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.scanInternalV1(AbstractHoodieLogRecordReader.java:366)
    ... 27 more {code}
This can be fixed by specifying the RoundingMode {*}HALF_EVEN{*}, which is 
what we use internally when performing an unsafe projection.

 

{*}Reference{*}:

[https://github.com/apache/hudi/blob/master/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieInternalRowUtils.scala#L289]

 

*NOTE 1:*

The results of casting a FLOAT to a DECIMAL type differ depending on the 
table type used when reading on Spark.

 

COW tables will rely on COW's unsafe projection, and hence, Spark's casting. 

MOR tables will rely on MOR's HoodieAvroUtils to perform the 
{*}rewriteWithNewSchema{*}.

 

Floating point errors are hard to control given that different execution code 
paths are used between COW and MOR, causing a discrepancy in the results.

 

Hence, for the test to verify this fix, no verification on the correctness of 
results will be performed. As long as the table can be read without issue 
(after performing schema evolution), the fix is deemed to be valid.

 

*NOTE 2:*

When performing a DOUBLE to DECIMAL casting, the result for COW and MOR tables 
should be consistent given that fixed scale types are not susceptible to the 
floating point rounding errors as described in {*}NOTE 1{*}. 

 

Since Spark uses *HALF_UP* rounding when a DOUBLE to DECIMAL cast loses scale, 
MOR's HoodieAvroUtils should follow the same heuristics.

 
{code:java}
-- test HALF_UP rounding (verify that it does not use HALF_EVEN)
> SELECT CAST(CAST("10.024" AS DOUBLE) AS DECIMAL(4, 2));
10.02

> SELECT CAST(CAST("10.025" AS DOUBLE) AS DECIMAL(4, 2));
10.03

> SELECT CAST(CAST("10.026" AS DOUBLE) AS DECIMAL(4, 2));
10.03


-- test negative HALF_UP rounding (verify that it does not use HALF_EVEN)
> SELECT CAST(CAST("-10.024" AS DOUBLE) AS DECIMAL(4, 2));
-10.02

> SELECT CAST(CAST("-10.025" AS DOUBLE) AS DECIMAL(4, 2));
-10.03

> SELECT CAST(CAST("-10.026" AS DOUBLE) AS DECIMAL(4, 2));
-10.03


-- test HALF_UP rounding (will return same result as HALF_EVEN)
> SELECT CAST(CAST("10.034" AS DOUBLE) AS DECIMAL(4, 2));
10.03

> SELECT CAST(CAST("10.035" AS DOUBLE) AS DECIMAL(4, 2));
10.04

> SELECT CAST(CAST("10.036" AS DOUBLE) AS DECIMAL(4, 2));
10.04


-- test negative HALF_UP rounding (will return same result as HALF_EVEN)
> SELECT CAST(CAST("-10.034" AS DOUBLE) AS DECIMAL(4, 2));
-10.03

> SELECT CAST(CAST("-10.035" AS DOUBLE) AS DECIMAL(4, 2));
-10.04

> SELECT CAST(CAST("-10.036" AS DOUBLE) AS DECIMAL(4, 2));
-10.04{code}


> Fix to DECIMAL(p, s) schema evolution when reading avro log files when scale 
> is lost
> ------------------------------------------------------------------------------------
>
>                 Key: HUDI-6033
>                 URL: https://issues.apache.org/jira/browse/HUDI-6033
>             Project: Apache Hudi
>          Issue Type: Bug
>            Reporter: voon
>            Assignee: voon
>            Priority: Major
>              Labels: pull-request-available
>
> This issue only exists in MOR tables.
>  
> When casting DECIMAL/FLOAT to DECIMAL(p, s), the error below is thrown if a 
> row's value has more decimal places than the target scale (s), i.e. if scale 
> is lost when casting from the source type to the destination type. 
>  
> For example, casting a float with 3 decimal places (dp), e.g. 3.123, to 
> DECIMAL(3, 2) throws the error below when the row/column is read.
>  
> {code:java}
> Caused by: org.apache.hudi.exception.HoodieException: Exception when reading log file
>     at org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.scanInternalV1(AbstractHoodieLogRecordReader.java:375)
>     at org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.scanInternal(AbstractHoodieLogRecordReader.java:222)
>     at org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner.performScan(HoodieMergedLogRecordScanner.java:199)
>     at org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner.<init>(HoodieMergedLogRecordScanner.java:115)
>     at org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner.<init>(HoodieMergedLogRecordScanner.java:74)
>     at org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner$Builder.build(HoodieMergedLogRecordScanner.java:465)
>     at org.apache.hudi.LogFileIterator$.scanLog(Iterators.scala:326)
>     at org.apache.hudi.LogFileIterator.<init>(Iterators.scala:92)
>     at org.apache.hudi.HoodieMergeOnReadRDD.compute(HoodieMergeOnReadRDD.scala:90)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
>     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
>     at org.apache.spark.scheduler.Task.run(Task.scala:131)
>     at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:498)
>     at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:501)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:750)
> Caused by: java.lang.ArithmeticException: Rounding necessary
>     at java.math.BigDecimal.commonNeedIncrement(BigDecimal.java:4179)
>     at java.math.BigDecimal.needIncrement(BigDecimal.java:4235)
>     at java.math.BigDecimal.divideAndRound(BigDecimal.java:4143)
>     at java.math.BigDecimal.setScale(BigDecimal.java:2455)
>     at java.math.BigDecimal.setScale(BigDecimal.java:2515)
>     at org.apache.hudi.avro.HoodieAvroUtils.rewritePrimaryTypeWithDiffSchemaType(HoodieAvroUtils.java:1032)
>     at org.apache.hudi.avro.HoodieAvroUtils.rewritePrimaryType(HoodieAvroUtils.java:954)
>     at org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordWithNewSchemaInternal(HoodieAvroUtils.java:899)
>     at org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordWithNewSchema(HoodieAvroUtils.java:834)
>     at org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordWithNewSchemaInternal(HoodieAvroUtils.java:897)
>     at org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordWithNewSchema(HoodieAvroUtils.java:834)
>     at org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordWithNewSchemaInternal(HoodieAvroUtils.java:855)
>     at org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordWithNewSchema(HoodieAvroUtils.java:834)
>     at org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordWithNewSchema(HoodieAvroUtils.java:804)
>     at org.apache.hudi.common.model.HoodieAvroIndexedRecord.rewriteRecordWithNewSchema(HoodieAvroIndexedRecord.java:123)
>     at org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.lambda$composeEvolvedSchemaTransformer$5(AbstractHoodieLogRecordReader.java:848)
>     at org.apache.hudi.common.util.collection.MappingIterator.next(MappingIterator.java:44)
>     at org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.processDataBlock(AbstractHoodieLogRecordReader.java:634)
>     at org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.processQueuedBlocksForInstant(AbstractHoodieLogRecordReader.java:674)
>     at org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.scanInternalV1(AbstractHoodieLogRecordReader.java:366)
>     ... 27 more {code}
> This can be fixed by specifying the RoundingMode {*}HALF_EVEN{*}, which is 
> what we use internally when performing an unsafe projection.
>  
> {*}Reference{*}:
> [https://github.com/apache/hudi/blob/master/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieInternalRowUtils.scala#L289]
>  
> *NOTE 1:*
> The results of casting a FLOAT to a DECIMAL type differ depending on the 
> table type used when reading on Spark.
>  
> COW tables will rely on COW's unsafe projection, and hence, Spark's casting. 
> MOR tables will rely on MOR's HoodieAvroUtils to perform the 
> {*}rewriteWithNewSchema{*}.
>  
> Floating point errors are hard to control given that different execution code 
> paths are used between COW and MOR, causing a discrepancy in the results.
>  
> Hence, for the test to verify this fix, no verification on the correctness of 
> results will be performed. As long as the table can be read without issue 
> (after performing schema evolution), the fix is deemed to be valid.
>  
> *NOTE 2:*
> When performing a DOUBLE to DECIMAL casting, the result for COW and MOR 
> tables should be consistent given that fixed scale types are not susceptible 
> to the floating point rounding errors as described in {*}NOTE 1{*}. 
>  
> Since Spark uses *HALF_UP* rounding when a FIX_SCALE_TYPE to DECIMAL cast 
> loses scale, MOR's HoodieAvroUtils should follow the same heuristics.
>  
> {code:java}
> -- test HALF_UP rounding (verify that it does not use HALF_EVEN)
> > SELECT CAST(CAST("10.024" AS DOUBLE) AS DECIMAL(4, 2));
> 10.02
> > SELECT CAST(CAST("10.025" AS DOUBLE) AS DECIMAL(4, 2));
> 10.03
> > SELECT CAST(CAST("10.026" AS DOUBLE) AS DECIMAL(4, 2));
> 10.03
> -- test negative HALF_UP rounding (verify that it does not use HALF_EVEN)
> > SELECT CAST(CAST("-10.024" AS DOUBLE) AS DECIMAL(4, 2));
> -10.02
> > SELECT CAST(CAST("-10.025" AS DOUBLE) AS DECIMAL(4, 2));
> -10.03
> > SELECT CAST(CAST("-10.026" AS DOUBLE) AS DECIMAL(4, 2));
> -10.03
> -- test HALF_UP rounding (will return same result as HALF_EVEN)
> > SELECT CAST(CAST("10.034" AS DOUBLE) AS DECIMAL(4, 2));
> 10.03
> > SELECT CAST(CAST("10.035" AS DOUBLE) AS DECIMAL(4, 2));
> 10.04
> > SELECT CAST(CAST("10.036" AS DOUBLE) AS DECIMAL(4, 2));
> 10.04
> -- test negative HALF_UP rounding (will return same result as HALF_EVEN)
> > SELECT CAST(CAST("-10.034" AS DOUBLE) AS DECIMAL(4, 2));
> -10.03
> > SELECT CAST(CAST("-10.035" AS DOUBLE) AS DECIMAL(4, 2));
> -10.04
> > SELECT CAST(CAST("-10.036" AS DOUBLE) AS DECIMAL(4, 2));
> -10.04{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
