I wouldn't want to jump to conclusions, but from what I see, very large
lists and vectors do not work well with flatten in 2.11, each for its own
reasons.

In any case, it's 100% not a Flink issue.
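
A minimal sketch of the size overflow described further down the thread, for
illustration only. The names SafeFlatten, totalLength and flattenBytes are
hypothetical and this is not the code from the job. It assumes the input is a
sequence of byte arrays, sums the lengths as Long so the total cannot wrap
around, and declines to allocate when the result would not fit in one JVM array.

```scala
object SafeFlatten {
  // Hypothetical helper: sum the chunk lengths as Long so the total cannot
  // wrap around the way an Int sum does past Int.MaxValue.
  def totalLength(chunks: Seq[Array[Byte]]): Long =
    chunks.foldLeft(0L)((acc, chunk) => acc + chunk.length)

  // Concatenate the chunks into a single array, or return None when the
  // combined size exceeds Int.MaxValue. Note: the real JVM array limit is
  // slightly below Int.MaxValue, and a near-2 GiB allocation may still fail
  // with OutOfMemoryError depending on the heap size.
  def flattenBytes(chunks: Seq[Array[Byte]]): Option[Array[Byte]] = {
    val total = totalLength(chunks)
    if (total > Int.MaxValue) None
    else {
      val out = new Array[Byte](total.toInt)
      var offset = 0
      for (chunk <- chunks) {
        System.arraycopy(chunk, 0, out, offset, chunk.length)
        offset += chunk.length
      }
      Some(out)
    }
  }
}
```

With plain Int arithmetic, Seq(Int.MaxValue, 1).sum wraps around to
Int.MinValue, which is the kind of negative size that leads to a
NegativeArraySizeException when it is used to allocate an array.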

On Tue, Jul 7, 2020 at 10:10 AM Xintong Song <tonysong...@gmail.com> wrote:

> Thanks for the updates, Ori.
>
> I'm not familiar with Scala. Just curious, if what you suspect is true, is
> it a bug of Scala?
>
> Thank you~
>
> Xintong Song
>
>
>
> On Tue, Jul 7, 2020 at 1:41 PM Ori Popowski <ori....@gmail.com> wrote:
>
>> Hi,
>>
>> I just wanted to update that the problem is now solved!
>>
>> I suspect that Scala's flatten() method has a memory problem on very
>> large lists (> 2 billion elements). When using Scala Lists, the memory
>> seems to leak but the app keeps running, and when using Scala Vectors, a
>> weird IllegalArgumentException is thrown [1].
>>
>> I implemented my own flatten() method using Arrays and quickly ran into
>> NegativeArraySizeException since the integer representing the array size
>> wrapped around at Int.MaxValue and became negative. After I started
>> catching this exception, all my cluster problems were resolved: checkpoints,
>> the heartbeat timeout, and also the memory and CPU utilization.
>>
>> I still need to confirm my suspicion towards Scala's flatten() though,
>> since I haven't "lab-tested" it.
>>
>> [1] https://github.com/NetLogo/NetLogo/issues/1830
>>
>> On Sun, Jul 5, 2020 at 2:21 PM Ori Popowski <ori....@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> I initially thought this, so this is why my heap is almost 30GiB.
>>> However, I started to analyze the Java Flight Recorder files, and I
>>> suspect there's a memory leak in Scala's flatten() method.
>>> I changed the line that uses flatten(), and instead of flatten() I'm
>>> just creating a ByteArray the size flatten() would have returned, and I
>>> no longer have the heartbeat problem.
>>>
>>> So now my code is
>>>     val recordingData =
>>>       Array.fill[Byte](recordingBytes.map(_.length).sum)(0)
>>>
>>> instead of
>>>     val recordingData = recordingBytes.flatten
>>>
>>> I attach a screenshot of Java Mission Control
>>>
>>>
>>>
>>> On Fri, Jul 3, 2020 at 7:24 AM Xintong Song <tonysong...@gmail.com>
>>> wrote:
>>>
>>>> I agree with Roman's suggestion for increasing heap size.
>>>>
>>>> It seems that the heap grows faster than it is freed. Thus eventually the
>>>> Full GC is triggered, taking more than 50s and causing the timeout.
>>>> However, even the full GC frees only about 2.9GB out of the 28GB max size.
>>>> That probably suggests that the max heap size is not sufficient.
>>>>
>>>>> 2020-07-01T10:15:12.869+0000: [Full GC (Allocation Failure)
>>>>>  28944M->26018M(28960M), 51.5256128 secs]
>>>>>     [Eden: 0.0B(1448.0M)->0.0B(1448.0M) Survivors: 0.0B->0.0B Heap:
>>>>> 28944.6M(28960.0M)->26018.9M(28960.0M)], [Metaspace:
>>>>> 113556K->112729K(1150976K)]
>>>>>   [Times: user=91.08 sys=0.06, real=51.53 secs]
>>>>
>>>>
>>>> I would not be so sure about the memory leak. I think it could be a
>>>> normal pattern that memory keeps growing as more data is processed. E.g.,
>>>> from the provided log, I see window operation tasks executed in the task
>>>> manager. Such operations might accumulate data until the window is emitted.
>>>>
>>>> Maybe Ori you can also take a look at the task manager log when the job
>>>> runs with Flink 1.9 without this problem, see how the heap size changed. As
>>>> I mentioned before, it is possible that, with the same configurations Flink
>>>> 1.10 has less heap size compared to Flink 1.9, due to the memory model
>>>> changes.
>>>>
>>>> Thank you~
>>>>
>>>> Xintong Song
>>>>
>>>>
>>>>
>>>> On Thu, Jul 2, 2020 at 8:58 PM Ori Popowski <ori....@gmail.com> wrote:
>>>>
>>>>> Thank you very much for your analysis.
>>>>>
>>>>> When I said there was no memory leak - I meant that from the specific
>>>>> TaskManager I monitored in real-time using JProfiler.
>>>>> Unfortunately, this problem occurs in only one of the TaskManagers and
>>>>> you cannot anticipate which. So when you pick a TM to profile at random,
>>>>> everything looks fine.
>>>>>
>>>>> I'm running the job again with Java FlightRecorder now, and I hope
>>>>> I'll find the reason for the memory leak.
>>>>>
>>>>> Thanks!
>>>>>
>>>>> On Thu, Jul 2, 2020 at 3:42 PM Khachatryan Roman <
>>>>> khachatryan.ro...@gmail.com> wrote:
>>>>>
>>>>>> Thanks, Ori
>>>>>>
>>>>>> From the log, it looks like there IS a memory leak.
>>>>>>
>>>>>> At 10:12:53 there was the last "successful" GC, when 13 GB was freed in
>>>>>> 0.4653809 secs:
>>>>>> [Eden: 17336.0M(17336.0M)->0.0B(2544.0M) Survivors: 40960.0K->2176.0M
>>>>>> Heap: 23280.3M(28960.0M)->10047.0M(28960.0M)]
>>>>>>
>>>>>> Then the heap grew from 10G to 28G with GC not being able to free up
>>>>>> enough space:
>>>>>> [Eden: 2544.0M(2544.0M)->0.0B(856.0M) Survivors: 2176.0M->592.0M
>>>>>> Heap: 12591.0M(28960.0M)->11247.0M(28960.0M)]
>>>>>> [Eden: 856.0M(856.0M)->0.0B(1264.0M) Survivors: 592.0M->184.0M Heap:
>>>>>> 12103.0M(28960.0M)->11655.0M(28960.0M)]
>>>>>> [Eden: 1264.0M(1264.0M)->0.0B(1264.0M) Survivors: 184.0M->184.0M
>>>>>> Heap: 12929.0M(28960.0M)->12467.0M(28960.0M)]
>>>>>> ... ...
>>>>>> [Eden: 1264.0M(1264.0M)->0.0B(1264.0M) Survivors: 184.0M->184.0M
>>>>>> Heap: 28042.6M(28960.0M)->27220.6M(28960.0M)]
>>>>>> [Eden: 1264.0M(1264.0M)->0.0B(1264.0M) Survivors: 184.0M->184.0M
>>>>>> Heap: 28494.5M(28960.0M)->28720.6M(28960.0M)]
>>>>>> [Eden: 224.0M(1264.0M)->0.0B(1448.0M) Survivors: 184.0M->0.0B Heap:
>>>>>> 28944.6M(28960.0M)->28944.6M(28960.0M)]
>>>>>>
>>>>>> Until 10:15:12, when GC freed almost 3G - but it took 51 seconds and the
>>>>>> heartbeat timed out:
>>>>>> 2020-07-01T10:15:12.869+0000: [Full GC (Allocation Failure)
>>>>>>  28944M->26018M(28960M), 51.5256128 secs]
>>>>>>   [Eden: 0.0B(1448.0M)->0.0B(1448.0M) Survivors: 0.0B->0.0B Heap:
>>>>>> 28944.6M(28960.0M)->26018.9M(28960.0M)], [Metaspace:
>>>>>> 113556K->112729K(1150976K)]
>>>>>>   [Times: user=91.08 sys=0.06, real=51.53 secs]
>>>>>> 2020-07-01T10:16:04.395+0000: [GC concurrent-mark-abort]
>>>>>> 10:16:04.398 [flink-akka.actor.default-dispatcher-21] INFO
>>>>>>  org.apache.flink.runtime.taskexecutor.TaskExecutor  - The heartbeat of
>>>>>> JobManager with id bc59ba6a
>>>>>>
>>>>>> No substantial amount of memory was freed after that.
>>>>>>
>>>>>> If this memory usage pattern is expected, I'd suggest to:
>>>>>> 1. increase heap size
>>>>>> 2. play with PrintStringDeduplicationStatistics and
>>>>>> UseStringDeduplication flags - probably string deduplication is making G1
>>>>>> slower than CMS
>>>>>>
>>>>>> Regards,
>>>>>> Roman
>>>>>>
>>>>>>
>>>>>> On Thu, Jul 2, 2020 at 10:11 AM Ori Popowski <ori....@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> I'd be happy to :) Attached is a TaskManager log which timed out.
>>>>>>>
>>>>>>>
>>>>>>> Thanks!
>>>>>>>
>>>>>>> On Thu, Jul 2, 2020 at 4:21 AM Xintong Song <tonysong...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Maybe you can share the log and gc-log of the problematic
>>>>>>>> TaskManager? See if we can find any clue.
>>>>>>>>
>>>>>>>> Thank you~
>>>>>>>>
>>>>>>>> Xintong Song
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Wed, Jul 1, 2020 at 8:11 PM Ori Popowski <ori....@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> I've found out that sometimes one of my TaskManagers experiences a
>>>>>>>>> GC pause of 40-50 seconds and I have no idea why.
>>>>>>>>> I profiled one of the machines using JProfiler and everything
>>>>>>>>> looks fine. No memory leaks, memory is low.
>>>>>>>>> However, I cannot anticipate which of the machines will get the
>>>>>>>>> 40-50 seconds pause and I also cannot profile all of them all the 
>>>>>>>>> time.
>>>>>>>>>
>>>>>>>>> Any suggestions?
>>>>>>>>>
>>>>>>>>> On Mon, Jun 29, 2020 at 4:44 AM Xintong Song <
>>>>>>>>> tonysong...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> In Flink 1.10, there's a huge change in the memory management
>>>>>>>>>> compared to previous versions. This could be related to your 
>>>>>>>>>> observations,
>>>>>>>>>> because with the same configurations, it is possible that there's 
>>>>>>>>>> less JVM
>>>>>>>>>> heap space (with more off-heap memory). Please take a look at this
>>>>>>>>>> migration guide [1].
>>>>>>>>>>
>>>>>>>>>> Thank you~
>>>>>>>>>>
>>>>>>>>>> Xintong Song
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> [1]
>>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/memory/mem_migration.html
>>>>>>>>>>
>>>>>>>>>> On Sun, Jun 28, 2020 at 10:12 PM Ori Popowski <ori....@gmail.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Thanks for the suggestions!
>>>>>>>>>>>
>>>>>>>>>>> > i recently tried 1.10 and see this error frequently. and i
>>>>>>>>>>> dont have the same issue when running with 1.9.1
>>>>>>>>>>> I did downgrade to Flink 1.9 and there's certainly no change in
>>>>>>>>>>> the occurrences of the heartbeat timeout
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> >
>>>>>>>>>>>
>>>>>>>>>>>    - Probably the most straightforward way is to try increasing the
>>>>>>>>>>>    timeout to see if that helps. You can leverage the configuration
>>>>>>>>>>>    option `heartbeat.timeout`[1]. The default is 50s.
>>>>>>>>>>>    - It might be helpful to share your configuration setups (e.g., the
>>>>>>>>>>>    TM resources, JVM parameters, timeout, etc.). Maybe the easiest way
>>>>>>>>>>>    is to share the beginning part of your JM/TM logs, including the JVM
>>>>>>>>>>>    parameters and all the loaded configurations.
>>>>>>>>>>>    - You may want to look into the GC logs in addition to the metrics.
>>>>>>>>>>>    In case of a CMS GC stop-the-world, you may not be able to see the
>>>>>>>>>>>    most recent metrics due to the process not responding to the metric
>>>>>>>>>>>    querying services.
>>>>>>>>>>>    - You may also look into the status of the JM process. If JM is
>>>>>>>>>>>    under significant GC pressure, it could also happen that the
>>>>>>>>>>>    heartbeat message from TM is not timely handled before the timeout
>>>>>>>>>>>    check.
>>>>>>>>>>>    - Are there any metrics monitoring the network condition between
>>>>>>>>>>>    the JM and the timed-out TM? Possibly any jitter?
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Weirdly enough, I did manage to find a problem with the timed
>>>>>>>>>>> out TaskManagers, which slipped away the last time I checked: The 
>>>>>>>>>>> timed out
>>>>>>>>>>> TaskManager is always the one with the max. GC time (young 
>>>>>>>>>>> generation). I
>>>>>>>>>>> see it only now that I run with G1GC, but with the previous GC it 
>>>>>>>>>>> wasn't
>>>>>>>>>>> the case.
>>>>>>>>>>>
>>>>>>>>>>> Does anyone know what can cause high GC time and how to mitigate
>>>>>>>>>>> this?
>>>>>>>>>>>
>>>>>>>>>>> On Sun, Jun 28, 2020 at 5:04 AM Xintong Song <
>>>>>>>>>>> tonysong...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi Ori,
>>>>>>>>>>>>
>>>>>>>>>>>> Here are some suggestions from my side.
>>>>>>>>>>>>
>>>>>>>>>>>>    - Probably the most straightforward way is to try increasing the
>>>>>>>>>>>>    timeout to see if that helps. You can leverage the configuration
>>>>>>>>>>>>    option `heartbeat.timeout`[1]. The default is 50s.
>>>>>>>>>>>>    - It might be helpful to share your configuration setups (e.g., the
>>>>>>>>>>>>    TM resources, JVM parameters, timeout, etc.). Maybe the easiest way
>>>>>>>>>>>>    is to share the beginning part of your JM/TM logs, including the JVM
>>>>>>>>>>>>    parameters and all the loaded configurations.
>>>>>>>>>>>>    - You may want to look into the GC logs in addition to the metrics.
>>>>>>>>>>>>    In case of a CMS GC stop-the-world, you may not be able to see the
>>>>>>>>>>>>    most recent metrics due to the process not responding to the metric
>>>>>>>>>>>>    querying services.
>>>>>>>>>>>>    - You may also look into the status of the JM process. If JM is
>>>>>>>>>>>>    under significant GC pressure, it could also happen that the
>>>>>>>>>>>>    heartbeat message from TM is not timely handled before the timeout
>>>>>>>>>>>>    check.
>>>>>>>>>>>>    - Are there any metrics monitoring the network condition between
>>>>>>>>>>>>    the JM and the timed-out TM? Possibly any jitter?
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> Thank you~
>>>>>>>>>>>>
>>>>>>>>>>>> Xintong Song
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> [1]
>>>>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/config.html#heartbeat-timeout
>>>>>>>>>>>>
>>>>>>>>>>>> On Thu, Jun 25, 2020 at 11:15 PM Ori Popowski <
>>>>>>>>>>>> ori....@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hello,
>>>>>>>>>>>>>
>>>>>>>>>>>>> I'm running Flink 1.10 on EMR and reading from Kafka with 189
>>>>>>>>>>>>> partitions and I have parallelism of 189.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Currently running with RocksDB, with checkpointing disabled.
>>>>>>>>>>>>> My state size is approx. 500 GB.
>>>>>>>>>>>>>
>>>>>>>>>>>>> I'm getting sporadic "Heartbeat of TaskManager timed out"
>>>>>>>>>>>>> errors for no apparent reason.
>>>>>>>>>>>>>
>>>>>>>>>>>>> I check the container that gets the timeout for GC pauses,
>>>>>>>>>>>>> heap memory, direct memory, mapped memory, offheap memory, CPU 
>>>>>>>>>>>>> load,
>>>>>>>>>>>>> network load, total out-records, total in-records, backpressure, 
>>>>>>>>>>>>> and
>>>>>>>>>>>>> everything I can think of. But all those metrics show that 
>>>>>>>>>>>>> there's nothing
>>>>>>>>>>>>> unusual, and it has around average values for all those metrics. 
>>>>>>>>>>>>> There are
>>>>>>>>>>>>> a lot of other containers which score higher.
>>>>>>>>>>>>>
>>>>>>>>>>>>> All the metrics are very low because every TaskManager runs on
>>>>>>>>>>>>> a r5.2xlarge machine alone.
>>>>>>>>>>>>>
>>>>>>>>>>>>> I've been trying to debug this for days and I cannot find any
>>>>>>>>>>>>> explanation for it.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Can someone explain why it's happening?
>>>>>>>>>>>>>
>>>>>>>>>>>>> java.util.concurrent.TimeoutException: Heartbeat of TaskManager with id container_1593074931633_0011_01_000127 timed out.
>>>>>>>>>>>>>     at org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.notifyHeartbeatTimeout(JobMaster.java:1147)
>>>>>>>>>>>>>     at org.apache.flink.runtime.heartbeat.HeartbeatMonitorImpl.run(HeartbeatMonitorImpl.java:109)
>>>>>>>>>>>>>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>>>>>>>>>>>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>>>>>>>>>>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:397)
>>>>>>>>>>>>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:190)
>>>>>>>>>>>>>     at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
>>>>>>>>>>>>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
>>>>>>>>>>>>>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
>>>>>>>>>>>>>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
>>>>>>>>>>>>>     at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
>>>>>>>>>>>>>     at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
>>>>>>>>>>>>>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
>>>>>>>>>>>>>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>>>>>>>>>>>>>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>>>>>>>>>>>>>     at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
>>>>>>>>>>>>>     at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
>>>>>>>>>>>>>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
>>>>>>>>>>>>>     at akka.actor.ActorCell.invoke(ActorCell.scala:561)
>>>>>>>>>>>>>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
>>>>>>>>>>>>>     at akka.dispatch.Mailbox.run(Mailbox.scala:225)
>>>>>>>>>>>>>     at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
>>>>>>>>>>>>>     at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>>>>>>>>>>>     at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>>>>>>>>>>>     at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>>>>>>>>>>>     at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>
>>>>>>>>>>>>
