Hi 

       I am using Flink 1.12.4 on YARN. The job topology is: kafka -> source ->
flatmap -> window (1 min agg) -> sink -> kafka. Checkpointing is enabled with a
checkpoint interval of 20 s. When I cancel the job, some TaskManagers cancel
successfully, while others get stuck in CANCELING and eventually kill themselves
after task.cancellation.timeout = 180000 (ms). The TM log shows:


org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
    at org.apache.flink.streaming.runtime.tasks.ChainingOutput.pushToOperator(ChainingOutput.java:114) ~[flink-dist_2.11-1.12.4.jar:1.12.4]
    at org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:93) ~[flink-dist_2.11-1.12.4.jar:1.12.4]
    at org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:39) ~[flink-dist_2.11-1.12.4.jar:1.12.4]
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50) ~[flink-dist_2.11-1.12.4.jar:1.12.4]
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28) ~[flink-dist_2.11-1.12.4.jar:1.12.4]
    at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:50) ~[flink-dist_2.11-1.12.4.jar:1.12.4]
    at com.operation.ParseLineOperationForAgg.flatMap(ParseLineOperationForAgg.java:74) [testFlink-1.0.jar:?]
    at com.operation.ParseLineOperationForAgg.flatMap(ParseLineOperationForAgg.java:29) [testFlink-1.0.jar:?]
    at org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:47) [flink-dist_2.11-1.12.4.jar:1.12.4]
    at org.apache.flink.streaming.runtime.tasks.ChainingOutput.pushToOperator(ChainingOutput.java:112) [flink-dist_2.11-1.12.4.jar:1.12.4]
Caused by: java.io.IOException: Interrupted while waiting for buffer
    at org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition.requestNewBufferBuilderFromPool(BufferWritingResultPartition.java:341) ~[testFlink-1.0.jar:?]
    at org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition.requestNewUnicastBufferBuilder(BufferWritingResultPartition.java:313) ~[testFlink-1.0.jar:?]
    at org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition.appendUnicastDataForRecordContinuation(BufferWritingResultPartition.java:257) ~[testFlink-1.0.jar:?]
    at org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition.emitRecord(BufferWritingResultPartition.java:149) ~[testFlink-1.0.jar:?]
    at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:104) ~[testFlink-1.0.jar:?]
    at org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.emit(ChannelSelectorRecordWriter.java:54) ~[testFlink-1.0.jar:?]
    at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:101) ~[flink-dist_2.11-1.12.4.jar:1.12.4]
    at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:87) ~[flink-dist_2.11-1.12.4.jar:1.12.4]
    at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:43) ~[flink-dist_2.11-1.12.4.jar:1.12.4]
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50) ~[flink-dist_2.11-1.12.4.jar:1.12.4]
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28) ~[flink-dist_2.11-1.12.4.jar:1.12.4]
    at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:50) ~[flink-dist_2.11-1.12.4.jar:1.12.4]
    at com.operation.ExtractLineOperationAgg.flatMap(ExtractLineOperationAgg.java:72) ~[testFlink-1.0.jar:?]
    at com.operation.ExtractLineOperationAgg.flatMap(ExtractLineOperationAgg.java:28) ~[testFlink-1.0.jar:?]
    at org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:47) ~[flink-dist_2.11-1.12.4.jar:1.12.4]
    at org.apache.flink.streaming.runtime.tasks.ChainingOutput.pushToOperator(ChainingOutput.java:112) ~[flink-dist_2.11-1.12.4.jar:1.12.4]
    ... 32 more
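For reference, the two timeouts/intervals mentioned above correspond to these settings. This is only a sketch of the relevant flink-conf.yaml entries (the checkpoint interval can equivalently be set in code via env.enableCheckpointing(20000)):

```yaml
# Checkpoint every 20 s (same as env.enableCheckpointing(20000) in code)
execution.checkpointing.interval: 20 s
# If a task does not finish cancellation within this many ms,
# the TaskManager kills itself (the behavior described above)
task.cancellation.timeout: 180000
```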




My questions:


1. What can I do to deal with this error?
2. If I cancel the job with a savepoint, will this error affect the savepoint?
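For context on question 2, this is how I would trigger the savepoint from the CLI; the job ID and savepoint path below are placeholders:

```shell
# Stop the job gracefully with a savepoint (preferred since Flink 1.9:
# sources finish first, then the savepoint is taken, then the job stops)
./bin/flink stop --savepointPath hdfs:///flink/savepoints <jobId>

# Older cancel-with-savepoint variant (deprecated in favor of "stop")
./bin/flink cancel -s hdfs:///flink/savepoints <jobId>
```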




Best!
