Hi Gerard,

I think you can check the job manager log to find which task failed first, and 
then trace the task manager log containing that failed task to find the 
initial cause.
The failed task triggers cancellation of all the other tasks, and during the 
canceling process a task blocked waiting for an output buffer can not be 
interrupted by the canceler thread, which is what your stack traces show. So I 
think the cancel process is not the key point and is expected behavior. It may 
not have caused an OOM at all.
If the task can not be interrupted during canceling, the task manager process 
will eventually be exited, which triggers restarting the job.
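
As far as I know, the 30-second warning interval and the point at which the 
task manager finally gives up and exits are controlled by the task cancellation 
settings in flink-conf.yaml. A minimal sketch (the values are just the defaults 
as I remember them, please double-check them for your Flink version):

    # How often (ms) the canceler retries and logs the "did not react to
    # cancelling signal" warning
    task.cancellation.interval: 30000
    # How long (ms) a stuck cancellation may take before it is treated as fatal
    # and the task manager process is terminated (0 disables this watchdog)
    task.cancellation.timeout: 180000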

Zhijiang
------------------------------------------------------------------
From: Gerard Garcia <ger...@talaia.io>
Sent: Monday, July 2, 2018, 18:29
To: wangzhijiang999 <wangzhijiang...@aliyun.com>
Cc: user <user@flink.apache.org>
Subject: Re: Flink job hangs/deadlocks (possibly related to out of memory)

Thanks Zhijiang,

We haven't found any other relevant log messages anywhere. These traces belong 
to the unresponsive task, which is why we suspect that at some point it did not 
have enough memory to serialize the message and blocked. I've also found that 
when it hung, several output buffers were full (see attached image 
buffers.outPoolUsage.png), so I guess the traces just reflect that.

Probably the task hung for some other reason, and that is what filled the 
output buffers upstream of the blocked operator. I'll have to continue 
investigating to find the real cause.

Gerard




On Mon, Jul 2, 2018 at 9:50 AM Zhijiang(wangzhijiang999) 
<wangzhijiang...@aliyun.com> wrote:
 Hi Gerard,

    From the stack below, it only indicates that the task is being canceled, 
which may have been triggered by the job manager because of another task's 
failure. If the task can not be interrupted within the configured timeout, the 
task manager process will be exited. Do you see any OutOfMemory messages in the 
task manager log? Normally the output serialization buffer is managed by the 
task manager framework and will not cause OOM, while on the input 
deserialization side there is a temporary byte array on each channel for 
holding partial records, which is not managed by the framework. I think you can 
confirm whether an OOM happened and where it came from. Maybe check the task 
failure logs.
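
If it is not clear from the logs alone, one way to confirm an OOM (just a 
suggestion, not something your setup already does) is to let the task manager 
JVM write a heap dump when it happens, e.g. via env.java.opts in 
flink-conf.yaml (the dump path below is only an example):

    # Dump the heap on OutOfMemoryError so the failing allocation can be
    # inspected afterwards; the path must exist on the task manager hosts
    env.java.opts: "-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/flink-heapdump.hprof"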

Zhijiang

------------------------------------------------------------------
From: gerardg <ger...@talaia.io>
Sent: Saturday, June 30, 2018, 00:12
To: user <user@flink.apache.org>
Subject: Re: Flink job hangs/deadlocks (possibly related to out of memory)

(fixed formatting) 

 Hello, 

We have experienced problems where a task just hangs without showing any kind 
of log error, while other tasks running in the same task manager continue 
without problems. When these tasks are restarted, the task manager gets killed 
and shows several errors similar to the following:

[Canceler/Interrupts for (...)' did not react to cancelling signal for 30 seconds, but is stuck in method:
java.nio.ByteBuffer.wrap(ByteBuffer.java:373)
java.nio.ByteBuffer.wrap(ByteBuffer.java:396)
org.apache.flink.core.memory.DataOutputSerializer.resize(DataOutputSerializer.java:330)
org.apache.flink.core.memory.DataOutputSerializer.writeInt(DataOutputSerializer.java:212)
org.apache.flink.api.common.typeutils.base.IntSerializer.serialize(IntSerializer.java:63)
org.apache.flink.api.common.typeutils.base.IntSerializer.serialize(IntSerializer.java:27)
org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:113)
org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:32)
org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$serialize$1.apply(TraversableSerializer.scala:98)
org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$serialize$1.apply(TraversableSerializer.scala:93)
scala.collection.immutable.List.foreach(List.scala:392)
org.apache.flink.api.scala.typeutils.TraversableSerializer.serialize(TraversableSerializer.scala:93)
org.apache.flink.api.scala.typeutils.TraversableSerializer.serialize(TraversableSerializer.scala:33)
org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:113)
org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:32)
org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:177)
org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:49)
org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:54)
org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:88)
org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:129)
org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:105)
org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit(StreamRecordWriter.java:81)
org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107)
org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingBroadcastingOutputCollector.collect(OperatorChain.java:667)
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingBroadcastingOutputCollector.collect(OperatorChain.java:653)
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:679)
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:657)
org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
(...)
org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:50)
org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.pushToOperator(OperatorChain.java:469)
org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:446)
org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:405)
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingBroadcastingOutputCollector.collect(OperatorChain.java:672)
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingBroadcastingOutputCollector.collect(OperatorChain.java:653)
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:679)
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:657)
org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
(...)
org.apache.flink.streaming.api.scala.function.util.ScalaProcessWindowFunctionWrapper.process(ScalaProcessWindowFunctionWrapper.scala:63)
org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction.process(InternalIterableProcessWindowFunction.java:50)
org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction.process(InternalIterableProcessWindowFunction.java:32)
org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:550)
org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:403)
org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:103)
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
java.lang.Thread.run(Thread.java:748)

[Canceler/Interrupts for (...)' did not react to cancelling signal for 30 seconds, but is stuck in method:
org.apache.flink.core.memory.DataOutputSerializer.resize(DataOutputSerializer.java:305)
org.apache.flink.core.memory.DataOutputSerializer.writeInt(DataOutputSerializer.java:212)
org.apache.flink.api.common.typeutils.base.IntSerializer.serialize(IntSerializer.java:63)
org.apache.flink.api.common.typeutils.base.IntSerializer.serialize(IntSerializer.java:27)
org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:113)
org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:32)
org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:113)
org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:32)
org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$serialize$1.apply(TraversableSerializer.scala:98)
org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$serialize$1.apply(TraversableSerializer.scala:93)
scala.collection.immutable.List.foreach(List.scala:392)
org.apache.flink.api.scala.typeutils.TraversableSerializer.serialize(TraversableSerializer.scala:93)
org.apache.flink.api.scala.typeutils.TraversableSerializer.serialize(TraversableSerializer.scala:33)
org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:113)
org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:32)
org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:177)
org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:49)
org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:54)
org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:88)
org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:129)
org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:105)
org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit(StreamRecordWriter.java:81)
org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107)
org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingBroadcastingOutputCollector.collect(OperatorChain.java:667)
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingBroadcastingOutputCollector.collect(OperatorChain.java:653)
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:679)
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:657)
org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
(...)
org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:50)
org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.pushToOperator(OperatorChain.java:469)
org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:446)
org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:405)
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingBroadcastingOutputCollector.collect(OperatorChain.java:672)
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingBroadcastingOutputCollector.collect(OperatorChain.java:653)
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:679)
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:657)
org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
(...)
org.apache.flink.streaming.api.scala.function.util.ScalaProcessWindowFunctionWrapper.process(ScalaProcessWindowFunctionWrapper.scala:63)
org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction.process(InternalIterableProcessWindowFunction.java:50)
org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction.process(InternalIterableProcessWindowFunction.java:32)
org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:550)
org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:403)
org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:103)
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
java.lang.Thread.run(Thread.java:748)

Our task bundles several thousand messages together, so it creates some big 
single records, which could explain why the operator hangs while trying to 
serialize them. Our problem is that when a task hangs it is very difficult to 
detect, and we have to manually cancel and restart it.
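
For illustration, splitting each bundle into fixed-size chunks before it 
reaches the serializer would keep individual records small; a rough Scala 
sketch of the idea (Bundle and maxChunkSize are made-up names for this example, 
not our actual types):

    import org.apache.flink.streaming.api.scala._

    // Hypothetical record type standing in for our bundled message
    case class Bundle(messages: List[String])

    // Emit chunks of at most maxChunkSize messages instead of one huge record,
    // so the record serializer never has to grow its buffer dramatically
    def splitBundles(bundles: DataStream[Bundle], maxChunkSize: Int): DataStream[Bundle] =
      bundles.flatMap(bundle => bundle.messages.grouped(maxChunkSize).map(Bundle(_)))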

Is there any way to make the task manager fail in this situation, or to 
increase the memory available for the allocation?

 Thanks, Gerard 

