[ 
https://issues.apache.org/jira/browse/FLINK-10382?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16655242#comment-16655242
 ] 

Aljoscha Krettek commented on FLINK-10382:
------------------------------------------

I think it would be good to somehow fix it but I'm not actively working on it, 
and the workaround is to copy and implement on your own. So it's somewhat ok 
for now.

> Writer has already been opened while using AvroKeyValueSinkWriter and 
> BucketingSink
> -----------------------------------------------------------------------------------
>
>                 Key: FLINK-10382
>                 URL: https://issues.apache.org/jira/browse/FLINK-10382
>             Project: Flink
>          Issue Type: Bug
>          Components: Streaming Connectors
>    Affects Versions: 1.5.0, 1.6.0
>            Reporter: Chengzhi Zhao
>            Priority: Major
>
> I am using flink 1.6.0 and I am using AvroKeyValueSinkWriter and 
> BucketingSink to S3.
>  
> After the application running for a while ~ 20 mins, I got an *exception: 
> java.lang.IllegalStateException: Writer has already been opened*
> {code:java}
> 2018-09-17 15:40:23,771 INFO 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering 
> checkpoint 4 @ 1537198823640 for job 8f9ab122fb7452714465eb1e1989e4d7.
> 2018-09-17 15:41:27,805 INFO 
> org.apache.flink.runtime.executiongraph.ExecutionGraph - Sink: Unnamed (2/16) 
> (25914cb3f77c8e4271b0fb6ea597ed50) switched from RUNNING to FAILED.
> java.lang.IllegalStateException: Writer has already been opened
> at 
> org.apache.flink.streaming.connectors.fs.StreamWriterBase.open(StreamWriterBase.java:68)
> at 
> org.apache.flink.streaming.connectors.fs.AvroKeyValueSinkWriter.open(AvroKeyValueSinkWriter.java:150)
> at 
> org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.openNewPartFile(BucketingSink.java:583)
> at 
> org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.invoke(BucketingSink.java:458)
> at 
> org.apache.flink.streaming.api.functions.sink.SinkFunction.invoke(SinkFunction.java:52)
> at 
> org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
> at 
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
> at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:104)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
> at java.lang.Thread.run(Thread.java:748)
> 2018-09-17 15:41:27,808 INFO 
> org.apache.flink.runtime.executiongraph.ExecutionGraph - Job Stream to Stream 
> Join (8f9ab122fb7452714465eb1e1989e4d7) switched from state RUNNING to 
> FAILING.
> java.lang.IllegalStateException: Writer has already been opened
> at 
> org.apache.flink.streaming.connectors.fs.StreamWriterBase.open(StreamWriterBase.java:68)
> at 
> org.apache.flink.streaming.connectors.fs.AvroKeyValueSinkWriter.open(AvroKeyValueSinkWriter.java:150)
> at 
> org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.openNewPartFile(BucketingSink.java:583)
> at 
> org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.invoke(BucketingSink.java:458)
> at 
> org.apache.flink.streaming.api.functions.sink.SinkFunction.invoke(SinkFunction.java:52)
> at 
> org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
> at 
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
> at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:104)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
> at java.lang.Thread.run(Thread.java:748)
> {code}
> After checking the code, I think the issue might be related to 
> AvroKeyValueSinkWriter.java and led to the writer has not been closed 
> completely. I also noticed this change and affect 1.5+ 
> [https://github.com/apache/flink/commit/915213c7afaf3f9d04c240f43d88710280d844e3#diff-86c35c993fdb0c482544951b376e5ea6]
> I created my own AvroKeyValueSinkWriter class and implement the code similar 
> as v1.4, it seems running fine now. 
> {code:java}
> @Override
> public void close() throws IOException {
>     try {
>         super.close();
>     } finally {
>         if (keyValueWriter != null) {
>             keyValueWriter.close();
>         }
>     }
> }
> {code}
> I am curious if anyone had the similar issue, Appreciate anyone has insights 
> on it. Thanks! 
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to