hudi-bot opened a new issue, #14796:
URL: https://github.com/apache/hudi/issues/14796
Currently, there are few different options to the user to provide target
schemas such as file based, schema registry. At a high level, there are 2 main
flows
# Target Schema is provided by the user
# Target schema is not provided by the user (which is then inferred from
the incoming data)
||Schema post processor enabled||Transformers||User provided target
schema||Cur behavior||
|yes|No|Yes|table schema's has no namespace. matches user provided schema|
|yes|yes|No|had to make minor fix in post processor for NPE. with the fix,
table schema has namespace in it.|
|yes|yes|yes|table schema has namespace|
|no|no|yes|table schema's has no namespace. matches user provided schema|
|no|yes|yes|table schema's has no namespace. matches user provided schema|
|no|yes|no|table's schema has namespace.|
Source -> [https://github.com/apache/hudi/pull/2937]
As you can see above, if one switches from a non-user-provided schema flow
to a user-provided-schema flow, we switch from namespace in schema to no
namespace in schema.
Parquet does not store the namespace, so when moving across avro schemas
with and without namespace, the parquet-avro writer or reader does not complain
since parquet itself does not store namespace.
However, for MergeOnRead tables, we serialize data and schema in the log
blocks. The GenericDatumReader that takes a reader & writer schema to translate
breaks when one schema has namespace while the other doesn't.
The following exception is thrown
{noformat}
51511 [Executor task launch worker for task 502] ERROR
org.apache.hudi.common.table.log.AbstractHoodieLogRecordScanner - Got
exception when reading log file
org.apache.avro.AvroTypeException: Found
hoodie.source.hoodie_source.height.fixed, expecting fixed
at
org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:292)
at org.apache.avro.io.parsing.Parser.advance(Parser.java:88)
at
org.apache.avro.io.ValidatingDecoder.checkFixed(ValidatingDecoder.java:135)
at
org.apache.avro.io.ValidatingDecoder.readFixed(ValidatingDecoder.java:146)
at
org.apache.avro.generic.GenericDatumReader.readFixed(GenericDatumReader.java:342)
at
org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:180)
at
org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
at
org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:232)
at
org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
at
org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
at
org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
at
org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145)
at
org.apache.hudi.common.table.log.block.HoodieAvroDataBlock.deserializeRecords(HoodieAvroDataBlock.java:157)
at
org.apache.hudi.common.table.log.block.HoodieDataBlock.createRecordsFromContentBytes(HoodieDataBlock.java:128)
at
org.apache.hudi.common.table.log.block.HoodieDataBlock.getRecords(HoodieDataBlock.java:106)
at
org.apache.hudi.common.table.log.AbstractHoodieLogRecordScanner.processDataBlock(AbstractHoodieLogRecordScanner.java:275)
at
org.apache.hudi.common.table.log.AbstractHoodieLogRecordScanner.processQueuedBlocksForInstant(AbstractHoodieLogRecordScanner.java:308)
at
org.apache.hudi.common.table.log.AbstractHoodieLogRecordScanner.scan(AbstractHoodieLogRecordScanner.java:241)
at
org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner.<init>(HoodieMergedLogRecordScanner.java:81)
at
org.apache.hudi.HoodieMergeOnReadRDD$.scanLog(HoodieMergeOnReadRDD.scala:259)
at
org.apache.hudi.HoodieMergeOnReadRDD$$anon$2.<init>(HoodieMergeOnReadRDD.scala:164)
at
org.apache.hudi.HoodieMergeOnReadRDD.payloadCombineFileIterator(HoodieMergeOnReadRDD.scala:154)
at
org.apache.hudi.HoodieMergeOnReadRDD.compute(HoodieMergeOnReadRDD.scala:67)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
at org.apache.spark.scheduler.Task.run(Task.scala:123)
at
org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748{noformat}
This seems like an AVRO shortcoming. We need a way to avoid breaking the
decoding of avro data in log files if the user moved around provider options.
One way is to implement a custom GenericDatumReader.
## JIRA info
- Link: https://issues.apache.org/jira/browse/HUDI-1906
- Type: Improvement
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]