[ https://issues.apache.org/jira/browse/FLINK-10382?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16910497#comment-16910497 ]

John Lonergan commented on FLINK-10382:
---------------------------------------

I just ran into this same problem. Fundamentally, this class doesn't work as far as I can tell.

Presumably there are missing tests, which raises a question about QA in Flink: what is the overall quality of testing in Flink?

I suggest that, rather than just being documented as deprecated, this class should come with a comment saying it's broken and should be avoided, or be deleted entirely if it has been abandoned.

---

As for a "fix", I think something like this ought to work. A flush is needed first to avoid potential data loss.
 
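{code:java}
@Override
public void close() throws IOException {
    if (keyValueWriter != null) {
        // Flush buffered key/value records before the stream is closed.
        flush();
        // Let StreamWriterBase close and release its stream so open() can be called again.
        super.close();
        keyValueWriter = null;
    } else {
        // Need to make sure we close this if we never created the Key/Value Writer.
        super.close();
    }
}
{code}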
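The failure mode and the proposed fix can be modelled without Flink or Avro. This is a simplified sketch, not Flink's code: `BaseWriter` is a hypothetical stand-in for `StreamWriterBase` that keeps its "Writer has already been opened" guard, a `ByteArrayOutputStream` replaces the file-system stream, and `BrokenKvWriter` / `FixedKvWriter` are hypothetical names. It assumes, consistent with the quoted stack trace, that the broken `close()` closes the wrapped writer without letting the base class reset its stream.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;

public class WriterReopenDemo {

    // Hypothetical stand-in for StreamWriterBase: refuses to open a second stream.
    static class BaseWriter {
        private OutputStream outStream;

        public void open() throws IOException {
            if (outStream != null) {
                throw new IllegalStateException("Writer has already been opened");
            }
            outStream = new ByteArrayOutputStream(); // stands in for fs.create(path)
        }

        protected OutputStream getStream() {
            return outStream;
        }

        public void flush() throws IOException {
            if (outStream != null) {
                outStream.flush();
            }
        }

        public void close() throws IOException {
            if (outStream != null) {
                flush();
                outStream.close();
                outStream = null; // this reset is what allows open() to succeed again
            }
        }

        public boolean isOpen() {
            return outStream != null;
        }
    }

    // Models the reported bug: closes the wrapped writer but never calls super.close().
    static class BrokenKvWriter extends BaseWriter {
        private OutputStream keyValueWriter; // stands in for the Avro key/value writer

        @Override
        public void open() throws IOException {
            super.open();
            keyValueWriter = getStream();
        }

        @Override
        public void close() throws IOException {
            if (keyValueWriter != null) {
                keyValueWriter.close(); // stream is closed, but BaseWriter still holds it
            } else {
                super.close();
            }
        }
    }

    // Models the proposed fix: flush, then let the base class close and reset its stream.
    static class FixedKvWriter extends BaseWriter {
        private OutputStream keyValueWriter;

        @Override
        public void open() throws IOException {
            super.open();
            keyValueWriter = getStream();
        }

        @Override
        public void close() throws IOException {
            if (keyValueWriter != null) {
                flush();
                super.close();
                keyValueWriter = null;
            } else {
                super.close(); // nothing was wrapped; still release the base stream
            }
        }
    }

    public static void main(String[] args) throws IOException {
        BrokenKvWriter broken = new BrokenKvWriter();
        broken.open();
        broken.close();
        try {
            broken.open(); // e.g. the sink opening its next part file
            System.out.println("broken: reopened");
        } catch (IllegalStateException e) {
            System.out.println("broken: " + e.getMessage());
        }

        FixedKvWriter fixed = new FixedKvWriter();
        fixed.open();
        fixed.close();
        fixed.open();
        fixed.close();
        System.out.println("fixed: reopened and closed, open=" + fixed.isOpen());
    }
}
```

Running `main` prints `broken: Writer has already been opened` and then `fixed: reopened and closed, open=false`: the only difference is whether the base class gets to clear its stream reference on close.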


Is the bucketing sink abandoned?

This close issue only seems to be handled safely by StringWriter and SequenceFileWriter: the first because there's nothing to close, and the second because it knows whether or not it owns the stream.
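The stream-ownership idea can be sketched generically. This is a hypothetical illustration of the pattern, not the code of `SequenceFileWriter` or of Hadoop's `SequenceFile.Writer`: the inner writer (here `InnerWriter`, an invented name) is told whether it owns the stream, always flushes on close, and only closes the stream when it owns it, leaving a borrowed stream for its owner (the base writer) to close and reset.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;

public class OwnershipDemo {

    // Hypothetical inner writer that records whether it owns the stream it writes to.
    static class InnerWriter {
        private final OutputStream out;
        private final boolean ownsStream;

        InnerWriter(OutputStream out, boolean ownsStream) {
            this.out = out;
            this.ownsStream = ownsStream;
        }

        void write(byte[] record) throws IOException {
            out.write(record);
        }

        void close() throws IOException {
            out.flush(); // always push buffered bytes
            if (ownsStream) {
                out.close(); // only the owner releases the stream
            }
        }
    }

    // Hypothetical stream that remembers whether close() was called.
    static class TrackingStream extends ByteArrayOutputStream {
        boolean closed = false;

        @Override
        public void close() throws IOException {
            closed = true;
            super.close();
        }
    }

    public static void main(String[] args) throws IOException {
        TrackingStream borrowed = new TrackingStream();
        InnerWriter writer = new InnerWriter(borrowed, false);
        writer.write(new byte[] {1, 2, 3});
        writer.close();
        // The caller still owns the stream and is responsible for closing it.
        System.out.println("closed by inner writer: " + borrowed.closed);
        borrowed.close();
        System.out.println("closed by owner: " + borrowed.closed);
    }
}
```

This prints `closed by inner writer: false` followed by `closed by owner: true`.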



> Writer has already been opened while using AvroKeyValueSinkWriter and BucketingSink
> -----------------------------------------------------------------------------------
>
>                 Key: FLINK-10382
>                 URL: https://issues.apache.org/jira/browse/FLINK-10382
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / FileSystem
>    Affects Versions: 1.5.0, 1.6.0
>            Reporter: Chengzhi Zhao
>            Priority: Major
>
> I am using Flink 1.6.0 with AvroKeyValueSinkWriter and BucketingSink, writing to S3.
>  
> After the application had been running for a while (~20 mins), I got an exception: *java.lang.IllegalStateException: Writer has already been opened*
> {code:java}
> 2018-09-17 15:40:23,771 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 4 @ 1537198823640 for job 8f9ab122fb7452714465eb1e1989e4d7.
> 2018-09-17 15:41:27,805 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Sink: Unnamed (2/16) (25914cb3f77c8e4271b0fb6ea597ed50) switched from RUNNING to FAILED.
> java.lang.IllegalStateException: Writer has already been opened
> at org.apache.flink.streaming.connectors.fs.StreamWriterBase.open(StreamWriterBase.java:68)
> at org.apache.flink.streaming.connectors.fs.AvroKeyValueSinkWriter.open(AvroKeyValueSinkWriter.java:150)
> at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.openNewPartFile(BucketingSink.java:583)
> at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.invoke(BucketingSink.java:458)
> at org.apache.flink.streaming.api.functions.sink.SinkFunction.invoke(SinkFunction.java:52)
> at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
> at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
> at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:104)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
> at java.lang.Thread.run(Thread.java:748)
> 2018-09-17 15:41:27,808 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job Stream to Stream Join (8f9ab122fb7452714465eb1e1989e4d7) switched from state RUNNING to FAILING.
> java.lang.IllegalStateException: Writer has already been opened
> at org.apache.flink.streaming.connectors.fs.StreamWriterBase.open(StreamWriterBase.java:68)
> at org.apache.flink.streaming.connectors.fs.AvroKeyValueSinkWriter.open(AvroKeyValueSinkWriter.java:150)
> at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.openNewPartFile(BucketingSink.java:583)
> at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.invoke(BucketingSink.java:458)
> at org.apache.flink.streaming.api.functions.sink.SinkFunction.invoke(SinkFunction.java:52)
> at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
> at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
> at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:104)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
> at java.lang.Thread.run(Thread.java:748)
> {code}
> After checking the code, I think the issue might be related to AvroKeyValueSinkWriter.java, which leaves the writer not completely closed. I also noticed this change, which affects 1.5+:
> [https://github.com/apache/flink/commit/915213c7afaf3f9d04c240f43d88710280d844e3#diff-86c35c993fdb0c482544951b376e5ea6]
> I created my own AvroKeyValueSinkWriter class with code similar to v1.4, and it seems to be running fine now.
> {code:java}
> @Override
> public void close() throws IOException {
>     try {
>         // Close the base stream so a later open() can succeed.
>         super.close();
>     } finally {
>         // Runs even if super.close() throws.
>         if (keyValueWriter != null) {
>             keyValueWriter.close();
>         }
>     }
> }
> {code}
> I am curious whether anyone has had a similar issue. I'd appreciate any insights on it. Thanks!
>  
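The v1.4-style close() quoted above relies on try/finally: the key/value writer is closed even if super.close() throws. A minimal sketch of just that ordering guarantee, assuming a base class whose close() fails (all class names here, `FailingBase`, `Inner`, `KvWriter`, are hypothetical and only model the shape of the quoted method):

```java
import java.io.IOException;

public class TryFinallyCloseDemo {

    // Hypothetical base writer whose close() fails, e.g. a flaky remote stream.
    static class FailingBase {
        public void close() throws IOException {
            throw new IOException("simulated failure while closing base stream");
        }
    }

    // Hypothetical inner writer that records whether it was closed.
    static class Inner {
        boolean closed = false;

        void close() {
            closed = true;
        }
    }

    // Same shape as the quoted close(): base first, inner writer in finally.
    static class KvWriter extends FailingBase {
        final Inner keyValueWriter = new Inner();

        @Override
        public void close() throws IOException {
            try {
                super.close();
            } finally {
                if (keyValueWriter != null) {
                    keyValueWriter.close(); // still runs although super.close() threw
                }
            }
        }
    }

    public static void main(String[] args) {
        KvWriter w = new KvWriter();
        try {
            w.close();
        } catch (IOException e) {
            System.out.println("close failed: " + e.getMessage());
        }
        System.out.println("inner closed: " + w.keyValueWriter.closed);
    }
}
```

This prints `close failed: simulated failure while closing base stream` and then `inner closed: true`; the exception from the base class still propagates to the caller after the finally block has run.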



--
This message was sent by Atlassian Jira
(v8.3.2#803003)
