veysiertekin commented on issue #7991: [FLINK-11928][Connectors / FileSystem]Create a Compression string writer for flink-connectors-filesystem URL: https://github.com/apache/flink/pull/7991#issuecomment-573359364 There is a problem with closing operation: ```java 2020-01-11 14:41:02 java.lang.RuntimeException: Exception occurred while processing valve output watermark: at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamOneInputProcessor.java:216) at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189) at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111) at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processElement(StreamOneInputProcessor.java:169) at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:143) at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:279) at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:301) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:406) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530) at java.lang.Thread.run(Thread.java:748) Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:654) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592) at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727) at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705) at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51) at flink.test.job.TestWindowFunction$.apply(TestJob.scala:83) at flink.test.job.TestWindowFunction$.apply(TestJob.scala:78) at org.apache.flink.streaming.api.scala.function.util.ScalaWindowFunctionWrapper.apply(ScalaWindowFunctionWrapper.scala:44) at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.process(InternalIterableWindowFunction.java:44) at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.process(InternalIterableWindowFunction.java:32) at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:549) at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onEventTime(WindowOperator.java:457) at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.advanceWatermark(InternalTimerServiceImpl.java:276) at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:128) at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:784) at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamOneInputProcessor.java:213) ... 10 more Caused by: java.lang.IllegalStateException: Writer has already been opened at org.apache.flink.streaming.connectors.fs.StreamWriterBase.open(StreamWriterBase.java:69) at flink.test.sink.CompressionStringWriter.open(CompressionStringWriter.java:77) at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.openNewPartFile(BucketingSink.java:588) at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.invoke(BucketingSink.java:463) at org.apache.flink.streaming.api.functions.sink.SinkFunction.invoke(SinkFunction.java:52) at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637) ... 26 more ``` I had to fixed with (maybe there is a better solution for this): ```java @Override public void close() throws IOException { if (compressedOutputStream != null) { compressedOutputStream.finish(); compressedOutputStream = null; } try { super.close(); } finally { if (compressor != null) { CodecPool.returnCompressor(compressor); compressor = null; } } } ```
---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected] With regards, Apache Git Services
