Re: Exceptions from collector.collect after cancelling job
My flat map function is catching & logging the exception. The try block happens to encompass the call to Collector#collect(). I will move the call to collect outside of the try. That should silence the log message.

On 9/30/16, 3:51 AM, "Ufuk Celebi" wrote:

>On Thu, Sep 29, 2016 at 9:29 PM, Shannon Carey wrote:
>> It looks like Flink is disabling the objects that the FlatMap collector
>> relies on before disabling the operator itself. Is that expected/normal? Is
>> there anything I should change in my FlatMap function or job code to account
>> for it?
>
>Hey Shannon,
>
>Flink actually does cancel the tasks *before* cleaning up the network
>resources that throw the root Exception here.
>
>We actually don't log any Exceptions that are thrown during
>cancellation, because it is possible that the user code/operator use
>the closed resources concurrently with cancellation (your stack traces
>essentially), but it looks like in some places we don't respect this.
>
>Can you tell which classes actually log this? Would be good to fix
>this if possible as it is very confusing and looks quite bad. I don't
>expect it to be an actual problem though.
>
>– Ufuk
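For readers finding this thread later, the restructuring Shannon describes can be sketched roughly as below. This is a minimal illustration, not Flink's real API: the `Collector` interface and the parse step are simplified stand-ins for the actual `org.apache.flink.util.Collector` and whatever user logic the original try block protected.

```java
import java.util.ArrayList;
import java.util.List;

public class FlatMapSketch {

    // Simplified stand-in for Flink's Collector interface.
    interface Collector<T> {
        void collect(T record);
    }

    // Before the fix, the try block also wrapped out.collect(), so the
    // RuntimeException thrown by the destroyed output path during job
    // cancellation was caught and logged by the user code. Here only the
    // user logic that can legitimately fail stays inside the try; collect()
    // runs afterwards, so cancellation-time exceptions propagate instead
    // of being logged as if they were data errors.
    static void flatMap(String value, Collector<Integer> out) {
        Integer parsed = null;
        try {
            parsed = Integer.parseInt(value.trim()); // risky user logic only
        } catch (NumberFormatException e) {
            // log and drop the bad record
        }
        if (parsed != null) {
            out.collect(parsed); // outside the try: exceptions not silenced
        }
    }

    // Demonstration helper: apply flatMap to some inputs, gather the output.
    static List<Integer> run(String... inputs) {
        List<Integer> results = new ArrayList<>();
        for (String s : inputs) {
            flatMap(s, results::add);
        }
        return results;
    }

    public static void main(String[] args) {
        System.out.println(run("1", "oops", " 3 "));
    }
}
```

The same shape applies to a real Flink `FlatMapFunction`: keep the catch scoped to the operations you actually expect to fail, and emit via the collector outside of it.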
Re: Exceptions from collector.collect after cancelling job
On Thu, Sep 29, 2016 at 9:29 PM, Shannon Carey wrote:
> It looks like Flink is disabling the objects that the FlatMap collector
> relies on before disabling the operator itself. Is that expected/normal? Is
> there anything I should change in my FlatMap function or job code to account
> for it?

Hey Shannon,

Flink actually does cancel the tasks *before* cleaning up the network
resources that throw the root Exception here.

We actually don't log any Exceptions that are thrown during
cancellation, because it is possible that the user code/operator use
the closed resources concurrently with cancellation (your stack traces
essentially), but it looks like in some places we don't respect this.

Can you tell which classes actually log this? Would be good to fix
this if possible as it is very confusing and looks quite bad. I don't
expect it to be an actual problem though.

– Ufuk
Exceptions from collector.collect after cancelling job
When I cancel a job, I get many exceptions that look like this:

java.lang.RuntimeException: Could not forward element to next operator
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:376)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:358)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:346)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:329)
	at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
	... lots of Flink and user code (a flat map function) stack entries here
	at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecord(AbstractFetcher.java:225)
	at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.run(Kafka09Fetcher.java:239)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: Buffer pool is destroyed.
	at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:81)
	at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:39)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:346)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:329)
	at org.apache.flink.streaming.runtime.operators.TimestampsAndPeriodicWatermarksOperator.processElement(TimestampsAndPeriodicWatermarksOperator.java:66)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:373)
	... 43 more
Caused by: java.lang.IllegalStateException: Buffer pool is destroyed.
	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBuffer(LocalBufferPool.java:144)
	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:133)
	at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:93)
	at org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit(StreamRecordWriter.java:86)
	at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:78)
	... 48 more

It looks like Flink is disabling the objects that the FlatMap collector relies on before disabling the operator itself. Is that expected/normal? Is there anything I should change in my FlatMap function or job code to account for it?

-Shannon