[jira] [Updated] (HUDI-2875) Concurrent call to HoodieMergeHandler cause parquet corruption

2022-06-02 Thread Ethan Guo (Jira)


[ https://issues.apache.org/jira/browse/HUDI-2875?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ethan Guo updated HUDI-2875:

Fix Version/s: (was: 0.12.0)

> Concurrent call to HoodieMergeHandler cause parquet corruption
> --
>
> Key: HUDI-2875
> URL: https://issues.apache.org/jira/browse/HUDI-2875
> Project: Apache Hudi
>  Issue Type: Bug
>  Components: Common Core, writer-core
>Reporter: ZiyueGuan
>Assignee: ZiyueGuan
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 0.11.1
>
>   Original Estimate: 1h
>  Remaining Estimate: 1h
>
> Problem:
> Some corrupted parquet files are generated, and exceptions are thrown when they are read, e.g.:
> 
> Caused by: org.apache.parquet.io.ParquetDecodingException: Can not read value at 0 in block -1 in file 
>     at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:251)
>     at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:132)
>     at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:136)
>     at org.apache.hudi.common.util.ParquetReaderIterator.hasNext(ParquetReaderIterator.java:49)
>     at org.apache.hudi.common.util.queue.IteratorBasedQueueProducer.produce(IteratorBasedQueueProducer.java:45)
>     at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.lambda$null$0(BoundedInMemoryExecutor.java:112)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     ... 4 more
> Caused by: org.apache.parquet.io.ParquetDecodingException: could not read page Page [bytes.size=1054316, valueCount=237, uncompressedSize=1054316] in col  required binary col
>     at org.apache.parquet.column.impl.ColumnReaderImpl.readPageV1(ColumnReaderImpl.java:599)
>     at org.apache.parquet.column.impl.ColumnReaderImpl.access$300(ColumnReaderImpl.java:57)
>     at org.apache.parquet.column.impl.ColumnReaderImpl$3.visit(ColumnReaderImpl.java:536)
>     at org.apache.parquet.column.impl.ColumnReaderImpl$3.visit(ColumnReaderImpl.java:533)
>     at org.apache.parquet.column.page.DataPageV1.accept(DataPageV1.java:95)
>     at org.apache.parquet.column.impl.ColumnReaderImpl.readPage(ColumnReaderImpl.java:533)
>     at org.apache.parquet.column.impl.ColumnReaderImpl.checkRead(ColumnReaderImpl.java:525)
>     at org.apache.parquet.column.impl.ColumnReaderImpl.consume(ColumnReaderImpl.java:638)
>     at org.apache.parquet.column.impl.ColumnReaderImpl.<init>(ColumnReaderImpl.java:353)
>     at org.apache.parquet.column.impl.ColumnReadStoreImpl.newMemColumnReader(ColumnReadStoreImpl.java:80)
>     at org.apache.parquet.column.impl.ColumnReadStoreImpl.getColumnReader(ColumnReadStoreImpl.java:75)
>     at org.apache.parquet.io.RecordReaderImplementation.<init>(RecordReaderImplementation.java:271)
>     at org.apache.parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:147)
>     at org.apache.parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:109)
>     at org.apache.parquet.filter2.compat.FilterCompat$NoOpFilter.accept(FilterCompat.java:165)
>     at org.apache.parquet.io.MessageColumnIO.getRecordReader(MessageColumnIO.java:109)
>     at org.apache.parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:137)
>     at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:222)
>     ... 11 more
> Caused by: java.io.EOFException
>     at java.io.DataInputStream.readFully(DataInputStream.java:197)
>     at java.io.DataInputStream.readFully(DataInputStream.java:169)
>     at org.apache.parquet.bytes.BytesInput$StreamBytesInput.toByteArray(BytesInput.java:286)
>     at org.apache.parquet.bytes.BytesInput.toByteBuffer(BytesInput.java:237)
>     at org.apache.parquet.bytes.BytesInput.toInputStream(BytesInput.java:246)
>     at org.apache.parquet.column.impl.ColumnReaderImpl.readPageV1(ColumnReaderImpl.java:592)
> 
> How to reproduce:
> We need a way to interrupt one task without shutting down the JVM; Spark speculation is one such way. When speculation kills a task, the other tasks running on the same executor risk generating a bad parquet file. This does not always show up as a corrupted file: nearly half of the affected writes throw an exception when read back, while a few tasks succeed without any signal.
> Root cause:
> ParquetWriter is not thread safe, so its caller must guarantee that there are no concurrent calls into it. In the following code:
> [https://github.com/apache/hudi/blob/master/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkMergeHelper.java#L103]
> we call both write and close on the parquet writer concurrently, so data may still be being written while the writer is closed.
>
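
The race is easy to demonstrate outside Hudi. The following minimal Java sketch is a hypothetical stand-in, not Hudi or Parquet code: the UnsafeWriter class and the output file name are invented for illustration. One thread calls write() in a loop while a second thread, playing the cleanup path of a task killed by speculation (enabled in Spark via spark.speculation=true), calls close() concurrently. Because close() appends the footer and releases the file, an unguarded close() can land in the middle of a half-written record, the same shape of truncation behind the EOFException in the trace above. The synchronized blocks show the usual fix: write() and close() are serialized on one lock.

import java.io.BufferedOutputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

public class WriterRaceSketch {

    // Hypothetical stand-in for a non-thread-safe writer such as ParquetWriter:
    // write() appends record bytes, close() appends the footer and releases the file.
    static class UnsafeWriter implements AutoCloseable {
        private final BufferedOutputStream out;

        UnsafeWriter(String path) throws IOException {
            this.out = new BufferedOutputStream(new FileOutputStream(path));
        }

        void write(String record) throws IOException {
            out.write(record.getBytes(StandardCharsets.UTF_8));
        }

        @Override
        public void close() throws IOException {
            out.write("FOOTER".getBytes(StandardCharsets.UTF_8));
            out.close();
        }
    }

    public static void main(String[] args) throws Exception {
        final UnsafeWriter writer = new UnsafeWriter("race-demo.bin");
        final Object lock = new Object();

        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < 100_000; i++) {
                    // Remove this synchronized block to let writes race with close():
                    // the file can then end with a record cut off by the footer.
                    synchronized (lock) {
                        writer.write("record-" + i + "\n");
                    }
                }
            } catch (IOException e) {
                // Expected once close() has released the underlying stream.
            }
        });

        producer.start();
        Thread.sleep(5); // let some records through, then "interrupt" the task
        synchronized (lock) {
            writer.close(); // with the lock held, close() can never split a write()
        }
        producer.join();
    }
}

Whatever form the real fix takes in SparkMergeHelper, the invariant it must establish is the one the lock enforces here: no call to write() may overlap a call to close() on the same writer.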

[jira] [Updated] (HUDI-2875) Concurrent call to HoodieMergeHandler cause parquet corruption

2022-05-16 Thread sivabalan narayanan (Jira)


[ https://issues.apache.org/jira/browse/HUDI-2875?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

sivabalan narayanan updated HUDI-2875:
--
Fix Version/s: 0.11.1
   0.12.0
   (was: 0.11.0)


[jira] [Updated] (HUDI-2875) Concurrent call to HoodieMergeHandler cause parquet corruption

2022-05-16 Thread sivabalan narayanan (Jira)


[ https://issues.apache.org/jira/browse/HUDI-2875?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

sivabalan narayanan updated HUDI-2875:
--
Status: Open  (was: In Progress)


[jira] [Updated] (HUDI-2875) Concurrent call to HoodieMergeHandler cause parquet corruption

2022-03-29 Thread Raymond Xu (Jira)


[ https://issues.apache.org/jira/browse/HUDI-2875?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Raymond Xu updated HUDI-2875:
-
Priority: Critical  (was: Major)


[jira] [Updated] (HUDI-2875) Concurrent call to HoodieMergeHandler cause parquet corruption

2022-03-29 Thread Raymond Xu (Jira)


[ https://issues.apache.org/jira/browse/HUDI-2875?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Raymond Xu updated HUDI-2875:
-
Component/s: writer-core


[jira] [Updated] (HUDI-2875) Concurrent call to HoodieMergeHandler cause parquet corruption

2022-03-01 Thread Raymond Xu (Jira)


[ https://issues.apache.org/jira/browse/HUDI-2875?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Raymond Xu updated HUDI-2875:
-
Sprint: Cont' improve -  2022/02/07, Cont' improve -  2022/02/14  (was: Cont' improve -  2022/02/07, Cont' improve -  2022/02/14, Cont' improve -  2022/02/21)


[jira] [Updated] (HUDI-2875) Concurrent call to HoodieMergeHandler cause parquet corruption

2022-02-22 Thread sivabalan narayanan (Jira)


[ https://issues.apache.org/jira/browse/HUDI-2875?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

sivabalan narayanan updated HUDI-2875:
--
Sprint: Cont' improve -  2022/02/07, Cont' improve -  2022/02/14, Cont' improve -  2022/02/21  (was: Cont' improve -  2022/02/07, Cont' improve -  2022/02/14)


[jira] [Updated] (HUDI-2875) Concurrent call to HoodieMergeHandler cause parquet corruption

2022-02-15 Thread sivabalan narayanan (Jira)


[ https://issues.apache.org/jira/browse/HUDI-2875?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

sivabalan narayanan updated HUDI-2875:
--
Sprint: Cont' improve -  2022/02/07, Cont' improve -  2022/02/14  (was: Cont' improve -  2022/02/07)


[jira] [Updated] (HUDI-2875) Concurrent call to HoodieMergeHandler cause parquet corruption

2022-02-09 Thread sivabalan narayanan (Jira)


[ https://issues.apache.org/jira/browse/HUDI-2875?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

sivabalan narayanan updated HUDI-2875:
--
Status: In Progress  (was: Open)


[jira] [Updated] (HUDI-2875) Concurrent call to HoodieMergeHandler cause parquet corruption

2022-02-08 Thread sivabalan narayanan (Jira)


[ https://issues.apache.org/jira/browse/HUDI-2875?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

sivabalan narayanan updated HUDI-2875:
--
 Reviewers: sivabalan narayanan
Remaining Estimate: 1h
 Original Estimate: 1h


[jira] [Updated] (HUDI-2875) Concurrent call to HoodieMergeHandler cause parquet corruption

2022-02-08 Thread sivabalan narayanan (Jira)


[ https://issues.apache.org/jira/browse/HUDI-2875?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

sivabalan narayanan updated HUDI-2875:
--
Fix Version/s: 0.11.0


[jira] [Updated] (HUDI-2875) Concurrent call to HoodieMergeHandler cause parquet corruption

2022-02-08 Thread Raymond Xu (Jira)


[ https://issues.apache.org/jira/browse/HUDI-2875?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Raymond Xu updated HUDI-2875:
-
Sprint: Cont' improve -  2022/02/07


[jira] [Updated] (HUDI-2875) Concurrent call to HoodieMergeHandler cause parquet corruption

2021-12-14 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/HUDI-2875?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated HUDI-2875:
-
Labels: pull-request-available  (was: )

> Concurrent call to HoodieMergeHandler cause parquet corruption
> --
>
> Key: HUDI-2875
> URL: https://issues.apache.org/jira/browse/HUDI-2875
> Project: Apache Hudi
>  Issue Type: Bug
>  Components: Common Core
>Reporter: ZiyueGuan
>Assignee: ZiyueGuan
>Priority: Major
>  Labels: pull-request-available
>

[jira] [Updated] (HUDI-2875) Concurrent call to HoodieMergeHandler cause parquet corruption

2021-12-10 Thread ZiyueGuan (Jira)


 [ 
https://issues.apache.org/jira/browse/HUDI-2875?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ZiyueGuan updated HUDI-2875:

Description: 
Problem:

Some corrupted parquet files are generated, and exceptions are thrown when they 
are read.

e.g.

 
Caused by: org.apache.parquet.io.ParquetDecodingException: Can not read value at 0 in block -1 in file 
    at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:251)
    at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:132)
    at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:136)
    at org.apache.hudi.common.util.ParquetReaderIterator.hasNext(ParquetReaderIterator.java:49)
    at org.apache.hudi.common.util.queue.IteratorBasedQueueProducer.produce(IteratorBasedQueueProducer.java:45)
    at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.lambda$null$0(BoundedInMemoryExecutor.java:112)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    ... 4 more
Caused by: org.apache.parquet.io.ParquetDecodingException: could not read page Page [bytes.size=1054316, valueCount=237, uncompressedSize=1054316] in col  required binary col
    at org.apache.parquet.column.impl.ColumnReaderImpl.readPageV1(ColumnReaderImpl.java:599)
    at org.apache.parquet.column.impl.ColumnReaderImpl.access$300(ColumnReaderImpl.java:57)
    at org.apache.parquet.column.impl.ColumnReaderImpl$3.visit(ColumnReaderImpl.java:536)
    at org.apache.parquet.column.impl.ColumnReaderImpl$3.visit(ColumnReaderImpl.java:533)
    at org.apache.parquet.column.page.DataPageV1.accept(DataPageV1.java:95)
    at org.apache.parquet.column.impl.ColumnReaderImpl.readPage(ColumnReaderImpl.java:533)
    at org.apache.parquet.column.impl.ColumnReaderImpl.checkRead(ColumnReaderImpl.java:525)
    at org.apache.parquet.column.impl.ColumnReaderImpl.consume(ColumnReaderImpl.java:638)
    at org.apache.parquet.column.impl.ColumnReaderImpl.<init>(ColumnReaderImpl.java:353)
    at org.apache.parquet.column.impl.ColumnReadStoreImpl.newMemColumnReader(ColumnReadStoreImpl.java:80)
    at org.apache.parquet.column.impl.ColumnReadStoreImpl.getColumnReader(ColumnReadStoreImpl.java:75)
    at org.apache.parquet.io.RecordReaderImplementation.<init>(RecordReaderImplementation.java:271)
    at org.apache.parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:147)
    at org.apache.parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:109)
    at org.apache.parquet.filter2.compat.FilterCompat$NoOpFilter.accept(FilterCompat.java:165)
    at org.apache.parquet.io.MessageColumnIO.getRecordReader(MessageColumnIO.java:109)
    at org.apache.parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:137)
    at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:222)
    ... 11 more
Caused by: java.io.EOFException
    at java.io.DataInputStream.readFully(DataInputStream.java:197)
    at java.io.DataInputStream.readFully(DataInputStream.java:169)
    at org.apache.parquet.bytes.BytesInput$StreamBytesInput.toByteArray(BytesInput.java:286)
    at org.apache.parquet.bytes.BytesInput.toByteBuffer(BytesInput.java:237)
    at org.apache.parquet.bytes.BytesInput.toInputStream(BytesInput.java:246)
    at org.apache.parquet.column.impl.ColumnReaderImpl.readPageV1(ColumnReaderImpl.java:592)
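
For illustration, the exception above surfaces as soon as any reader pulls the 
first record from a corrupted file. A minimal sketch of such a read check (the 
class name and command-line path are hypothetical, and it assumes parquet-avro 
and hadoop-client on the classpath):

import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.hadoop.ParquetReader;

public class ReadCheck {
  public static void main(String[] args) throws Exception {
    // Open the suspect file; on a corrupted file a ParquetDecodingException
    // like the one above is typically thrown on the first read() call.
    try (ParquetReader<GenericRecord> reader =
             AvroParquetReader.<GenericRecord>builder(new Path(args[0])).build()) {
      long count = 0;
      while (reader.read() != null) {
        count++; // consume every record so all row groups and pages are scanned
      }
      System.out.println("read " + count + " records without error");
    }
  }
}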
 

How to reproduce:

We need a way to interrupt one task without shutting down the JVM; Spark 
speculation is one such mechanism. When speculation is triggered, other tasks 
running on the same executor risk generating a bad parquet file. This does not 
always result in a corrupted parquet file: roughly half of the affected tasks 
throw an exception, while a few succeed without any sign of trouble.
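
To make the interleaving concrete, below is a minimal, self-contained sketch of 
the race, assuming a hypothetical NonThreadSafeWriter that stands in for 
ParquetWriter (like ParquetWriter, it holds stateful buffers that must not be 
touched after close()):

import java.io.IOException;
import java.util.concurrent.CountDownLatch;

// Hypothetical stand-in for a non-thread-safe writer such as ParquetWriter.
class NonThreadSafeWriter implements AutoCloseable {
  private byte[] buffer = new byte[1024];
  private int pos = 0;

  void write(byte b) throws IOException {
    if (buffer == null) {
      // the "checked invalid use" case: the writer notices it was closed
      throw new IOException("write after close");
    }
    if (pos == buffer.length) {
      pos = 0;
    }
    // the unchecked case: close() may null the buffer between the check above
    // and this access (NullPointerException here), or the write lands in a
    // buffer that was already handed back -- silent corruption in a real writer
    buffer[pos++] = b;
  }

  @Override
  public void close() {
    buffer = null; // clear the stateful resource and "pay it back" to a pool
  }
}

public class WriteCloseRace {
  public static void main(String[] args) throws Exception {
    NonThreadSafeWriter writer = new NonThreadSafeWriter();
    CountDownLatch started = new CountDownLatch(1);

    Thread producer = new Thread(() -> {
      started.countDown();
      try {
        for (int i = 0; i < 1_000_000; i++) {
          writer.write((byte) i); // keeps writing while main() calls close()
        }
        System.out.println("producer finished without any sign of trouble");
      } catch (IOException | NullPointerException e) {
        System.out.println("producer failed mid-stream: " + e);
      }
    });

    producer.start();
    started.await();
    writer.close(); // races with write(), mirroring the merge-handle scenario
    producer.join();
  }
}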

Root cause:

ParquetWriter is not thread-safe; its callers must take appropriate measures to 
guarantee that there are no concurrent calls to it.

In the following code: 

[https://github.com/apache/hudi/blob/master/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkMergeHelper.java#L103]

We call both write and close on the parquet writer concurrently, so data may 
still be written while close is running. In the close method, the compressor (a 
stateful class parquet uses to do the compression) is cleared and handed back 
to a pool for later reuse. Because of the concurrent write described above, 
data can keep being pushed into the compressor even after it has been cleared. 
The compressor also contains a mechanism that detects some invalid uses, which 
is why part of the invalid usage throws an exception rather than silently 
generating a corrupted parquet file.
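
One general way to rule out this interleaving is to serialize write() and 
close() on a single monitor, so close() cannot run while a write() is in 
flight. The wrapper below is a sketch of that technique only (the class name is 
hypothetical), not necessarily the fix that was merged for this issue:

import java.io.IOException;
import org.apache.avro.generic.GenericRecord;
import org.apache.parquet.hadoop.ParquetWriter;

// Hypothetical guard around a ParquetWriter: all calls go through one monitor.
class SynchronizedParquetWriter {
  private final ParquetWriter<GenericRecord> delegate;
  private boolean closed = false;

  SynchronizedParquetWriter(ParquetWriter<GenericRecord> delegate) {
    this.delegate = delegate;
  }

  synchronized void write(GenericRecord record) throws IOException {
    if (closed) {
      // fail fast instead of pushing data into a compressor that was cleared
      throw new IOException("write after close");
    }
    delegate.write(record);
  }

  synchronized void close() throws IOException {
    if (!closed) {
      closed = true;
      delegate.close(); // no write() can interleave with this call
    }
  }
}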

Validation:

The current solution has been validated in a production environment. A signal 
that the fix has been applied correctly is that there should be no task 
failures due
