[ 
https://issues.apache.org/jira/browse/PARQUET-1773?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17021619#comment-17021619
 ] 

Tristan Davolt edited comment on PARQUET-1773 at 1/22/20 11:33 PM:
-------------------------------------------------------------------

I think I may have found the issue; tell me if you think this is valid. We 
have a timer that periodically calls parquetWriter.close() and flushes 
everything to S3 for consumers with less traffic, just to make sure we're 
sending data in a timely manner. I found a moment when a message was received 
and processed within the same millisecond as the timer task. The received 
message called parquetWriter.write(). I believe the race condition involves 
shared resources at the InternalParquetRecordWriter level.
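To illustrate the suspected interleaving, here is a minimal toy model (these are NOT the real parquet-mr classes; the field and method names only mirror the ones mentioned above). It shows deterministically, in a single thread, what the unlucky thread ordering would produce: close() flushes and nulls the shared store, and a write() that lands after it dereferences null:

{code:java}
// Hypothetical sketch, not parquet-mr code: models the shared mutable
// state in InternalParquetRecordWriter that two threads could race on.
class ToyRecordWriter {
    // Stands in for the real columnStore/pageStore fields.
    StringBuilder columnStore = new StringBuilder();

    void write(String record) {
        // Unsynchronized: if close() already ran, columnStore is null here.
        columnStore.append(record).append('\n');
    }

    void close() {
        flushRowGroupToStore();
    }

    private void flushRowGroupToStore() {
        // Mirrors the flush path releasing the stores after writing.
        columnStore = null;
    }
}

class RaceSketch {
    public static void main(String[] args) {
        ToyRecordWriter w = new ToyRecordWriter();
        w.write("a");          // consumer thread: normal path works
        w.close();             // timer thread: flushes and nulls the store
        boolean npe = false;
        try {
            w.write("b");      // consumer thread's write hits the nulled field
        } catch (NullPointerException e) {
            npe = true;
        }
        System.out.println("NPE after close: " + npe);
    }
}
{code}

The real failure window is two threads hitting this within the same millisecond; the sketch just replays that ordering sequentially to make the NPE deterministic.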

We also see an NPE when this issue begins. Both .close() and .write() 
eventually call flushRowGroupToStore(). If they access the same resources, 
the NPE may occur when one thread sets columnStore or pageStore to null 
while the other is still trying to access them.

Before throwing the error, flushRowGroupToStore() calls startBlock(), then 
fails and never calls endBlock(). The writer thus enters the BLOCK state 
without ever being released, and every call afterward fails.
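That stuck-state behavior can be sketched with a toy state machine (again, a hypothetical stand-in, not the actual ParquetFileWriter$STATE implementation). Once startBlock() succeeds and the matching endBlock() is skipped because of a failure, every later transition throws the "invalid state" error from the stacktrace below:

{code:java}
import java.io.IOException;

// Hypothetical sketch of the state transitions described above.
class ToyFileWriter {
    enum State { STARTED, BLOCK }
    private State state = State.STARTED;

    void startBlock() throws IOException {
        if (state != State.STARTED)
            throw new IOException(
                "The file being written is in an invalid state. Current state: " + state);
        state = State.BLOCK;
    }

    void endBlock() throws IOException {
        if (state != State.BLOCK)
            throw new IOException(
                "The file being written is in an invalid state. Current state: " + state);
        state = State.STARTED;
    }
}

class StateSketch {
    public static void main(String[] args) throws Exception {
        ToyFileWriter fw = new ToyFileWriter();
        fw.startBlock();           // flush begins: enters BLOCK
        // ...flush fails here, so endBlock() is never reached...
        try {
            fw.startBlock();       // every subsequent flush attempt now fails
        } catch (IOException e) {
            System.out.println(e.getMessage());
        }
    }
}
{code}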

Is this a valid theory? I don't currently have a stacktrace for the NPE, but 
I'll send an update if I get one.
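If the theory holds, one mitigation on our side (a sketch under the assumption that the race is between our timer's close() and the consumer's write(); not a patch to parquet-mr, and the class name is hypothetical) would be to serialize both calls through one lock and make write() after close() fail fast instead of racing on the shared stores:

{code:java}
// Hypothetical wrapper sketch: serialize close() and write() so the
// timer thread and the consumer thread can never interleave inside
// the flush path.
class GuardedWriter {
    private final Object lock = new Object();
    private boolean closed = false;
    private StringBuilder columnStore = new StringBuilder();

    void write(String record) {
        synchronized (lock) {
            if (closed)
                throw new IllegalStateException("writer already closed");
            columnStore.append(record).append('\n');
        }
    }

    void close() {
        synchronized (lock) {
            if (closed) return;   // idempotent: the timer may fire twice
            columnStore = null;   // flush + release, as in the real writer
            closed = true;
        }
    }
}
{code}

With this, a late write() surfaces as a clear IllegalStateException rather than an NPE or a writer stuck in the BLOCK state.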



> Parquet file in invalid state while writing to S3 when calling 
> ParquetWriter.write
> ----------------------------------------------------------------------------------
>
>                 Key: PARQUET-1773
>                 URL: https://issues.apache.org/jira/browse/PARQUET-1773
>             Project: Parquet
>          Issue Type: Bug
>          Components: parquet-mr
>    Affects Versions: 1.10.0
>            Reporter: Tristan Davolt
>            Priority: Major
>
> This may be related to PARQUET-632. I am also writing parquet to S3, but I am 
> calling ParquetWriter.write directly. I have multiple containerized instances 
> consuming messages from Kafka, converting them to Parquet, and then writing 
> to S3. One instance will begin to throw this exception for all new messages. 
> Sometimes, the container will recover. Other times, it must be restarted 
> manually to recover. I am unable to find any "error thrown previously."
> Exception:
>  java.io.IOException
>  Message:
>  The file being written is in an invalid state. Probably caused by an error 
> thrown previously. Current state: BLOCK
>  Stacktrace:
> {code:java}
> org.apache.parquet.hadoop.ParquetFileWriter$STATE.error(ParquetFileWriter.java:168)
> org.apache.parquet.hadoop.ParquetFileWriter$STATE.startBlock(ParquetFileWriter.java:160)
> org.apache.parquet.hadoop.ParquetFileWriter.startBlock(ParquetFileWriter.java:291)
> org.apache.parquet.hadoop.InternalParquetRecordWriter.flushRowGroupToStore(InternalParquetRecordWriter.java:171)
> org.apache.parquet.hadoop.InternalParquetRecordWriter.close(InternalParquetRecordWriter.java:114)
> org.apache.parquet.hadoop.ParquetWriter.close(ParquetWriter.java:308)
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
