hudi-bot opened a new issue, #16388: URL: https://github.com/apache/hudi/issues/16388
h2. Problem

When attempting to ingest a record with an Avro target schema which includes a field that uses the [decimal|https://avro.apache.org/docs/1.11.0/spec.html#Decimal] logical type in Hudi 0.14.1, an exception is thrown:
{code:java}
24/02/06 21:30:30 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.lang.RuntimeException: org.apache.avro.AvroRuntimeException: cannot support rewrite value for schema type: {"type":"fixed","name":"fixed","namespace":"stock_ticks.low","size":2,"logicalType":"decimal","precision":4,"scale":2} since the old schema type is: {"type":"bytes","logicalType":"decimal","precision":4,"scale":2}
    at org.apache.hudi.client.utils.LazyIterableIterator.next(LazyIterableIterator.java:121)
    at org.apache.hudi.utilities.streamer.HoodieStreamerUtils.lambda$null$a903797$1(HoodieStreamerUtils.java:92)
    at org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitions$1(JavaRDDLike.scala:153)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:853)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:853)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
    at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:101)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
    at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
    at org.apache.spark.scheduler.Task.run(Task.scala:139)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: org.apache.avro.AvroRuntimeException: cannot support rewrite value for schema type: {"type":"fixed","name":"fixed","namespace":"stock_ticks.low","size":2,"logicalType":"decimal","precision":4,"scale":2} since the old schema type is: {"type":"bytes","logicalType":"decimal","precision":4,"scale":2}
    at org.apache.hudi.avro.HoodieAvroUtils.rewritePrimaryTypeWithDiffSchemaType(HoodieAvroUtils.java:1088)
    at org.apache.hudi.avro.HoodieAvroUtils.rewritePrimaryType(HoodieAvroUtils.java:1006)
    at org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordWithNewSchemaInternal(HoodieAvroUtils.java:951)
    at org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordWithNewSchema(HoodieAvroUtils.java:877)
    at org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordWithNewSchemaInternal(HoodieAvroUtils.java:899)
    at org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordWithNewSchema(HoodieAvroUtils.java:877)
    at org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordWithNewSchema(HoodieAvroUtils.java:847)
    at org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordDeep(HoodieAvroUtils.java:1259)
    at org.apache.hudi.utilities.schema.LazyCastingIterator.computeNext(LazyCastingIterator.java:40)
    at org.apache.hudi.utilities.schema.LazyCastingIterator.computeNext(LazyCastingIterator.java:30)
    at org.apache.hudi.client.utils.LazyIterableIterator.next(LazyIterableIterator.java:119)
    ... 21 more
{code}
h2. Reproduction Steps

1. Set up a clean Spark install
{code:java}
mkdir /tmp/hudi-decimal-repro
cd /tmp/hudi-decimal-repro
tar -xvzf ~/spark-3.4.2-bin-hadoop3.tgz
{code}
2. Create a minimal schema file based on [the demo schema|https://github.com/apache/hudi/blob/release-0.14.1/docker/demo/config/schema.avsc].
The only change is the {{type}} of the field named {{low}}.
{code:java}
echo '{ "type":"record", "name":"stock_ticks", "fields":[{ "name": "volume", "type": "long" }, { "name": "ts", "type": "string" }, { "name": "symbol", "type": "string" },{ "name": "year", "type": "int" },{ "name": "month", "type": "string" },{ "name": "high", "type": "double" },{ "name": "low", "type": { "type": "bytes", "logicalType": "decimal", "precision": 10, "scale": 5 } },{ "name": "key", "type": "string" },{ "name": "date", "type":"string" }, { "name": "close", "type": "double" }, { "name": "open", "type": "double" }, { "name": "day", "type":"string" } ]}' > schema.avsc
{code}
3. Create a minimal properties file
{code:java}
echo "hoodie.datasource.write.recordkey.field=key
hoodie.datasource.write.partitionpath.field=date
hoodie.table.recordkey.fields=key
hoodie.table.partition.fields=date
hoodie.streamer.schemaprovider.source.schema.file=/tmp/hudi-decimal-repro/schema.avsc
hoodie.streamer.schemaprovider.target.schema.file=/tmp/hudi-decimal-repro/schema.avsc
hoodie.streamer.source.dfs.root=/tmp/hudi-decimal-repro/data" > decimal-repro.properties
{code}
4. Copy the data file from the docker demo
{code:java}
mkdir data
cd data
wget https://raw.githubusercontent.com/apache/hudi/release-0.14.1/docker/demo/data/batch_1.json
cd ..
{code}
5.
Run HoodieStreamer
{code:java}
spark-3.4.2-bin-hadoop3/bin/spark-submit \
  --packages org.apache.hudi:hudi-utilities-slim-bundle_2.12:0.15.0,org.apache.hudi:hudi-spark3.4-bundle_2.12:0.15.0 \
  --conf spark.kryoserializer.buffer.max=200m \
  --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
  --class org.apache.hudi.utilities.streamer.HoodieStreamer \
  spark-3.4.2-bin-hadoop3/examples/jars/spark-examples_2.12-3.4.2.jar \
  --table-type COPY_ON_WRITE \
  --source-class org.apache.hudi.utilities.sources.JsonDFSSource \
  --target-base-path /tmp/hudi-decimal-repro/table \
  --target-table table \
  --props /tmp/hudi-decimal-repro/decimal-repro.properties \
  --schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider
{code}
h3. Expected Results

The command runs successfully, data is ingested successfully into {{/tmp/hudi-decimal-repro/table}}, and some files exist under {{/tmp/hudi-decimal-repro/table/2018/08/31/}}.

h3. Actual Results

The command fails with an exception and no data is ingested into the table. The table is left with a hanging commit in the requested state. Logs of the attempted run are attached as spark.log.

h2. Additional Information

Based on my own testing, this issue does not appear to exist in versions 0.12.2 through 0.14.0. It does affect the 0.14.1, 0.15.0, 1.0.0-beta1, 1.0.0-beta2, and 1.0.0 releases.
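For context on why this rewrite ought to be possible: the two schemas in the error differ only in the container type. Per the Avro spec, a decimal is the two's-complement big-endian encoding of its unscaled value, stored either as a variable-length {{bytes}} value or sign-extended into a {{fixed}} value of a declared size. The sketch below is plain Java (not Hudi code; the class and helper names are made up for illustration) showing that both encodings carry the same value, so a bytes-to-fixed rewrite is mechanically a matter of sign extension:

```java
import java.math.BigDecimal;
import java.math.BigInteger;
import java.util.Arrays;

// Illustrative sketch, not Hudi code: per the Avro spec, a decimal stores the
// two's-complement big-endian encoding of its unscaled value. The "bytes"
// variant is minimal-length; the "fixed" variant is sign-extended to the
// declared fixed size.
public class DecimalEncodings {
    // Minimal-length encoding, as used by the "bytes" decimal variant.
    static byte[] asBytesDecimal(BigDecimal value, int scale) {
        return value.setScale(scale).unscaledValue().toByteArray();
    }

    // Sign-extended encoding of the given size, as used by the "fixed" variant.
    static byte[] asFixedDecimal(BigDecimal value, int scale, int size) {
        byte[] minimal = asBytesDecimal(value, scale);
        if (minimal.length > size) {
            throw new IllegalArgumentException("value does not fit in fixed(" + size + ")");
        }
        byte[] out = new byte[size];
        byte pad = (byte) (minimal[0] < 0 ? 0xFF : 0x00); // two's-complement sign extension
        Arrays.fill(out, 0, size - minimal.length, pad);
        System.arraycopy(minimal, 0, out, size - minimal.length, minimal.length);
        return out;
    }

    public static void main(String[] args) {
        BigDecimal low = new BigDecimal("111.55");    // value of "low" in batch_1.json
        byte[] bytesForm = asBytesDecimal(low, 5);    // decimal(10, 5) as bytes
        byte[] fixedForm = asFixedDecimal(low, 5, 5); // decimal(10, 5) as fixed(5)
        // Both forms decode to the same unscaled value, so the rewrite loses nothing.
        BigDecimal decoded = new BigDecimal(new BigInteger(fixedForm), 5);
        System.out.println(decoded); // 111.55000
    }
}
```

The exception above is thrown because {{HoodieAvroUtils.rewritePrimaryTypeWithDiffSchemaType}} has no bytes-to-fixed branch, not because the conversion is lossy.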
## JIRA info
- Link: https://issues.apache.org/jira/browse/HUDI-7388
- Type: Bug
- Affects version(s):
  - 0.14.1
  - 0.15.0
  - 1.0.0
  - 1.0.0-beta1
  - 1.0.0-beta2
- Fix version(s):
  - 0.16.0
- Attachment(s):
  - 07/Feb/24 03:04;brandon.dahler.amazon;decimal-repro.properties;https://issues.apache.org/jira/secure/attachment/13066533/decimal-repro.properties
  - 27/Jan/25 18:03;brandon.dahler.amazon;schema.avsc;https://issues.apache.org/jira/secure/attachment/13074293/schema.avsc
  - 07/Feb/24 03:04;brandon.dahler.amazon;spark.log;https://issues.apache.org/jira/secure/attachment/13066535/spark.log

---

## Comments
27/Jan/25 17:15;brandon.dahler.amazon;Was able to re-test against 1.0.0 and noticed that a slightly different exception is thrown now:
{code:java}
25/01/24 10:26:17 ERROR Executor: Exception in task 0.0 in stage 8.0 (TID 7)
java.lang.RuntimeException: org.apache.hudi.internal.schema.HoodieSchemaException: Failed to convert JSON string to Avro record: {"volume": 483951, "symbol": "MSFT", "ts": "2018-08-31 09:30:00", "month": "08", "high": 111.74, "low": 111.55, "key": "MSFT_2018-08-31 09", "year": 2018, "date": "2018/08/31", "close": 111.72, "open": 111.55, "day": "31"}; schema: {"type":"record","name":"stock_ticks","fields":[{"name":"volume","type":"long"},{"name":"ts","type":"string"},{"name":"symbol","type":"string"},{"name":"year","type":"int"},{"name":"month","type":"string"},{"name":"high","type":"double"},{"name":"low","type":{"type":"bytes","logicalType":"decimal","precision":4,"scale":2}},{"name":"key","type":"string"},{"name":"date","type":"string"},{"name":"close","type":"double"},{"name":"open","type":"double"},{"name":"day","type":"string"}]}
    at org.apache.hudi.client.utils.LazyIterableIterator.next(LazyIterableIterator.java:123)
    at org.apache.hudi.common.util.collection.ClosableIterator$1.next(ClosableIterator.java:41)
    at org.apache.hudi.common.util.collection.MappingIterator.next(MappingIterator.java:44)
    at scala.collection.convert.Wrappers$JIteratorWrapper.next(Wrappers.scala:46)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:514)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:197)
    at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:63)
    at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:104)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
    at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
    at org.apache.spark.scheduler.Task.run(Task.scala:141)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
    at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
    at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
    at java.base/java.lang.Thread.run(Thread.java:1583)
Caused by: org.apache.hudi.internal.schema.HoodieSchemaException: Failed to convert JSON string to Avro record: {"volume": 483951, "symbol": "MSFT", "ts": "2018-08-31 09:30:00", "month": "08", "high": 111.74, "low": 111.55, "key": "MSFT_2018-08-31 09", "year": 2018, "date": "2018/08/31", "close": 111.72, "open": 111.55, "day": "31"}; schema: {"type":"record","name":"stock_ticks","fields":[{"name":"volume","type":"long"},{"name":"ts","type":"string"},{"name":"symbol","type":"string"},{"name":"year","type":"int"},{"name":"month","type":"string"},{"name":"high","type":"double"},{"name":"low","type":{"type":"bytes","logicalType":"decimal","precision":4,"scale":2}},{"name":"key","type":"string"},{"name":"date","type":"string"},{"name":"close","type":"double"},{"name":"open","type":"double"},{"name":"day","type":"string"}]}
    at org.apache.hudi.utilities.sources.helpers.AvroConvertor.fromJson(AvroConvertor.java:122)
    at org.apache.spark.api.java.JavaPairRDD$.$anonfun$toScalaFunction$1(JavaPairRDD.scala:1070)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
    at scala.collection.convert.Wrappers$IteratorWrapper.next(Wrappers.scala:33)
    at org.apache.hudi.utilities.schema.LazyCastingIterator.computeNext(LazyCastingIterator.java:40)
    at org.apache.hudi.utilities.schema.LazyCastingIterator.computeNext(LazyCastingIterator.java:30)
    at org.apache.hudi.client.utils.LazyIterableIterator.next(LazyIterableIterator.java:121)
    ... 21 more
Caused by: org.apache.hudi.exception.HoodieJsonToAvroConversionException: failed to convert json to avro
    at org.apache.hudi.avro.MercifulJsonConverter.convert(MercifulJsonConverter.java:119)
    at org.apache.hudi.utilities.sources.helpers.AvroConvertor.fromJson(AvroConvertor.java:118)
    ... 27 more
Caused by: org.apache.hudi.exception.HoodieJsonConversionException: failed to convert json to avro
    at org.apache.hudi.avro.processors.JsonFieldProcessor.convertField(JsonFieldProcessor.java:33)
    at org.apache.hudi.avro.MercifulJsonConverter.convertField(MercifulJsonConverter.java:198)
    at org.apache.hudi.avro.MercifulJsonConverter.convertJsonField(MercifulJsonConverter.java:193)
    at org.apache.hudi.avro.MercifulJsonConverter.convertJsonToAvro(MercifulJsonConverter.java:136)
    at org.apache.hudi.avro.MercifulJsonConverter.convert(MercifulJsonConverter.java:117)
    ... 28 more
{code}
Edit - This appears to be caused by the fact that the precision configured in the test schema doesn't match the precision of the actual data.
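The mismatch described in the edit above can be checked directly: the {{low}} value 111.55 from batch_1.json has 5 significant digits, so it cannot be represented as decimal(4,2), whose maximum is 99.99. A minimal illustration in plain Java (not Hudi code; the class and method names are made up):

```java
import java.math.BigDecimal;

// Illustrative sketch of the precision mismatch: decimal(precision=4, scale=2)
// holds at most 4 significant digits, but 111.55 needs 5 digits at scale 2.
public class PrecisionCheck {
    static boolean fits(BigDecimal value, int precision, int scale) {
        // A value fits decimal(precision, scale) if rescaling to `scale` loses
        // no digits and the rescaled value has at most `precision` digits.
        try {
            return value.setScale(scale).precision() <= precision;
        } catch (ArithmeticException e) {
            return false; // rescaling would require rounding
        }
    }

    public static void main(String[] args) {
        BigDecimal low = new BigDecimal("111.55"); // "low" field in batch_1.json
        System.out.println(fits(low, 4, 2));  // false -> JSON-to-Avro conversion fails
        System.out.println(fits(low, 10, 2)); // true  -> corrected schema accepts it
    }
}
```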
Once that has been fixed, the original exception shows again:
{code:java}
25/01/27 12:30:08 ERROR Executor: Exception in task 1.0 in stage 0.0 (TID 1)
java.lang.RuntimeException: org.apache.hudi.exception.HoodieAvroSchemaException: cannot support rewrite value for schema type: {"type":"fixed","name":"fixed","namespace":"stock_ticks.low","size":5,"logicalType":"decimal","precision":10,"scale":2} since the old schema type is: {"type":"bytes","logicalType":"decimal","precision":10,"scale":2}
    at org.apache.hudi.client.utils.LazyIterableIterator.next(LazyIterableIterator.java:123)
    at org.apache.hudi.common.util.collection.ClosableIterator$1.next(ClosableIterator.java:41)
    at org.apache.hudi.common.util.collection.MappingIterator.next(MappingIterator.java:44)
    at scala.collection.convert.Wrappers$JIteratorWrapper.next(Wrappers.scala:46)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:514)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:197)
    at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:63)
    at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:104)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
    at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
    at org.apache.spark.scheduler.Task.run(Task.scala:141)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
    at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
    at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
    at java.base/java.lang.Thread.run(Thread.java:1583)
Caused by: org.apache.hudi.exception.HoodieAvroSchemaException: cannot support rewrite value for schema type: {"type":"fixed","name":"fixed","namespace":"stock_ticks.low","size":5,"logicalType":"decimal","precision":10,"scale":2} since the old schema type is: {"type":"bytes","logicalType":"decimal","precision":10,"scale":2}
    at org.apache.hudi.avro.HoodieAvroUtils.rewritePrimaryTypeWithDiffSchemaType(HoodieAvroUtils.java:1145)
    at org.apache.hudi.avro.HoodieAvroUtils.rewritePrimaryType(HoodieAvroUtils.java:1063)
    at org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordWithNewSchemaInternal(HoodieAvroUtils.java:998)
    at org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordWithNewSchema(HoodieAvroUtils.java:913)
    at org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordWithNewSchemaInternal(HoodieAvroUtils.java:938)
    at org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordWithNewSchema(HoodieAvroUtils.java:913)
    at org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordWithNewSchema(HoodieAvroUtils.java:879)
    at org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordDeep(HoodieAvroUtils.java:1383)
    at org.apache.hudi.utilities.schema.LazyCastingIterator.computeNext(LazyCastingIterator.java:40)
    at org.apache.hudi.utilities.schema.LazyCastingIterator.computeNext(LazyCastingIterator.java:30)
    at org.apache.hudi.client.utils.LazyIterableIterator.next(LazyIterableIterator.java:121)
    ... 21 more
{code}

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at: [email protected]
