hudi-bot opened a new issue, #16388:
URL: https://github.com/apache/hudi/issues/16388

   h2. Problem
   
When attempting to ingest records in Hudi 0.14.1 with an Avro target schema that includes a field using the [decimal|https://avro.apache.org/docs/1.11.0/spec.html#Decimal] logical type, an exception is thrown:
   {code:java}
   24/02/06 21:30:30 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
   java.lang.RuntimeException: org.apache.avro.AvroRuntimeException: cannot support rewrite value for schema type: {"type":"fixed","name":"fixed","namespace":"stock_ticks.low","size":2,"logicalType":"decimal","precision":4,"scale":2} since the old schema type is: {"type":"bytes","logicalType":"decimal","precision":4,"scale":2}
           at org.apache.hudi.client.utils.LazyIterableIterator.next(LazyIterableIterator.java:121)
           at org.apache.hudi.utilities.streamer.HoodieStreamerUtils.lambda$null$a903797$1(HoodieStreamerUtils.java:92)
           at org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitions$1(JavaRDDLike.scala:153)
           at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:853)
           at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:853)
           at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
           at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
           at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
           at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
           at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
           at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
           at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
           at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:101)
           at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
           at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
           at org.apache.spark.scheduler.Task.run(Task.scala:139)
           at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
           at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
           at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
           at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
           at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
           at java.base/java.lang.Thread.run(Thread.java:840)
   Caused by: org.apache.avro.AvroRuntimeException: cannot support rewrite value for schema type: {"type":"fixed","name":"fixed","namespace":"stock_ticks.low","size":2,"logicalType":"decimal","precision":4,"scale":2} since the old schema type is: {"type":"bytes","logicalType":"decimal","precision":4,"scale":2}
           at org.apache.hudi.avro.HoodieAvroUtils.rewritePrimaryTypeWithDiffSchemaType(HoodieAvroUtils.java:1088)
           at org.apache.hudi.avro.HoodieAvroUtils.rewritePrimaryType(HoodieAvroUtils.java:1006)
           at org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordWithNewSchemaInternal(HoodieAvroUtils.java:951)
           at org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordWithNewSchema(HoodieAvroUtils.java:877)
           at org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordWithNewSchemaInternal(HoodieAvroUtils.java:899)
           at org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordWithNewSchema(HoodieAvroUtils.java:877)
           at org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordWithNewSchema(HoodieAvroUtils.java:847)
           at org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordDeep(HoodieAvroUtils.java:1259)
           at org.apache.hudi.utilities.schema.LazyCastingIterator.computeNext(LazyCastingIterator.java:40)
           at org.apache.hudi.utilities.schema.LazyCastingIterator.computeNext(LazyCastingIterator.java:30)
           at org.apache.hudi.client.utils.LazyIterableIterator.next(LazyIterableIterator.java:119)
           ... 21 more {code}
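   For context, both schemas in the message describe the same logical decimal(4, 2); Avro stores the unscaled value as big-endian two's-complement in either case, so for a given value the two representations even hold identical bytes, and only the underlying Avro type ({{bytes}} vs. {{fixed}}) differs. A quick sketch of the two physical encodings (function names are illustrative, not Hudi or Avro APIs; the {{bytes}} variant conventionally uses the minimal length):

```python
def encode_bytes_decimal(unscaled: int) -> bytes:
    """Avro 'bytes' decimal: minimal-length two's-complement, big-endian."""
    size = max(1, (unscaled.bit_length() + 8) // 8)  # +1 sign bit, rounded up to whole bytes
    return unscaled.to_bytes(size, "big", signed=True)


def encode_fixed_decimal(unscaled: int, size: int) -> bytes:
    """Avro 'fixed' decimal: the same value, sign-extended to a fixed width."""
    return unscaled.to_bytes(size, "big", signed=True)


# 111.55 at scale 2 has unscaled value 11155
print(encode_bytes_decimal(11155).hex())     # 2b93
print(encode_fixed_decimal(11155, 2).hex())  # 2b93 -- identical content, different schema type
```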
   h2. Reproduction Steps
   
   1. Set up a clean Spark install
   {code:java}
   mkdir /tmp/hudi-decimal-repro
   cd /tmp/hudi-decimal-repro
   tar -xvzf ~/spark-3.4.2-bin-hadoop3.tgz{code}
   2. Create a minimal schema file based on [the demo 
schema|https://github.com/apache/hudi/blob/release-0.14.1/docker/demo/config/schema.avsc].
  The only change is the {{type}} of the field named {{{}low{}}}.
   {code:java}
   echo '{
     "type":"record",
     "name":"stock_ticks",
     "fields":[{
        "name": "volume",
        "type": "long"
     }, {
        "name": "ts",
        "type": "string"
     }, {
        "name": "symbol",
        "type": "string"
     },{
        "name": "year",
        "type": "int"
     },{
        "name": "month",
        "type": "string"
     },{
        "name": "high",
        "type": "double"
     },{
        "name": "low",
        "type": {
          "type": "bytes",
          "logicalType": "decimal",
          "precision": 10,
          "scale": 5
        }
     },{
        "name": "key",
        "type": "string"
     },{
        "name": "date",
        "type":"string"
     }, {
        "name": "close",
        "type": "double"
     }, {
        "name": "open",
        "type": "double"
     }, {
        "name": "day",
        "type":"string"
     }
   ]}' > schema.avsc{code}
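   Note the schema declares decimal(10, 5) so the demo values (e.g. a {{low}} of 111.55) fit: an Avro decimal(precision, scale) can hold a value only if value × 10^scale is an integer with at most precision digits. A small stdlib sketch of that check (illustrative, not Hudi code):

```python
from decimal import Decimal


def fits_decimal(value: str, precision: int, scale: int) -> bool:
    """True if value can be stored as an Avro decimal(precision, scale)."""
    unscaled = Decimal(value).scaleb(scale)  # value * 10^scale
    if unscaled != unscaled.to_integral_value():
        return False  # more fractional digits than the scale allows
    return len(str(abs(int(unscaled)))) <= precision


print(fits_decimal("111.55", 10, 5))  # True  -- the schema above
print(fits_decimal("111.55", 4, 2))   # False -- the demo schema's decimal(4, 2)
```

   With the demo schema's original decimal(4, 2), 111.55 would not fit, which is relevant to the 1.0.0 behavior discussed in the comments.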
   3. Create a minimal properties file
   {code:java}
   echo "hoodie.datasource.write.recordkey.field=key
   hoodie.datasource.write.partitionpath.field=date
   hoodie.table.recordkey.fields=key
   hoodie.table.partition.fields=date
   hoodie.streamer.schemaprovider.source.schema.file=/tmp/hudi-decimal-repro/schema.avsc
   hoodie.streamer.schemaprovider.target.schema.file=/tmp/hudi-decimal-repro/schema.avsc
   hoodie.streamer.source.dfs.root=/tmp/hudi-decimal-repro/data" > decimal-repro.properties{code}
   4. Copy data file from the docker demo
   {code:java}
   mkdir data
   cd data
   wget https://raw.githubusercontent.com/apache/hudi/release-0.14.1/docker/demo/data/batch_1.json
   cd ..{code}
   5. Run HoodieStreamer
   {code:java}
   spark-3.4.2-bin-hadoop3/bin/spark-submit \
      --packages org.apache.hudi:hudi-utilities-slim-bundle_2.12:0.15.0,org.apache.hudi:hudi-spark3.4-bundle_2.12:0.15.0 \
      --conf spark.kryoserializer.buffer.max=200m \
      --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
      --class org.apache.hudi.utilities.streamer.HoodieStreamer \
      spark-3.4.2-bin-hadoop3/examples/jars/spark-examples_2.12-3.4.2.jar \
      --table-type COPY_ON_WRITE \
      --source-class org.apache.hudi.utilities.sources.JsonDFSSource \
      --target-base-path /tmp/hudi-decimal-repro/table \
      --target-table table \
      --props /tmp/hudi-decimal-repro/decimal-repro.properties \
      --schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider {code}
   h3. Expected Results
   
   The command runs successfully, data is ingested into {{{}/tmp/hudi-decimal-repro/table{}}}, and data files exist under {{{}/tmp/hudi-decimal-repro/table/2018/08/31/{}}}.
   h3. Actual Results
   
   The command fails with the exception above; no data is ingested into the table, and the table is left with a hanging commit in the requested state.
   
   Logs of the attempted run are attached as spark.log.
   h2. Additional Information
   
   Based on my own testing, this issue does not exist in versions 0.12.2 through 0.14.0. It affects the 0.14.1, 0.15.0, 1.0.0-beta1, 1.0.0-beta2, and 1.0.0 releases.
   
   ## JIRA info
   
   - Link: https://issues.apache.org/jira/browse/HUDI-7388
   - Type: Bug
   - Affects version(s):
     - 0.14.1
     - 0.15.0
     - 1.0.0
     - 1.0.0-beta1
     - 1.0.0-beta2
   - Fix version(s):
     - 0.16.0
   - Attachment(s):
     - decimal-repro.properties (brandon.dahler.amazon, 07/Feb/24 03:04): https://issues.apache.org/jira/secure/attachment/13066533/decimal-repro.properties
     - schema.avsc (brandon.dahler.amazon, 27/Jan/25 18:03): https://issues.apache.org/jira/secure/attachment/13074293/schema.avsc
     - spark.log (brandon.dahler.amazon, 07/Feb/24 03:04): https://issues.apache.org/jira/secure/attachment/13066535/spark.log
   
   
   ---
   
   
   ## Comments
   
   27/Jan/25 17:15 - brandon.dahler.amazon: Was able to re-test against 1.0.0 and noticed that there's a slightly different exception thrown now:
   {code:java}
   25/01/24 10:26:17 ERROR Executor: Exception in task 0.0 in stage 8.0 (TID 7)
   java.lang.RuntimeException: org.apache.hudi.internal.schema.HoodieSchemaException: Failed to convert JSON string to Avro record: {"volume": 483951, "symbol": "MSFT", "ts": "2018-08-31 09:30:00", "month": "08", "high": 111.74, "low": 111.55, "key": "MSFT_2018-08-31 09", "year": 2018, "date": "2018/08/31", "close": 111.72, "open": 111.55, "day": "31"}; schema: {"type":"record","name":"stock_ticks","fields":[{"name":"volume","type":"long"},{"name":"ts","type":"string"},{"name":"symbol","type":"string"},{"name":"year","type":"int"},{"name":"month","type":"string"},{"name":"high","type":"double"},{"name":"low","type":{"type":"bytes","logicalType":"decimal","precision":4,"scale":2}},{"name":"key","type":"string"},{"name":"date","type":"string"},{"name":"close","type":"double"},{"name":"open","type":"double"},{"name":"day","type":"string"}]}
           at org.apache.hudi.client.utils.LazyIterableIterator.next(LazyIterableIterator.java:123)
           at org.apache.hudi.common.util.collection.ClosableIterator$1.next(ClosableIterator.java:41)
           at org.apache.hudi.common.util.collection.MappingIterator.next(MappingIterator.java:44)
           at scala.collection.convert.Wrappers$JIteratorWrapper.next(Wrappers.scala:46)
           at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:514)
           at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
           at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
           at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:197)
           at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:63)
           at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
           at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:104)
           at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
           at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
           at org.apache.spark.scheduler.Task.run(Task.scala:141)
           at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
           at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
           at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
           at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
           at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
           at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
           at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
           at java.base/java.lang.Thread.run(Thread.java:1583)
   Caused by: org.apache.hudi.internal.schema.HoodieSchemaException: Failed to convert JSON string to Avro record: {"volume": 483951, "symbol": "MSFT", "ts": "2018-08-31 09:30:00", "month": "08", "high": 111.74, "low": 111.55, "key": "MSFT_2018-08-31 09", "year": 2018, "date": "2018/08/31", "close": 111.72, "open": 111.55, "day": "31"}; schema: {"type":"record","name":"stock_ticks","fields":[{"name":"volume","type":"long"},{"name":"ts","type":"string"},{"name":"symbol","type":"string"},{"name":"year","type":"int"},{"name":"month","type":"string"},{"name":"high","type":"double"},{"name":"low","type":{"type":"bytes","logicalType":"decimal","precision":4,"scale":2}},{"name":"key","type":"string"},{"name":"date","type":"string"},{"name":"close","type":"double"},{"name":"open","type":"double"},{"name":"day","type":"string"}]}
           at org.apache.hudi.utilities.sources.helpers.AvroConvertor.fromJson(AvroConvertor.java:122)
           at org.apache.spark.api.java.JavaPairRDD$.$anonfun$toScalaFunction$1(JavaPairRDD.scala:1070)
           at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
           at scala.collection.convert.Wrappers$IteratorWrapper.next(Wrappers.scala:33)
           at org.apache.hudi.utilities.schema.LazyCastingIterator.computeNext(LazyCastingIterator.java:40)
           at org.apache.hudi.utilities.schema.LazyCastingIterator.computeNext(LazyCastingIterator.java:30)
           at org.apache.hudi.client.utils.LazyIterableIterator.next(LazyIterableIterator.java:121)
           ... 21 more
   Caused by: org.apache.hudi.exception.HoodieJsonToAvroConversionException: failed to convert json to avro
           at org.apache.hudi.avro.MercifulJsonConverter.convert(MercifulJsonConverter.java:119)
           at org.apache.hudi.utilities.sources.helpers.AvroConvertor.fromJson(AvroConvertor.java:118)
           ... 27 more
   Caused by: org.apache.hudi.exception.HoodieJsonConversionException: failed to convert json to avro
           at org.apache.hudi.avro.processors.JsonFieldProcessor.convertField(JsonFieldProcessor.java:33)
           at org.apache.hudi.avro.MercifulJsonConverter.convertField(MercifulJsonConverter.java:198)
           at org.apache.hudi.avro.MercifulJsonConverter.convertJsonField(MercifulJsonConverter.java:193)
           at org.apache.hudi.avro.MercifulJsonConverter.convertJsonToAvro(MercifulJsonConverter.java:136)
           at org.apache.hudi.avro.MercifulJsonConverter.convert(MercifulJsonConverter.java:117)
           ... 28 more {code}
   Edit - This appears to be caused by the precision configured in the test schema not matching the precision of the actual data. Once that is fixed, the original exception appears again:
   
   {code:java}
   25/01/27 12:30:08 ERROR Executor: Exception in task 1.0 in stage 0.0 (TID 1)
   java.lang.RuntimeException: org.apache.hudi.exception.HoodieAvroSchemaException: cannot support rewrite value for schema type: {"type":"fixed","name":"fixed","namespace":"stock_ticks.low","size":5,"logicalType":"decimal","precision":10,"scale":2} since the old schema type is: {"type":"bytes","logicalType":"decimal","precision":10,"scale":2}
           at org.apache.hudi.client.utils.LazyIterableIterator.next(LazyIterableIterator.java:123)
           at org.apache.hudi.common.util.collection.ClosableIterator$1.next(ClosableIterator.java:41)
           at org.apache.hudi.common.util.collection.MappingIterator.next(MappingIterator.java:44)
           at scala.collection.convert.Wrappers$JIteratorWrapper.next(Wrappers.scala:46)
           at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:514)
           at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
           at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
           at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:197)
           at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:63)
           at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
           at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:104)
           at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
           at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
           at org.apache.spark.scheduler.Task.run(Task.scala:141)
           at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
           at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
           at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
           at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
           at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
           at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
           at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
           at java.base/java.lang.Thread.run(Thread.java:1583)
   Caused by: org.apache.hudi.exception.HoodieAvroSchemaException: cannot support rewrite value for schema type: {"type":"fixed","name":"fixed","namespace":"stock_ticks.low","size":5,"logicalType":"decimal","precision":10,"scale":2} since the old schema type is: {"type":"bytes","logicalType":"decimal","precision":10,"scale":2}
           at org.apache.hudi.avro.HoodieAvroUtils.rewritePrimaryTypeWithDiffSchemaType(HoodieAvroUtils.java:1145)
           at org.apache.hudi.avro.HoodieAvroUtils.rewritePrimaryType(HoodieAvroUtils.java:1063)
           at org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordWithNewSchemaInternal(HoodieAvroUtils.java:998)
           at org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordWithNewSchema(HoodieAvroUtils.java:913)
           at org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordWithNewSchemaInternal(HoodieAvroUtils.java:938)
           at org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordWithNewSchema(HoodieAvroUtils.java:913)
           at org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordWithNewSchema(HoodieAvroUtils.java:879)
           at org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordDeep(HoodieAvroUtils.java:1383)
           at org.apache.hudi.utilities.schema.LazyCastingIterator.computeNext(LazyCastingIterator.java:40)
           at org.apache.hudi.utilities.schema.LazyCastingIterator.computeNext(LazyCastingIterator.java:30)
           at org.apache.hudi.client.utils.LazyIterableIterator.next(LazyIterableIterator.java:121)
           ... 21 more {code}
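   For reference, the {{"size":2}} and {{"size":5}} in the rewritten fixed schemas are the minimal fixed widths for precisions 4 and 10: the Avro spec caps the precision of a fixed-backed decimal at floor(log10(2^(8*size-1) - 1)). A sketch of that calculation (helper names are mine, not Hudi's):

```python
def max_precision(size: int) -> int:
    """Largest decimal precision an Avro fixed(size) can hold, per the Avro spec."""
    # floor(log10(2^(8*size-1) - 1)) == number of digits minus one
    return len(str(2 ** (8 * size - 1) - 1)) - 1


def min_fixed_size(precision: int) -> int:
    """Smallest fixed size whose capacity covers the given precision."""
    size = 1
    while max_precision(size) < precision:
        size += 1
    return size


print(min_fixed_size(4))   # 2 -- matches "size":2 for precision 4 in the original trace
print(min_fixed_size(10))  # 5 -- matches "size":5 for precision 10 above
```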

