Cancel job error ! Interrupted while waiting for buffer

2021-06-25 Thread SmileSmile
Hi,

I use Flink 1.12.4 on YARN. The job topology is: kafka -> source ->
flatmap -> window 1 min agg -> sink -> kafka. Checkpointing is enabled with
a checkpoint interval of 20s. When I cancel my job, some TMs cancel
successfully, but others get stuck in the CANCELING state and the TM is then
killed by task.cancellation.timeout = 18. The TM log shows:


org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
    at org.apache.flink.streaming.runtime.tasks.ChainingOutput.pushToOperator(ChainingOutput.java:114) ~[flink-dist_2.11-1.12.4.jar:1.12.4]
    at org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:93) ~[flink-dist_2.11-1.12.4.jar:1.12.4]
    at org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:39) ~[flink-dist_2.11-1.12.4.jar:1.12.4]
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50) ~[flink-dist_2.11-1.12.4.jar:1.12.4]
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28) ~[flink-dist_2.11-1.12.4.jar:1.12.4]
    at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:50) ~[flink-dist_2.11-1.12.4.jar:1.12.4]
    at com.operation.ParseLineOperationForAgg.flatMap(ParseLineOperationForAgg.java:74) [testFlink-1.0.jar:?]
    at com.operation.ParseLineOperationForAgg.flatMap(ParseLineOperationForAgg.java:29) [testFlink-1.0.jar:?]
    at org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:47) [flink-dist_2.11-1.12.4.jar:1.12.4]
    at org.apache.flink.streaming.runtime.tasks.ChainingOutput.pushToOperator(ChainingOutput.java:112) [flink-dist_2.11-1.12.4.jar:1.12.4]

Caused by: java.io.IOException: Interrupted while waiting for buffer
    at org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition.requestNewBufferBuilderFromPool(BufferWritingResultPartition.java:341) ~[testFlink-1.0.jar:?]
    at org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition.requestNewUnicastBufferBuilder(BufferWritingResultPartition.java:313) ~[testFlink-1.0.jar:?]
    at org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition.appendUnicastDataForRecordContinuation(BufferWritingResultPartition.java:257) ~[testFlink-1.0.jar:?]
    at org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition.emitRecord(BufferWritingResultPartition.java:149) ~[testFlink-1.0.jar:?]
    at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:104) ~[testFlink-1.0.jar:?]
    at org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.emit(ChannelSelectorRecordWriter.java:54) ~[testFlink-1.0.jar:?]
    at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:101) ~[flink-dist_2.11-1.12.4.jar:1.12.4]
    at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:87) ~[flink-dist_2.11-1.12.4.jar:1.12.4]
    at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:43) ~[flink-dist_2.11-1.12.4.jar:1.12.4]
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50) ~[flink-dist_2.11-1.12.4.jar:1.12.4]
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28) ~[flink-dist_2.11-1.12.4.jar:1.12.4]
    at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:50) ~[flink-dist_2.11-1.12.4.jar:1.12.4]
    at com.operation.ExtractLineOperationAgg.flatMap(ExtractLineOperationAgg.java:72) ~[testFlink-1.0.jar:?]
    at com.operation.ExtractLineOperationAgg.flatMap(ExtractLineOperationAgg.java:28) ~[testFlink-1.0.jar:?]
    at org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:47) ~[flink-dist_2.11-1.12.4.jar:1.12.4]
    at org.apache.flink.streaming.runtime.tasks.ChainingOutput.pushToOperator(ChainingOutput.java:112) ~[flink-dist_2.11-1.12.4.jar:1.12.4]
    ... 32 more




My questions:

1. What can I do to deal with this error?
2. If I cancel the job with a savepoint, will this error affect the savepoint?

Best!



Re: Cancel job error ! Interrupted while waiting for buffer

2021-06-28 Thread Piotr Nowojski
Hi,

It's hard to say from the log fragment, but I presume this task has
correctly switched to the "CANCELED" state and this error should not have
been logged as an ERROR, right? How did you get this stack trace? Maybe it
was logged as a DEBUG message? If not, that would probably be a minor bug in
Flink. In that case, can you post a larger fragment of the log, including
the stack trace and the log line that printed it?

In short, this kind of exception is normal and expected when cancelling a
job. If your code is busy and blocked while being backpressured (as your
FlatMap operation was in this particular case), interrupting the code is a
standard thing for Flink to do. However, it shouldn't bubble up to the end
user, precisely so that it doesn't confuse users.
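
To illustrate the mechanism described above, here is a minimal plain-Java
sketch, not Flink's actual code. The BufferPool class and the method names
are hypothetical stand-ins. It assumes only that a task thread blocks while
waiting for a free buffer (backpressure) and that cancellation interrupts
that thread, which turns the InterruptedException into an IOException with
the same message as in the stack trace.

```java
import java.io.IOException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;

public class BackpressureInterruptDemo {

    /** Hypothetical stand-in for a buffer pool; it stays empty to mimic backpressure. */
    static final class BufferPool {
        private final BlockingQueue<byte[]> freeBuffers = new ArrayBlockingQueue<>(1);

        byte[] requestBuffer() throws IOException {
            try {
                return freeBuffers.take(); // blocks until a buffer becomes available
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // restore the interrupt flag
                throw new IOException("Interrupted while waiting for buffer", e);
            }
        }
    }

    /** Starts a task that blocks on the empty pool, interrupts it, and returns the error message. */
    static String cancelBlockedTask() {
        BufferPool pool = new BufferPool();
        CountDownLatch started = new CountDownLatch(1);
        AtomicReference<String> failure = new AtomicReference<>();

        Thread task = new Thread(() -> {
            started.countDown();
            try {
                pool.requestBuffer(); // never returns normally: the pool is empty
            } catch (IOException e) {
                failure.set(e.getMessage());
            }
        }, "flatmap-task");
        task.start();

        try {
            started.await();
            task.interrupt(); // the "cancel" signal for a blocked task
            task.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("demo thread was interrupted", e);
        }
        return failure.get();
    }

    public static void main(String[] args) {
        // prints: Interrupted while waiting for buffer
        System.out.println(cancelBlockedTask());
    }
}
```

In this sketch the exception is simply the way the blocked thread learns
that it should stop; it does not indicate that anything went wrong with the
data.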

> some TM cancel success, some TM become canceling and the TM will be
> killed by itself with task.cancellation.timeout = 18

This part is a bit confusing to me. The above interruption should actually
prevent this timeout from kicking in, and the TM shouldn't be killed. Again,
can you post a larger part of the TM/JM logs or, even better, the full TM/JM
logs?
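
As a rough illustration of how interruption and a cancellation timeout
relate, here is a plain-Java sketch of a watchdog. It is not Flink's
implementation, and all class and method names are hypothetical. It assumes
only the general behavior documented for task.cancellation.interval and
task.cancellation.timeout: the task thread is interrupted repeatedly, and if
it is still running when the timeout expires, the situation is escalated. A
task that reacts to the interrupt exits well before the timeout; a task that
swallows interrupts does not. This shows the mechanism only and is not a
diagnosis of the job in this thread.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class CancellationWatchdogDemo {

    /** Interrupts the task every intervalMs; returns true if it exited within timeoutMs. */
    static boolean cancelWithTimeout(Thread task, long intervalMs, long timeoutMs) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        try {
            while (task.isAlive()) {
                long remainingMs = TimeUnit.NANOSECONDS.toMillis(deadline - System.nanoTime());
                if (remainingMs <= 0) {
                    break; // timeout reached; the caller decides how to escalate
                }
                task.interrupt();
                // join(0) would wait forever, so always wait at least 1 ms
                task.join(Math.max(1, Math.min(intervalMs, remainingMs)));
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return !task.isAlive();
    }

    /** Task that honors interruption: it exits as soon as it is interrupted. */
    static Thread cooperativeTask() {
        return start(() -> {
            try {
                new CountDownLatch(1).await(); // blocks until interrupted
            } catch (InterruptedException e) {
                // fall through and let the thread finish
            }
        });
    }

    /** Task that swallows interrupts and keeps waiting until the latch is released. */
    static Thread stubbornTask(CountDownLatch release) {
        return start(() -> {
            while (release.getCount() > 0) {
                try {
                    release.await();
                } catch (InterruptedException ignored) {
                    // interrupt ignored on purpose: this is the problematic pattern
                }
            }
        });
    }

    private static Thread start(Runnable body) {
        Thread t = new Thread(body);
        t.setDaemon(true); // do not keep the JVM alive for the demo
        t.start();
        return t;
    }

    public static void main(String[] args) {
        boolean cooperative = cancelWithTimeout(cooperativeTask(), 20, 2_000);
        CountDownLatch release = new CountDownLatch(1);
        boolean stubborn = cancelWithTimeout(stubbornTask(release), 20, 200);
        release.countDown(); // let the stubborn task finish
        System.out.println("cooperative task exited in time: " + cooperative);
        System.out.println("stubborn task exited in time: " + stubborn);
    }
}
```

The interval and timeout values used in main are arbitrary demo values, not
recommended settings.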

best,
Piotrek



On Sat, 26 Jun 2021 at 04:59, SmileSmile wrote:

> Hi,
>
> I use Flink 1.12.4 on YARN. The job topology is: kafka -> source ->
> flatmap -> window 1 min agg -> sink -> kafka. Checkpointing is enabled
> with a checkpoint interval of 20s. When I cancel my job, some TMs cancel
> successfully, but others get stuck in the CANCELING state and the TM is
> then killed by task.cancellation.timeout = 18. The TM log shows:
>
> org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException:
> Could not forward element to next operator
> at
> org.apache.flink.streaming.runtime.tasks.ChainingOutput.pushToOperator(ChainingOutput.java:114)
> ~[flink-dist_2.11-1.12.4.jar:1.12.4]
> at
> org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:93)
> ~[flink-dist_2.11-1.12.4.jar:1.12.4]
> at
> org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:39)
> ~[flink-dist_2.11-1.12.4.jar:1.12.4]
> at
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
> ~[flink-dist_2.11-1.12.4.jar:1.12.4]
> at
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
> ~[flink-dist_2.11-1.12.4.jar:1.12.4]
> at
> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:50)
> ~[flink-dist_2.11-1.12.4.jar:1.12.4]
> at
> com.operation.ParseLineOperationForAgg.flatMap(ParseLineOperationForAgg.java:74)
> [testFlink-1.0.jar:?]
> at
> com.operation.ParseLineOperationForAgg.flatMap(ParseLineOperationForAgg.java:29)
> [testFlink-1.0.jar:?]
> at
> org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:47)
> [flink-dist_2.11-1.12.4.jar:1.12.4]
> at
> org.apache.flink.streaming.runtime.tasks.ChainingOutput.pushToOperator(ChainingOutput.java:112)
> [flink-dist_2.11-1.12.4.jar:1.12.4]
>
> Caused by: java.io.IOException: Interrupted while waiting for buffer
> at
> org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition.requestNewBufferBuilderFromPool(BufferWritingResultPartition.java:341)
> ~[testFlink-1.0.jar:?]
> at
> org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition.requestNewUnicastBufferBuilder(BufferWritingResultPartition.java:313)
> ~[testFlink-1.0.jar:?]
> at
> org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition.appendUnicastDataForRecordContinuation(BufferWritingResultPartition.java:257)
> ~[testFlink-1.0.jar:?]
> at
> org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition.emitRecord(BufferWritingResultPartition.java:149)
> ~[testFlink-1.0.jar:?]
> at
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:104)
> ~[testFlink-1.0.jar:?]
> at
> org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.emit(ChannelSelectorRecordWriter.java:54)
> ~[testFlink-1.0.jar:?]
> at
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:101)
> ~[flink-dist_2.11-1.12.4.jar:1.12.4]
> at
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:87)
> ~[flink-dist_2.11-1.12.4.jar:1.12.4]
> at
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:43)
> ~[flink-dist_2.11-1.12.4.jar:1.12.4]
> at
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
> ~[flink-dist_2.11-1.12.4.jar:1.12.4]
> at
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
> ~[flink-dist_2.11-1.12.4.jar:1.12.4]
> at
> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:50)
> ~[flink-dist_2.11-1.12.4.jar:1.12.4]
> at
> com.operation.ExtractLineOperationAgg.flatMap(ExtractLineOperationAgg.java:72)
> ~[testFlin