Thanks Zhijiang,

Yes, I guess our best option right now is to simplify the structure of
the output record and see if that solves the problem.
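
One possible way to shrink the record, sketched under assumptions: the thread mentions that the task bundles several thousand messages into one output record, so the bundle could be split into smaller chunks that are emitted one by one. The class name `BundleSplitter`, the method `split`, and the chunk size are hypothetical; none of them come from the job discussed here or from Flink.

```java
import java.util.ArrayList;
import java.util.List;

public class BundleSplitter {

    /** Splits items into consecutive sublists of at most maxSize elements. */
    public static <T> List<List<T>> split(List<T> items, int maxSize) {
        if (maxSize <= 0) {
            throw new IllegalArgumentException("maxSize must be positive");
        }
        List<List<T>> chunks = new ArrayList<>();
        for (int start = 0; start < items.size(); start += maxSize) {
            int end = Math.min(start + maxSize, items.size());
            // Copy the slice so each chunk is independent of the source list.
            chunks.add(new ArrayList<>(items.subList(start, end)));
        }
        return chunks;
    }
}
```

Each chunk would then be collected as its own record, so that no single serialization call has to traverse the whole bundle. Whether the downstream operators can work with partial bundles would need to be checked first.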

Gerard

On Tue, Jul 17, 2018 at 4:56 PM Zhijiang(wangzhijiang999) <
wangzhijiang...@aliyun.com> wrote:

> Hi Gerard,
>
> From the jstack you provided, the task is serializing the output record,
> and while it does so it does not process any more input data.
> This stack does not indicate an out-of-memory issue. If the output
> buffers were exhausted, the task would instead be blocked in the
> requestBufferBlocking call.
>
> I think the key point is that your output record is too large and its
> structure too complicated: every field and collection in this class
> is traversed during serialization, which costs a lot of time and CPU.
> Furthermore, the checkpoint cannot complete because it is waiting for a
> lock that is held by the task's output process.
>
> As you mentioned, it makes sense to check the data structure of the output
> record and reduce its size or make it more lightweight to handle.
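
To get a feel for how heavy a record is before emitting it, a rough size estimate can be logged. The sketch below assumes a made-up encoding (a 4-byte element count, then a 4-byte length plus the UTF-8 bytes of each string); it is not Flink's wire format and only gives an order of magnitude. The names `RecordSizeEstimate` and `estimateBytes` are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.List;

public class RecordSizeEstimate {

    /**
     * Rough byte count for a list of string fields under the assumed encoding:
     * 4 bytes for the element count, then 4 bytes of length plus the UTF-8
     * bytes for every string. This is NOT how Flink encodes records.
     */
    public static long estimateBytes(List<String> fields) {
        long total = 4; // element count
        for (String field : fields) {
            total += 4 + field.getBytes(StandardCharsets.UTF_8).length;
        }
        return total;
    }
}
```

Logging this value for the largest records would show whether a few outliers dominate the serialization time.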
>
> Best,
>
> Zhijiang
>
> ------------------------------------------------------------------
> From: Gerard Garcia <ger...@talaia.io>
> Sent: Tuesday, July 17, 2018, 21:53
> To: piotr <pi...@data-artisans.com>
> Cc: fhueske <fhue...@gmail.com>; wangzhijiang999 <
> wangzhijiang...@aliyun.com>; user <user@flink.apache.org>; nico <
> n...@data-artisans.com>
> Subject: Re: Flink job hangs/deadlocks (possibly related to out of memory)
>
> Yes, I'm using Flink 1.5.0 and what I'm serializing is a really big record
> (probably too big, we have already started working to reduce its size)
> which consists of several case classes which have (among others) fields of
> type String.
>
> I attach a CPU profile of the thread stuck serializing. I also attach the
> memory and GC telemetry that the profiler shows (which may be more
> informative than the one recorded from the JVM metrics). Only one node was
> actually "doing something"; all others had CPU usage near zero.
>
> The task is at the same time trying to perform a checkpoint but keeps
> failing. Would it make sense that the problem is that there is not enough
> memory available to perform the checkpoint so all operators are stuck
> waiting for it to finish, and at the same time, the operator stuck
> serializing is keeping all the memory so neither it nor the checkpoint can
> advance?
>
> I realized that I don't have a minimum pause between checkpoints so it is
> continuously trying. Maybe I can reduce the checkpoint timeout from the 10m
> default and introduce a minimum pause (e.g. 5m timeout and 5m minimum
> pause) and this way I could break the deadlock.
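
The settings mentioned above could look roughly like this with the DataStream API. This is a configuration sketch, assuming Flink is on the classpath; the 10-minute checkpoint interval and the class name `CheckpointSettings` are hypothetical, since the thread does not state the interval used by the job.

```java
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointSettings {

    public static void main(String[] args) {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // Hypothetical interval; the real job's value is not given in the thread.
        env.enableCheckpointing(10 * 60 * 1000L);

        CheckpointConfig config = env.getCheckpointConfig();
        // Abort a checkpoint that has not completed within 5 minutes.
        config.setCheckpointTimeout(5 * 60 * 1000L);
        // Wait at least 5 minutes before triggering the next checkpoint.
        config.setMinPauseBetweenCheckpoints(5 * 60 * 1000L);
    }
}
```

Whether this actually breaks the stall depends on the cause: if the checkpoint is waiting on a lock held by the serializing task, a shorter timeout only makes the failure visible sooner.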
>
> Gerard
>
>
> On Tue, Jul 17, 2018 at 9:00 AM Piotr Nowojski <pi...@data-artisans.com>
> wrote:
> Hi,
>
> Thanks for the additional data. Just to make sure, are you using Flink
> 1.5.0?
>
> There are a couple of threads that seem to be looping in serialisation,
> while others are blocked and either waiting for new data or waiting for
> someone to consume some data. Could you debug or CPU profile the code, in
> particular focusing on threads with a stack trace like the one below [1]?
> Aren't you trying to serialise some gigantic String?
>
> Piotrek
>
> [1]:
>
> "(...) (19/26)" #2737 prio=5 os_prio=0 tid=0x00007f52584d2800 nid=0x6819 runnable [0x00007f451a843000]
>    java.lang.Thread.State: RUNNABLE
> at org.apache.flink.core.memory.DataOutputSerializer.write(DataOutputSerializer.java:133)
> at org.apache.flink.types.StringValue.writeString(StringValue.java:812)
> at org.apache.flink.api.common.typeutils.base.StringSerializer.serialize(StringSerializer.java:64)
> at org.apache.flink.api.common.typeutils.base.StringSerializer.serialize(StringSerializer.java:28)
> at org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:113)
> at org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:32)
> at org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:113)
> at org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:32)
> at org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$serialize$1.apply(TraversableSerializer.scala:98)
> at org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$serialize$1.apply(TraversableSerializer.scala:93)
> at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
> at org.apache.flink.api.scala.typeutils.TraversableSerializer.serialize(TraversableSerializer.scala:93)
> at org.apache.flink.api.scala.typeutils.TraversableSerializer.serialize(TraversableSerializer.scala:33)
> at org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:113)
> at org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:32)
> at org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$serialize$1.apply(TraversableSerializer.scala:98)
> at org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$serialize$1.apply(TraversableSerializer.scala:93)
> at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
> at org.apache.flink.api.scala.typeutils.TraversableSerializer.serialize(TraversableSerializer.scala:93)
> at org.apache.flink.api.scala.typeutils.TraversableSerializer.serialize(TraversableSerializer.scala:33)
> at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:177)
> at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:49)
> at org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:54)
> at org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:88)
> at org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:129)
> at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:105)
> at org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit(StreamRecordWriter.java:81)
> at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107)
> at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
> at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingBroadcastingOutputCollector.collect(OperatorChain.java:667)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingBroadcastingOutputCollector.collect(OperatorChain.java:653)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:679)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:657)
> at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
> at (...)
> at (...)
> at org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:50)
> at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
> - locked <0x00007f4b5488f2b8> (a java.lang.Object)
> at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:103)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
> at java.lang.Thread.run(Thread.java:748)
>
> On 16 Jul 2018, at 17:03, Gerard Garcia <ger...@talaia.io> wrote:
>
> Hi Piotr,
>
> I attach the GC pauses logged a while back when the task stopped
> processing for several hours (it stopped at about 20:05) and a jstack
> dump from the last time the task hung.
>
> Thanks,
>
> Gerard
>
> On Mon, Jul 16, 2018 at 4:12 PM Piotr Nowojski <pi...@data-artisans.com>
> wrote:
> Hi Gerard,
>
> I second what Zhijiang wrote. Please check GC pauses, either via GC
> logging, a tool such as jconsole (or some memory profiler), or by
> enabling resource logging in Flink.
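
As a minimal sketch of the GC check, the accumulated collection counts and times can also be read from inside a JVM with the standard `java.lang.management` API. The class name `GcSnapshot` is hypothetical. The code reports on the JVM it runs in, so for a task manager it would have to run inside the job; the same beans are what JMX tools such as jconsole display.

```java
import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;

public class GcSnapshot {

    /** Sums the accumulated GC time in milliseconds over all collectors. */
    public static long totalGcTimeMillis() {
        long total = 0;
        for (GarbageCollectorMXBean gc : ManagementFactory.getGarbageCollectorMXBeans()) {
            long time = gc.getCollectionTime(); // -1 if the collector does not report it
            if (time > 0) {
                total += time;
            }
        }
        return total;
    }

    public static void main(String[] args) {
        // Print one line per collector, e.g. young and old generation.
        for (GarbageCollectorMXBean gc : ManagementFactory.getGarbageCollectorMXBeans()) {
            System.out.println(gc.getName()
                    + ": count=" + gc.getCollectionCount()
                    + ", timeMs=" + gc.getCollectionTime());
        }
    }
}
```

Sampling these values periodically and comparing consecutive readings shows how much of each interval was spent in GC.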
>
> Once you have confirmed that this is not the issue, the next time this
> happens please collect thread dumps from the stuck process instead of
> cancelling the job.
>
> Piotrek
>
> On 16 Jul 2018, at 13:53, Fabian Hueske <fhue...@gmail.com> wrote:
>
> Hi Gerard,
>
> Thanks for reporting this issue. I'm pulling in Nico and Piotr who have
> been working on the networking stack lately and might have some ideas
> regarding your issue.
>
> Best, Fabian
>
> 2018-07-13 13:00 GMT+02:00 Zhijiang(wangzhijiang999) <
> wangzhijiang...@aliyun.com>:
> Hi Gerard,
>
> Previously I thought a failed task had triggered the cancel process; now I
> understand that you cancel the task yourself when it stops processing data.
> I think you can run jstack on the process to find where the task thread is
> blocked instead of canceling it; then we may find some hints.
>
> In addition, the "DataOutputSerializer.resize" frame in the stack below
> indicates that the task is serializing the record; the serializer keeps
> extra byte buffers for copying data temporarily. If your record is too
> large, this may cause an OOM, because this overhead memory is not
> managed by the Flink framework. You can also monitor the GC status to
> check for long full-GC pauses.
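
To illustrate why a resize step can be costly for very large records, here is a small growable byte buffer. It is an illustrative sketch only, not Flink's `DataOutputSerializer` implementation; the class `GrowableBuffer` and its growth policy (at least doubling) are assumptions.

```java
import java.util.Arrays;

public class GrowableBuffer {
    private byte[] data;
    private int position;
    private int resizeCount;

    public GrowableBuffer(int initialCapacity) {
        if (initialCapacity <= 0) {
            throw new IllegalArgumentException("initialCapacity must be positive");
        }
        this.data = new byte[initialCapacity];
    }

    /** Appends src, growing the backing array when it does not fit. */
    public void write(byte[] src) {
        if (position + src.length > data.length) {
            // Grow to at least double the current capacity (assumed policy).
            int newCapacity = Math.max(data.length * 2, position + src.length);
            // Allocates a new array and copies the old bytes into it; until the
            // old array is collected, both arrays occupy heap at the same time.
            data = Arrays.copyOf(data, newCapacity);
            resizeCount++;
        }
        System.arraycopy(src, 0, data, position, src.length);
        position += src.length;
    }

    public int size() { return position; }
    public int capacity() { return data.length; }
    public int resizeCount() { return resizeCount; }
}
```

With this kind of policy, a record of a few hundred megabytes briefly needs the old and the new array at once, which adds heap pressure and copy time on every growth step.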
>
> Best,
> Zhijiang
> ------------------------------------------------------------------
> From: Gerard Garcia <ger...@talaia.io>
> Sent: Friday, July 13, 2018, 16:22
> To: wangzhijiang999 <wangzhijiang...@aliyun.com>
> Cc: user <user@flink.apache.org>
> Subject: Re: Flink job hangs/deadlocks (possibly related to out of memory)
>
> Hi Zhijiang,
>
> The problem is that no other task failed first. We have a task that
> sometimes just stops processing data, and when we cancel it, we see
> log messages saying:
>
> " Task (...) did not react to cancelling signal for 30 seconds, but is
> stuck in method:
> org.apache.flink.core.memory.DataOutputSerializer.resize(DataOutputSerializer.java:305)
> org.apache.flink.core.memory.DataOutputSerializer.write(DataOutputSerializer.java:133)
> org.apache.flink.types.StringValue.writeString(StringValue.java:802)
> (...)"
>
> That is why we suspect that it hangs forever at that point and that is why
> it stops processing data. I don't see any increase in memory use in the
> heap (I guess because these buffers are managed by Flink) so I'm not sure
> if that is really the problem.
>
> Gerard
>
> On Tue, Jul 3, 2018 at 6:15 AM Zhijiang(wangzhijiang999) <
> wangzhijiang...@aliyun.com> wrote:
> Hi Gerard,
>
> I think you can check the job manager log to find which task failed
> first, and then trace the task manager log containing the failed task to
> find the initial reason.
> The failed task will trigger canceling all the other tasks, and during the
> canceling process, the blocked task that is waiting for an output buffer
> cannot be interrupted by the canceler thread, which is what your
> description shows. So I think the cancel process is not the key point and
> is expected behavior. Maybe there was no OOM at all.
> If the task cannot be interrupted during canceling, the task manager
> process will eventually exit, which triggers a restart of the job.
>
> Zhijiang
> ------------------------------------------------------------------
> From: Gerard Garcia <ger...@talaia.io>
> Sent: Monday, July 2, 2018, 18:29
> To: wangzhijiang999 <wangzhijiang...@aliyun.com>
> Cc: user <user@flink.apache.org>
> Subject: Re: Flink job hangs/deadlocks (possibly related to out of memory)
>
> Thanks Zhijiang,
>
> We haven't found any other relevant log messages anywhere. These traces
> belong to the unresponsive task, that is why we suspect that at some point
> it did not have enough memory to serialize the message and it blocked. I've
> also found that when it hung several output buffers were full (see
> attached image buffers.outPoolUsage.png) so I guess the traces just reflect
> that.
>
> Probably the task hung for some other reason and that is what filled the
> output buffers upstream of the blocked operator. I'll have to continue
> investigating to find the real cause.
>
> Gerard
>
>
>
>
> On Mon, Jul 2, 2018 at 9:50 AM Zhijiang(wangzhijiang999) <
> wangzhijiang...@aliyun.com> wrote:
> Hi Gerard,
>
> The stack below only indicates that the task was canceled, which may have
> been triggered by the job manager because of another task's failure. If the
> task cannot be interrupted within the configured timeout, the task manager
> process will exit. Do you see any OutOfMemory messages in the task manager
> log? Normally the output serialization buffer is managed by the task manager
> framework and will not cause OOM. On the input deserialization side,
> there is a temporary byte array on each channel for holding partial
> records, which is not managed by the framework. I think you can confirm
> whether and where the OOM occurred. Maybe check the task failure logs.
>
> Zhijiang
>
> ------------------------------------------------------------------
> From: gerardg <ger...@talaia.io>
> Sent: Saturday, June 30, 2018, 00:12
> To: user <user@flink.apache.org>
> Subject: Re: Flink job hangs/deadlocks (possibly related to out of memory)
>
> (fixed formatting)
>
> Hello,
>
> We have experienced some problems where a task just hangs without showing
> any kind of log error while other tasks running in the same task manager
> continue without problems. When these tasks are restarted the task manager
> gets killed and shows several errors similar to these ones:
>
> [Canceler/Interrupts for (...)' did not react to cancelling signal for 30
> seconds, but is stuck in method:
> java.nio.ByteBuffer.wrap(ByteBuffer.java:373)
> java.nio.ByteBuffer.wrap(ByteBuffer.java:396)
> org.apache.flink.core.memory.DataOutputSerializer.resize(DataOutputSerializer.java:330)
> org.apache.flink.core.memory.DataOutputSerializer.writeInt(DataOutputSerializer.java:212)
> org.apache.flink.api.common.typeutils.base.IntSerializer.serialize(IntSerializer.java:63)
> org.apache.flink.api.common.typeutils.base.IntSerializer.serialize(IntSerializer.java:27)
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:113)
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:32)
> org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$serialize$1.apply(TraversableSerializer.scala:98)
> org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$serialize$1.apply(TraversableSerializer.scala:93)
> scala.collection.immutable.List.foreach(List.scala:392)
> org.apache.flink.api.scala.typeutils.TraversableSerializer.serialize(TraversableSerializer.scala:93)
> org.apache.flink.api.scala.typeutils.TraversableSerializer.serialize(TraversableSerializer.scala:33)
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:113)
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:32)
> org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:177)
> org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:49)
> org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:54)
> org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:88)
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:129)
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:105)
> org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit(StreamRecordWriter.java:81)
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107)
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingBroadcastingOutputCollector.collect(OperatorChain.java:667)
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingBroadcastingOutputCollector.collect(OperatorChain.java:653)
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:679)
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:657)
> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
> (...)
> org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:50)
> org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.pushToOperator(OperatorChain.java:469)
> org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:446)
> org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:405)
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingBroadcastingOutputCollector.collect(OperatorChain.java:672)
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingBroadcastingOutputCollector.collect(OperatorChain.java:653)
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:679)
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:657)
> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
> (...)
> org.apache.flink.streaming.api.scala.function.util.ScalaProcessWindowFunctionWrapper.process(ScalaProcessWindowFunctionWrapper.scala:63)
> org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction.process(InternalIterableProcessWindowFunction.java:50)
> org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction.process(InternalIterableProcessWindowFunction.java:32)
> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:550)
> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:403)
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:103)
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
> org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
> java.lang.Thread.run(Thread.java:748)
>
> [Canceler/Interrupts for (...)' did not react to cancelling signal for 30
> seconds, but is stuck in method:
> org.apache.flink.core.memory.DataOutputSerializer.resize(DataOutputSerializer.java:305)
> org.apache.flink.core.memory.DataOutputSerializer.writeInt(DataOutputSerializer.java:212)
> org.apache.flink.api.common.typeutils.base.IntSerializer.serialize(IntSerializer.java:63)
> org.apache.flink.api.common.typeutils.base.IntSerializer.serialize(IntSerializer.java:27)
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:113)
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:32)
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:113)
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:32)
> org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$serialize$1.apply(TraversableSerializer.scala:98)
> org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$serialize$1.apply(TraversableSerializer.scala:93)
> scala.collection.immutable.List.foreach(List.scala:392)
> org.apache.flink.api.scala.typeutils.TraversableSerializer.serialize(TraversableSerializer.scala:93)
> org.apache.flink.api.scala.typeutils.TraversableSerializer.serialize(TraversableSerializer.scala:33)
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:113)
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:32)
> org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:177)
> org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:49)
> org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:54)
> org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:88)
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:129)
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:105)
> org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit(StreamRecordWriter.java:81)
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107)
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingBroadcastingOutputCollector.collect(OperatorChain.java:667)
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingBroadcastingOutputCollector.collect(OperatorChain.java:653)
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:679)
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:657)
> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
> (...)
> org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:50)
> org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.pushToOperator(OperatorChain.java:469)
> org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:446)
> org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:405)
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingBroadcastingOutputCollector.collect(OperatorChain.java:672)
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingBroadcastingOutputCollector.collect(OperatorChain.java:653)
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:679)
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:657)
> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
> (...)
> org.apache.flink.streaming.api.scala.function.util.ScalaProcessWindowFunctionWrapper.process(ScalaProcessWindowFunctionWrapper.scala:63)
> org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction.process(InternalIterableProcessWindowFunction.java:50)
> org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction.process(InternalIterableProcessWindowFunction.java:32)
> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:550)
> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:403)
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:103)
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
> org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
> java.lang.Thread.run(Thread.java:748)
>
> Our task bundles several thousand messages together so it creates some
> big single messages, which could explain why the operator hangs trying to
> serialize the message. Our problem is that when a task hangs it is very
> difficult to detect and we have to manually cancel and restart it.
>
> Is there any way to make the task manager fail or to increase the memory
> required by the allocation?
>
> Thanks, Gerard
>
>
>
>
>
> <Heap size.png><G1 Young|Old generation time.png><hang_jstack>
>
>
>
