Re: Exceptions from collector.collect after cancelling job

2016-09-30 Thread Shannon Carey
My flat map function is catching and logging the exception: its try block happens 
to enclose the call to Collector#collect().

I will move the call to collect() outside of the try block. That should silence 
the log message.
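A minimal sketch of that change, assuming a flat map that parses strings into integers (the class `ParseFlatMap` and the helper `parse` are hypothetical). In a real job this would be the body of `flatMap` in a Flink `FlatMapFunction<String, Integer>`; here `Collector` is a local stand-in with the same `collect`/`close` methods as `org.apache.flink.util.Collector`, so the example runs without Flink. The try block covers only the user logic, so an exception thrown by `collect()` during cancellation propagates instead of being logged by the function.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;

public class ParseFlatMap {
    // Stand-in for org.apache.flink.util.Collector (collect/close), declared
    // locally so this sketch compiles and runs without Flink on the classpath.
    public interface Collector<T> {
        void collect(T record);
        void close();
    }

    private static final Logger LOG = Logger.getLogger(ParseFlatMap.class.getName());

    // Hypothetical user logic that may fail on bad input.
    static Integer parse(String line) {
        return Integer.valueOf(line.trim());
    }

    // The try block covers only the user logic. collect() runs outside it, so an
    // exception raised downstream (for example "Buffer pool is destroyed." while
    // the job is being cancelled) propagates to the caller instead of being logged.
    public static void flatMap(String line, Collector<Integer> out) {
        Integer parsed;
        try {
            parsed = parse(line);
        } catch (NumberFormatException e) {
            LOG.log(Level.WARNING, "Skipping malformed record: " + line, e);
            return;
        }
        out.collect(parsed);
    }

    public static void main(String[] args) {
        List<Integer> seen = new ArrayList<>();
        Collector<Integer> out = new Collector<Integer>() {
            public void collect(Integer record) { seen.add(record); }
            public void close() { }
        };
        flatMap("42", out);
        flatMap("not a number", out); // logged and skipped
        System.out.println(seen);     // prints [42]
    }
}
```

Only parsing failures are handled locally; anything the downstream chain throws is left for Flink to deal with.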


Re: Exceptions from collector.collect after cancelling job

2016-09-30 Thread Ufuk Celebi
On Thu, Sep 29, 2016 at 9:29 PM, Shannon Carey  wrote:
> It looks like Flink is disabling the objects that the FlatMap collector
> relies on before disabling the operator itself. Is that expected/normal? Is
> there anything I should change in my FlatMap function or job code to account
> for it?

Hey Shannon,

Flink actually does cancel the tasks *before* cleaning up the network
resources that throw the root Exception here.

We intentionally don't log exceptions that are thrown during cancellation, 
because user code or an operator may use the already-closed resources 
concurrently with cancellation (which is essentially what your stack traces 
show). It looks like some places don't respect this, though.
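A minimal, hypothetical sketch of the policy described above: failures are reported only while a task is still running, and failures after cancellation are treated as side effects of teardown and dropped. This is not Flink's actual task code; `CancellableTask`, `runStep`, and the `logged` list (standing in for a logger) are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;

public class CancellableTask {
    // Set by the cancelling thread, read by the task thread.
    private volatile boolean canceled = false;

    // Stands in for a logger so the behaviour is easy to inspect.
    public final List<String> logged = new ArrayList<>();

    public void cancel() {
        canceled = true;
    }

    // Runs one unit of work. A failure while the task is still running is logged
    // and rethrown. A failure after cancel() is assumed to come from resources
    // that are already being torn down, so it is dropped without logging.
    public void runStep(Runnable work) {
        try {
            work.run();
        } catch (RuntimeException e) {
            if (!canceled) {
                logged.add("Task failed: " + e.getMessage());
                throw e;
            }
        }
    }

    public static void main(String[] args) {
        CancellableTask task = new CancellableTask();
        Runnable emit = () -> {
            throw new IllegalStateException("Buffer pool is destroyed.");
        };
        task.cancel();
        task.runStep(emit);                     // swallowed: task was cancelled
        System.out.println(task.logged.size()); // prints 0
    }
}
```

Any code path that catches and logs on its own, without checking the cancellation flag, bypasses this policy, which matches what the stack traces in this thread show.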

Can you tell which classes actually log this? It would be good to fix this if 
possible, as it is very confusing and looks quite bad. I don't expect it to 
cause an actual problem, though.

– Ufuk


Exceptions from collector.collect after cancelling job

2016-09-29 Thread Shannon Carey
When I cancel a job, I get many exceptions that look like this:

java.lang.RuntimeException: Could not forward element to next operator
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:376)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:358)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:346)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:329)
    at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
    ... lots of Flink and user code (a flat map function) stack entries here
    at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecord(AbstractFetcher.java:225)
    at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.run(Kafka09Fetcher.java:239)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: Buffer pool is destroyed.
    at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:81)
    at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:39)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:346)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:329)
    at org.apache.flink.streaming.runtime.operators.TimestampsAndPeriodicWatermarksOperator.processElement(TimestampsAndPeriodicWatermarksOperator.java:66)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:373)
    ... 43 more
Caused by: java.lang.IllegalStateException: Buffer pool is destroyed.
    at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBuffer(LocalBufferPool.java:144)
    at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:133)
    at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:93)
    at org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit(StreamRecordWriter.java:86)
    at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:78)
    ... 48 more
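The trace has three layers: the buffer pool throws an IllegalStateException, RecordWriterOutput.collect rethrows it as a RuntimeException with the same message, and the chaining output wraps that in "Could not forward element to next operator". The toy model below reproduces only that wrapping so the "Caused by" chain is easier to read. It is not Flink source code; `ExceptionChainSketch` and its methods are hypothetical stand-ins for the frames named in the comments.

```java
public class ExceptionChainSketch {
    // Flipped when the (toy) network stack is torn down.
    private volatile boolean poolDestroyed = false;

    public void destroyPool() {
        poolDestroyed = true;
    }

    // Innermost level, playing the role of LocalBufferPool.requestBuffer.
    void requestBuffer() {
        if (poolDestroyed) {
            throw new IllegalStateException("Buffer pool is destroyed.");
        }
    }

    // Middle level, playing the role of RecordWriterOutput.collect: the trace shows
    // it rethrowing as a RuntimeException with the same message.
    void recordWriterCollect(String record) {
        try {
            requestBuffer();
        } catch (IllegalStateException e) {
            throw new RuntimeException(e.getMessage(), e);
        }
    }

    // Outer level, playing the role of OperatorChain$CopyingChainingOutput.collect.
    public void chainingCollect(String record) {
        try {
            recordWriterCollect(record);
        } catch (RuntimeException e) {
            throw new RuntimeException("Could not forward element to next operator", e);
        }
    }

    public static void main(String[] args) {
        ExceptionChainSketch chain = new ExceptionChainSketch();
        chain.chainingCollect("a"); // fine: pool still alive
        chain.destroyPool();
        try {
            chain.chainingCollect("b");
        } catch (RuntimeException e) {
            // Walk the cause chain in the same order the stack trace prints it.
            for (Throwable t = e; t != null; t = t.getCause()) {
                System.out.println(t);
            }
        }
    }
}
```

Running main prints the three exception messages in the same order as the "Caused by" sections above.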

It looks like Flink is disabling the objects that the FlatMap collector relies 
on before disabling the operator itself. Is that expected/normal? Is there 
anything I should change in my FlatMap function or job code to account for it?

-Shannon