[ 
https://issues.apache.org/jira/browse/HUDI-2875?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

sivabalan narayanan updated HUDI-2875:
--------------------------------------
    Fix Version/s: 0.11.1
                   0.12.0
                       (was: 0.11.0)

> Concurrent call to HoodieMergeHandler cause parquet corruption
> --------------------------------------------------------------
>
>                 Key: HUDI-2875
>                 URL: https://issues.apache.org/jira/browse/HUDI-2875
>             Project: Apache Hudi
>          Issue Type: Bug
>          Components: Common Core, writer-core
>            Reporter: ZiyueGuan
>            Assignee: ZiyueGuan
>            Priority: Critical
>              Labels: pull-request-available
>             Fix For: 0.11.1, 0.12.0
>
>   Original Estimate: 1h
>  Remaining Estimate: 1h
>
> Problem:
> Some corrupted parquet files are generated, and exceptions are thrown when 
> they are read.
> e.g.
>  
> Caused by: org.apache.parquet.io.ParquetDecodingException: Can not read value at 0 in block -1 in file <FilePath>
>     at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:251)
>     at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:132)
>     at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:136)
>     at org.apache.hudi.common.util.ParquetReaderIterator.hasNext(ParquetReaderIterator.java:49)
>     at org.apache.hudi.common.util.queue.IteratorBasedQueueProducer.produce(IteratorBasedQueueProducer.java:45)
>     at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.lambda$null$0(BoundedInMemoryExecutor.java:112)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     ... 4 more
> Caused by: org.apache.parquet.io.ParquetDecodingException: could not read page Page [bytes.size=1054316, valueCount=237, uncompressedSize=1054316] in col  required binary col
>     at org.apache.parquet.column.impl.ColumnReaderImpl.readPageV1(ColumnReaderImpl.java:599)
>     at org.apache.parquet.column.impl.ColumnReaderImpl.access$300(ColumnReaderImpl.java:57)
>     at org.apache.parquet.column.impl.ColumnReaderImpl$3.visit(ColumnReaderImpl.java:536)
>     at org.apache.parquet.column.impl.ColumnReaderImpl$3.visit(ColumnReaderImpl.java:533)
>     at org.apache.parquet.column.page.DataPageV1.accept(DataPageV1.java:95)
>     at org.apache.parquet.column.impl.ColumnReaderImpl.readPage(ColumnReaderImpl.java:533)
>     at org.apache.parquet.column.impl.ColumnReaderImpl.checkRead(ColumnReaderImpl.java:525)
>     at org.apache.parquet.column.impl.ColumnReaderImpl.consume(ColumnReaderImpl.java:638)
>     at org.apache.parquet.column.impl.ColumnReaderImpl.<init>(ColumnReaderImpl.java:353)
>     at org.apache.parquet.column.impl.ColumnReadStoreImpl.newMemColumnReader(ColumnReadStoreImpl.java:80)
>     at org.apache.parquet.column.impl.ColumnReadStoreImpl.getColumnReader(ColumnReadStoreImpl.java:75)
>     at org.apache.parquet.io.RecordReaderImplementation.<init>(RecordReaderImplementation.java:271)
>     at org.apache.parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:147)
>     at org.apache.parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:109)
>     at org.apache.parquet.filter2.compat.FilterCompat$NoOpFilter.accept(FilterCompat.java:165)
>     at org.apache.parquet.io.MessageColumnIO.getRecordReader(MessageColumnIO.java:109)
>     at org.apache.parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:137)
>     at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:222)
>     ... 11 more
> Caused by: java.io.EOFException
>     at java.io.DataInputStream.readFully(DataInputStream.java:197)
>     at java.io.DataInputStream.readFully(DataInputStream.java:169)
>     at org.apache.parquet.bytes.BytesInput$StreamBytesInput.toByteArray(BytesInput.java:286)
>     at org.apache.parquet.bytes.BytesInput.toByteBuffer(BytesInput.java:237)
>     at org.apache.parquet.bytes.BytesInput.toInputStream(BytesInput.java:246)
>     at org.apache.parquet.column.impl.ColumnReaderImpl.readPageV1(ColumnReaderImpl.java:592)
>  
> How to reproduce:
> We need a way to interrupt one task without shutting down the JVM, for 
> example speculation. When speculation is triggered, other tasks running on 
> the same executor are at risk of generating a wrong parquet file. This does 
> not always result in a corrupted parquet file. Nearly half of them throw an 
> exception, while a few tasks succeed without any signal.
> Root cause:
> ParquetWriter is not thread safe. Its users must make sure that there are 
> no concurrent calls to ParquetWriter.
> In the following code: 
> [https://github.com/apache/hudi/blob/master/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkMergeHelper.java#L103]
> we call both write and close on the parquet writer concurrently, so data may 
> still be in the middle of being written when close is called. In the close 
> method, the compressor (a class used by parquet for compression, which holds 
> a stateful data structure inside) is cleared and returned to a pool for later 
> reuse. Because of the concurrent write mentioned above, data may continue to 
> be pushed to the compressor even after it has been cleared. In addition, the 
> compressor has a mechanism that tries to detect some kinds of invalid use. 
> That is why some invalid usages throw an exception rather than generate a 
> corrupted parquet file.
> Validation:
> The current solution has been validated in a production environment. One 
> signal that the fix is applied is that no task should fail with an error 
> like "BlockCompressorStream: write beyond end of stream". The reason is that 
> the BlockCompressorStream checking mechanism will no longer be triggered by 
> concurrent writes.
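
The root cause described above (close racing with write on a writer that is not thread safe) can be illustrated with a minimal Java sketch. This is not the fix merged in Hudi and does not use the Parquet API: `ToyWriter`, `SafeWriterShutdown`, `writeAll` and `closeAfterWorkersExit` are hypothetical names invented for illustration, and the one-minute timeout is an arbitrary assumption. It shows one common way to avoid the race: interrupt the worker, wait until it has actually exited, and only then close the writer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for a writer that is not thread safe (not the Parquet API). */
final class ToyWriter implements AutoCloseable {
    private final List<String> rows = new ArrayList<>();
    private boolean closed = false;

    void write(String row) {
        // Simple misuse check: reject any write that arrives after close().
        if (closed) {
            throw new IllegalStateException("write after close");
        }
        rows.add(row);
    }

    @Override
    public void close() {
        closed = true;
    }

    int rowCount() {
        return rows.size();
    }
}

/** Hypothetical helper: close the writer only after the writing thread has exited. */
final class SafeWriterShutdown {

    static int writeAll(List<String> input) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        ToyWriter writer = new ToyWriter();
        try {
            Future<?> consumer = pool.submit(() -> {
                for (String row : input) {
                    if (Thread.currentThread().isInterrupted()) {
                        return; // the task was cancelled; stop writing
                    }
                    writer.write(row);
                }
            });
            consumer.get(); // wait for the worker to finish writing
            return writer.rowCount();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while writing", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("write failed", e.getCause());
        } finally {
            closeAfterWorkersExit(pool, writer);
        }
    }

    static void closeAfterWorkersExit(ExecutorService pool, ToyWriter writer) {
        pool.shutdownNow(); // interrupt a worker that may still be running
        boolean terminated = false;
        try {
            // Assumed timeout; the point is to wait for the worker before closing.
            terminated = pool.awaitTermination(1, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        if (terminated) {
            writer.close(); // no other thread can be inside write() now
        }
        // Otherwise a worker may still be writing, so the writer is left
        // unclosed rather than closed concurrently with a write.
    }
}
```

For example, `SafeWriterShutdown.writeAll(Arrays.asList("a", "b", "c"))` returns 3. The design choice is to prefer leaving the writer unclosed over closing it while another thread may still be inside `write`, since the latter is the situation the issue describes as producing corrupted files.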



--
This message was sent by Atlassian Jira
(v8.20.7#820007)