[ https://issues.apache.org/jira/browse/HUDI-6052?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Danny Chen closed HUDI-6052.
----------------------------
    Fix Version/s: 0.13.1
                   0.14.0
       Resolution: Fixed

Fixed via master branch: 64bf871cfc3cfc08478cf04e02d2f7086f72548e

> Standardise TIMESTAMP(6) format when writing to Parquet files
> -------------------------------------------------------------
>
>                 Key: HUDI-6052
>                 URL: https://issues.apache.org/jira/browse/HUDI-6052
>             Project: Apache Hudi
>          Issue Type: Bug
>            Reporter: voon
>            Assignee: voon
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 0.13.1, 0.14.0
>
>
> h1. APPEND-ONLY MODE
>
> When *TIMESTAMP(6)* is used for *APPEND-ONLY* pipelines with inline-clustering enabled, the error below will be thrown:
>
> {code:java}
> Caused by: org.apache.hudi.exception.HoodieException: unable to read next record from parquet file
>     at org.apache.hudi.common.util.ParquetReaderIterator.hasNext(ParquetReaderIterator.java:53)
>     at java.util.Spliterators$IteratorSpliterator.tryAdvance(Spliterators.java:1811)
>     at java.util.stream.StreamSpliterators$WrappingSpliterator.lambda$initPartialTraversalState$0(StreamSpliterators.java:295)
>     at java.util.stream.StreamSpliterators$AbstractWrappingSpliterator.fillBuffer(StreamSpliterators.java:207)
>     at java.util.stream.StreamSpliterators$AbstractWrappingSpliterator.doAdvance(StreamSpliterators.java:162)
>     at java.util.stream.StreamSpliterators$WrappingSpliterator.tryAdvance(StreamSpliterators.java:301)
>     at java.util.Spliterators$1Adapter.hasNext(Spliterators.java:681)
>     at org.apache.hudi.client.utils.ConcatenatingIterator.hasNext(ConcatenatingIterator.java:45)
>     at org.apache.hudi.sink.clustering.ClusteringOperator.doClustering(ClusteringOperator.java:307)
>     at org.apache.hudi.sink.clustering.ClusteringOperator.processElement(ClusteringOperator.java:240)
>     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233)
>     at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
>     at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:524)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:758)
>     at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:951)
>     at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:930)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:744)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
>     at java.lang.Thread.run(Thread.java:750)
> Caused by: org.apache.parquet.io.ParquetDecodingException: Can not read value at 1 in block 0 in file file:/var/folders/p_/09zfm5sx3v14w97hhk4vqrn8s817xt/T/junit5996224223926304717/par2/3cc78c96-2823-46fb-ab8c-7106edd55fc7-0_1-4-0_20230410162304415.parquet
>     at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:254)
>     at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:132)
>     at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:136)
>     at org.apache.hudi.common.util.ParquetReaderIterator.hasNext(ParquetReaderIterator.java:48)
>     ... 22 more
> Caused by: java.lang.UnsupportedOperationException: org.apache.parquet.avro.AvroConverters$FieldLongConverter
>     at org.apache.parquet.io.api.PrimitiveConverter.addBinary(PrimitiveConverter.java:70)
>     at org.apache.parquet.column.impl.ColumnReaderBase$2$6.writeValue(ColumnReaderBase.java:390)
>     at org.apache.parquet.column.impl.ColumnReaderBase.writeCurrentValueToConverter(ColumnReaderBase.java:440)
>     at org.apache.parquet.column.impl.ColumnReaderImpl.writeCurrentValueToConverter(ColumnReaderImpl.java:30)
>     at org.apache.parquet.io.RecordReaderImplementation.read(RecordReaderImplementation.java:406)
>     at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:229)
>     ... 25 more
> Process finished with exit code 255
> {code}
>
> Sample code to trigger this:
>
> {code:java}
> CREATE TABLE `src_table` (
>     `id`            INT,
>     `userId`        INT,
>     `name`          STRING,
>     `timestamp_col` TIMESTAMP(6)
> )
> WITH (
>     'connector' = 'datagen',
>     'rows-per-second' = '50'
> );
>
> -- will write TIMESTAMP(6) type as INT96
> CREATE TABLE `sink_table` (
>     `id`            INT,
>     `userId`        INT,
>     `name`          STRING,
>     `timestamp_col` TIMESTAMP(6)
> )
> WITH (
>     'connector' = 'hudi',
>     'path' = 'hdfs://path/to/table/',
>     'table.type' = 'COPY_ON_WRITE',
>     'write.operation' = 'insert',
>     'hoodie.datasource.write.recordkey.field' = 'id',
>     'hive_sync.enable' = 'false',
>     'hoodie.datasource.write.hive_style_partitioning' = 'true',
>     'clustering.async.enabled' = 'true',                               -- enable inline clustering
>     'clustering.schedule.enabled' = 'true',                            -- enable clustering schedule
>     'clustering.delta_commits' = '4',                                  -- schedule clustering every 4 commits
>     'hoodie.clustering.plan.strategy.small.file.limit' = '104857600'   -- only rewrite files smaller than 100MB
> );
>
> insert into sink_table
> select
>     *
> from src_table;
> {code}
>
> After looking through the code, we realised that the same TIMESTAMP(6) type will be written as INT96 to parquet when AppendWriteFunction is used.
>
> Snippet extracted from *parquet-tools* to show the physical type in parquet:
>
> {code:java}
> ############ Column(timestamp_col)[row group 0] ############
> name: timestamp_col
> path: timestamp_col
> max_definition_level: 1
> max_repetition_level: 0
> physical_type: INT96
> logical_type: None
> converted_type (legacy): NONE
> compression: GZIP (space_saved: 55%)
> total_compressed_size: 1102
> total_uncompressed_size: 2444
> {code}
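>
> For reference, the physical type that parquet-tools reports above can also be read straight from the Parquet footer. The sketch below is only illustrative and assumes parquet-hadoop and hadoop-common are on the classpath; the class name and file path are placeholders, and *timestamp_col* matches the column name used in this report:
>
> {code:java}
> import org.apache.hadoop.conf.Configuration;
> import org.apache.hadoop.fs.Path;
> import org.apache.parquet.hadoop.ParquetFileReader;
> import org.apache.parquet.hadoop.util.HadoopInputFile;
> import org.apache.parquet.schema.MessageType;
> import org.apache.parquet.schema.PrimitiveType;
>
> public class InspectTimestampColumn {
>   public static void main(String[] args) throws Exception {
>     // Placeholder path: point this at a base file produced by the append-only pipeline above
>     Path path = new Path("file:///path/to/base-file.parquet");
>     try (ParquetFileReader reader =
>              ParquetFileReader.open(HadoopInputFile.fromPath(path, new Configuration()))) {
>       // Read the file schema from the footer and look up the timestamp column
>       MessageType schema = reader.getFooter().getFileMetaData().getSchema();
>       PrimitiveType col = schema.getType("timestamp_col").asPrimitiveType();
>       // For the append-only (insert) path this prints INT96 with a null logical type,
>       // matching the parquet-tools snippet above
>       System.out.println("physical_type: " + col.getPrimitiveTypeName());
>       System.out.println("logical_type: " + col.getLogicalTypeAnnotation());
>     }
>   }
> }
> {code}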
> ----
>
> h1. UPSERT MODE
>
> However, if StreamWriteFunction is used, TIMESTAMP(6) types will be written as INT64 to parquet.
>
> One can reproduce this by using the code below (changing the *write.operation* value to *update*):
>
> {code:java}
> CREATE TABLE `src_table` (
>     `id`            INT,
>     `userId`        INT,
>     `name`          STRING,
>     `timestamp_col` TIMESTAMP(6)
> )
> WITH (
>     'connector' = 'datagen',
>     'rows-per-second' = '50'
> );
>
> -- will write TIMESTAMP(6) type as INT64
> CREATE TABLE `sink_table` (
>     `id`            INT,
>     `userId`        INT,
>     `name`          STRING,
>     `timestamp_col` TIMESTAMP(6)
> )
> WITH (
>     'connector' = 'hudi',
>     'path' = 'hdfs://path/to/table/',
>     'table.type' = 'COPY_ON_WRITE',
>     'write.operation' = 'update',
>     'hoodie.datasource.write.recordkey.field' = 'id',
>     'hive_sync.enable' = 'false',
>     'hoodie.datasource.write.hive_style_partitioning' = 'true',
>     'clustering.async.enabled' = 'true',                               -- enable inline clustering
>     'clustering.schedule.enabled' = 'true',                            -- enable clustering schedule
>     'clustering.delta_commits' = '4',                                  -- schedule clustering every 4 commits
>     'hoodie.clustering.plan.strategy.small.file.limit' = '104857600'   -- only rewrite files smaller than 100MB
> );
>
> insert into sink_table
> select
>     *
> from src_table;
> {code}
>
> Snippet extracted from *parquet-tools* to show the physical type in parquet:
>
> {code:java}
> ############ Column(timestamp_col)[row group 0] ############
> name: timestamp_col
> path: timestamp_col
> max_definition_level: 1
> max_repetition_level: 0
> physical_type: INT64
> logical_type: Timestamp(isAdjustedToUTC=true, timeUnit=microseconds, is_from_converted_type=false, force_set_converted_type=false)
> converted_type (legacy): TIMESTAMP_MICROS
> compression: GZIP (space_saved: 26%)
> total_compressed_size: 1228
> total_uncompressed_size: 1654
> {code}
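>
> Running the footer check sketched above against a base file written by this upsert pipeline should print INT64 with a TIMESTAMP_MICROS annotation instead. That mismatch is consistent with the AvroConverters$FieldLongConverter failure shown earlier: during clustering the reader expects an INT64-backed timestamp but encounters INT96 in files written by the append-only path.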


--
This message was sent by Atlassian Jira
(v8.20.10#820010)