When I cancel a job, I get many exceptions that look like this:

java.lang.RuntimeException: Could not forward element to next operator
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:376)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:358)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:346)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:329)
at 
org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
... lots of Flink and user code (a flat map function) stack entries here
at 
org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecord(AbstractFetcher.java:225)
at 
org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.run(Kafka09Fetcher.java:239)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: Buffer pool is destroyed.
at 
org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:81)
at 
org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:39)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:346)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:329)
at 
org.apache.flink.streaming.runtime.operators.TimestampsAndPeriodicWatermarksOperator.processElement(TimestampsAndPeriodicWatermarksOperator.java:66)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:373)
... 43 more
Caused by: java.lang.IllegalStateException: Buffer pool is destroyed.
at 
org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBuffer(LocalBufferPool.java:144)
at 
org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:133)
at 
org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:93)
at 
org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit(StreamRecordWriter.java:86)
at 
org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:78)
... 48 more

It looks like Flink is disabling the objects that the FlatMap collector relies 
on before disabling the operator itself. Is that expected/normal? Is there 
anything I should change in my FlatMap function or job code to account for it?

-Shannon

Reply via email to