Hi, I just wanted to give an update: the problem is now solved!
I suspect that Scala's flatten() method has a memory problem on very large collections (more than 2 billion elements). When using Scala Lists, the memory seems to leak but the app keeps running, and when using Scala Vectors, a strange IllegalArgumentException is thrown [1]. I implemented my own flatten() using Arrays and quickly ran into a NegativeArraySizeException, since the Int holding the array size wrapped around at Integer.MaxValue and became negative. Once I started catching this exception, all my cluster problems resolved: the checkpoints, the heartbeat timeouts, and the memory and CPU utilization. I still need to confirm my suspicion about Scala's flatten(), though, since I haven't "lab-tested" it. (A small standalone sketch of the overflow is included further down.)

[1] https://github.com/NetLogo/NetLogo/issues/1830

On Sun, Jul 5, 2020 at 2:21 PM Ori Popowski <ori....@gmail.com> wrote:

> Hi,
>
> I initially thought this, so this is why my heap is almost 30GiB.
> However, I started to analyze the Java Flight Recorder files, and I
> suspect there's a memory leak in Scala's flatten() method.
> I changed the line that uses flatten(): instead of flatten() I'm just
> creating a byte array of the size flatten() would have returned, and I no
> longer have the heartbeat problem.
>
> So now my code is
>     val recordingData = Array.fill[Byte](recordingBytes.map(_.length).sum)(0)
> instead of
>     val recordingData = recordingBytes.flatten
>
> I attach a screenshot of Java Mission Control
>
> On Fri, Jul 3, 2020 at 7:24 AM Xintong Song <tonysong...@gmail.com> wrote:
>
>> I agree with Roman's suggestion for increasing the heap size.
>>
>> It seems that the heap grows faster than it is freed. Thus eventually the
>> Full GC is triggered, taking more than 50s and causing the timeout. However,
>> even the full GC frees only 2GB of space out of the 28GB max size. That
>> probably suggests that the max heap size is not sufficient.
>>
>>> 2020-07-01T10:15:12.869+0000: [Full GC (Allocation Failure) 28944M->26018M(28960M), 51.5256128 secs]
>>> [Eden: 0.0B(1448.0M)->0.0B(1448.0M) Survivors: 0.0B->0.0B Heap: 28944.6M(28960.0M)->26018.9M(28960.0M)], [Metaspace: 113556K->112729K(1150976K)]
>>> [Times: user=91.08 sys=0.06, real=51.53 secs]
>>
>> I would not be so sure about the memory leak. I think it could be a
>> normal pattern that memory keeps growing as more data is processed. E.g.,
>> from the provided log, I see window operation tasks executed in the task
>> manager. Such operations might accumulate data until the window is emitted.
>>
>> Maybe, Ori, you can also take a look at the task manager log when the job
>> runs with Flink 1.9 without this problem, and see how the heap size changes.
>> As I mentioned before, it is possible that, with the same configurations,
>> Flink 1.10 has less heap space than Flink 1.9, due to the memory model
>> changes.
>>
>> Thank you~
>>
>> Xintong Song
>>
>> On Thu, Jul 2, 2020 at 8:58 PM Ori Popowski <ori....@gmail.com> wrote:
>>
>>> Thank you very much for your analysis.
>>>
>>> When I said there was no memory leak, I meant that in the specific
>>> TaskManager I monitored in real time using JProfiler.
>>> Unfortunately, this problem occurs in only one of the TaskManagers and you
>>> cannot anticipate which. So when you pick a TM to profile at random,
>>> everything looks fine.
>>>
>>> I'm running the job again with Java Flight Recorder now, and I hope I'll
>>> find the reason for the memory leak.
>>>
>>> Thanks!
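For anyone who runs into the same thing, here is a minimal, self-contained sketch of the overflow described above (hypothetical names such as flattenToArray; this is not my actual job code). It computes the total size as a Long and fails fast before allocating, instead of letting the wrapped-around Int reach the array constructor:

    // Minimal sketch (hypothetical names) of the overflow: summing Int lengths
    // of many chunks wraps past Int.MaxValue, and the wrapped (negative) size
    // would make `new Array` throw NegativeArraySizeException.
    object FlattenOverflowSketch {

      // Safe variant: compute the total size as a Long and fail fast
      // before allocating, instead of letting the wrapped Int through.
      def flattenToArray(chunks: Seq[Array[Byte]]): Array[Byte] = {
        val total: Long = chunks.foldLeft(0L)(_ + _.length)
        require(total <= Int.MaxValue, s"flattened size $total exceeds Int.MaxValue")
        val out = new Array[Byte](total.toInt)
        var pos = 0
        chunks.foreach { c =>
          System.arraycopy(c, 0, out, pos, c.length)
          pos += c.length
        }
        out
      }

      def main(args: Array[String]): Unit = {
        val recordingBytes: Seq[Array[Byte]] = Seq.fill(4)(new Array[Byte](16))
        val recordingData = flattenToArray(recordingBytes) // fine: 64 bytes
        println(recordingData.length)
        // The naive Int sum is what wraps around for > 2 billion total elements:
        // val naiveSize = recordingBytes.map(_.length).sum
      }
    }

In my case I went the other way and simply caught the NegativeArraySizeException, but the point is the same: the flattened size must stay below Int.MaxValue, otherwise the data has to be chunked differently.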
>>>
>>> On Thu, Jul 2, 2020 at 3:42 PM Khachatryan Roman <khachatryan.ro...@gmail.com> wrote:
>>>
>>>> Thanks, Ori
>>>>
>>>> From the log, it looks like there IS a memory leak.
>>>>
>>>> At 10:12:53 there was the last "successful" GC, when 13 GB was freed in 0.4653809 secs:
>>>> [Eden: 17336.0M(17336.0M)->0.0B(2544.0M) Survivors: 40960.0K->2176.0M Heap: 23280.3M(28960.0M)->10047.0M(28960.0M)]
>>>>
>>>> Then the heap grew from 10G to 28G, with GC not being able to free up enough space:
>>>> [Eden: 2544.0M(2544.0M)->0.0B(856.0M) Survivors: 2176.0M->592.0M Heap: 12591.0M(28960.0M)->11247.0M(28960.0M)]
>>>> [Eden: 856.0M(856.0M)->0.0B(1264.0M) Survivors: 592.0M->184.0M Heap: 12103.0M(28960.0M)->11655.0M(28960.0M)]
>>>> [Eden: 1264.0M(1264.0M)->0.0B(1264.0M) Survivors: 184.0M->184.0M Heap: 12929.0M(28960.0M)->12467.0M(28960.0M)]
>>>> ... ...
>>>> [Eden: 1264.0M(1264.0M)->0.0B(1264.0M) Survivors: 184.0M->184.0M Heap: 28042.6M(28960.0M)->27220.6M(28960.0M)]
>>>> [Eden: 1264.0M(1264.0M)->0.0B(1264.0M) Survivors: 184.0M->184.0M Heap: 28494.5M(28960.0M)->28720.6M(28960.0M)]
>>>> [Eden: 224.0M(1264.0M)->0.0B(1448.0M) Survivors: 184.0M->0.0B Heap: 28944.6M(28960.0M)->28944.6M(28960.0M)]
>>>>
>>>> Until 10:15:12, when GC freed almost 4G - but it took 51 seconds and the heartbeat timed out:
>>>> 2020-07-01T10:15:12.869+0000: [Full GC (Allocation Failure) 28944M->26018M(28960M), 51.5256128 secs]
>>>> [Eden: 0.0B(1448.0M)->0.0B(1448.0M) Survivors: 0.0B->0.0B Heap: 28944.6M(28960.0M)->26018.9M(28960.0M)], [Metaspace: 113556K->112729K(1150976K)]
>>>> [Times: user=91.08 sys=0.06, real=51.53 secs]
>>>> 2020-07-01T10:16:04.395+0000: [GC concurrent-mark-abort]
>>>> 10:16:04.398 [flink-akka.actor.default-dispatcher-21] INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - The heartbeat of JobManager with id bc59ba6a
>>>>
>>>> No substantial amount of memory was freed after that.
>>>>
>>>> If this memory usage pattern is expected, I'd suggest:
>>>> 1. increasing the heap size
>>>> 2. playing with the PrintStringDeduplicationStatistics and UseStringDeduplication flags - probably string deduplication is making G1 slower than CMS
>>>>
>>>> Regards,
>>>> Roman
>>>>
>>>> On Thu, Jul 2, 2020 at 10:11 AM Ori Popowski <ori....@gmail.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> I'd be happy to :) Attached is a TaskManager log which timed out.
>>>>>
>>>>> Thanks!
>>>>>
>>>>> On Thu, Jul 2, 2020 at 4:21 AM Xintong Song <tonysong...@gmail.com> wrote:
>>>>>
>>>>>> Maybe you can share the log and GC log of the problematic TaskManager? See if we can find any clue.
>>>>>>
>>>>>> Thank you~
>>>>>>
>>>>>> Xintong Song
>>>>>>
>>>>>> On Wed, Jul 1, 2020 at 8:11 PM Ori Popowski <ori....@gmail.com> wrote:
>>>>>>
>>>>>>> I've found out that sometimes one of my TaskManagers experiences a GC pause of 40-50 seconds, and I have no idea why.
>>>>>>> I profiled one of the machines using JProfiler and everything looks fine. No memory leaks, memory is low.
>>>>>>> However, I cannot anticipate which of the machines will get the 40-50 second pause, and I also cannot profile all of them all the time.
>>>>>>>
>>>>>>> Any suggestions?
>>>>>>>
>>>>>>> On Mon, Jun 29, 2020 at 4:44 AM Xintong Song <tonysong...@gmail.com> wrote:
>>>>>>>
>>>>>>>> In Flink 1.10, there's a huge change in the memory management compared to previous versions.
This could be related to your >>>>>>>> observations, >>>>>>>> because with the same configurations, it is possible that there's less >>>>>>>> JVM >>>>>>>> heap space (with more off-heap memory). Please take a look at this >>>>>>>> migration guide [1]. >>>>>>>> >>>>>>>> Thank you~ >>>>>>>> >>>>>>>> Xintong Song >>>>>>>> >>>>>>>> >>>>>>>> [1] >>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/memory/mem_migration.html >>>>>>>> >>>>>>>> On Sun, Jun 28, 2020 at 10:12 PM Ori Popowski <ori....@gmail.com> >>>>>>>> wrote: >>>>>>>> >>>>>>>>> Thanks for the suggestions! >>>>>>>>> >>>>>>>>> > i recently tried 1.10 and see this error frequently. and i dont >>>>>>>>> have the same issue when running with 1.9.1 >>>>>>>>> I did downgrade to Flink 1.9 and there's certainly no change in >>>>>>>>> the occurrences in the heartbeat timeout >>>>>>>>> >>>>>>>>> >>>>>>>>> > >>>>>>>>> >>>>>>>>> - Probably the most straightforward way is to try increasing >>>>>>>>> the timeout to see if that helps. You can leverage the >>>>>>>>> configuration option >>>>>>>>> `heartbeat.timeout`[1]. The default is 50s. >>>>>>>>> - It might be helpful to share your configuration setups >>>>>>>>> (e.g., the TM resources, JVM parameters, timeout, etc.). Maybe the >>>>>>>>> easiest >>>>>>>>> way is to share the beginning part of your JM/TM logs, including >>>>>>>>> the JVM >>>>>>>>> parameters and all the loaded configurations. >>>>>>>>> - You may want to look into the GC logs in addition to the >>>>>>>>> metrics. In case of a CMS GC stop-the-world, you may not be able >>>>>>>>> to see the >>>>>>>>> most recent metrics due to the process not responding to the metric >>>>>>>>> querying services. >>>>>>>>> - You may also look into the status of the JM process. If JM >>>>>>>>> is under significant GC pressure, it could also happen that the >>>>>>>>> heartbeat >>>>>>>>> message from TM is not timely handled before the timeout check. >>>>>>>>> - Is there any metrics monitoring the network condition >>>>>>>>> between the JM and timeouted TM? Possibly any jitters? >>>>>>>>> >>>>>>>>> >>>>>>>>> Weirdly enough, I did manage to find a problem with the timed out >>>>>>>>> TaskManagers, which slipped away the last time I checked: The timed >>>>>>>>> out >>>>>>>>> TaskManager is always the one with the max. GC time (young >>>>>>>>> generation). I >>>>>>>>> see it only now that I run with G1GC, but with the previous GC it >>>>>>>>> wasn't >>>>>>>>> the case. >>>>>>>>> >>>>>>>>> Does anyone know what can cause high GC time and how to mitigate >>>>>>>>> this? >>>>>>>>> >>>>>>>>> On Sun, Jun 28, 2020 at 5:04 AM Xintong Song < >>>>>>>>> tonysong...@gmail.com> wrote: >>>>>>>>> >>>>>>>>>> Hi Ori, >>>>>>>>>> >>>>>>>>>> Here are some suggestions from my side. >>>>>>>>>> >>>>>>>>>> - Probably the most straightforward way is to try increasing >>>>>>>>>> the timeout to see if that helps. You can leverage the >>>>>>>>>> configuration option >>>>>>>>>> `heartbeat.timeout`[1]. The default is 50s. >>>>>>>>>> - It might be helpful to share your configuration setups >>>>>>>>>> (e.g., the TM resources, JVM parameters, timeout, etc.). Maybe >>>>>>>>>> the easiest >>>>>>>>>> way is to share the beginning part of your JM/TM logs, including >>>>>>>>>> the JVM >>>>>>>>>> parameters and all the loaded configurations. >>>>>>>>>> - You may want to look into the GC logs in addition to the >>>>>>>>>> metrics. 
In case of a CMS GC stop-the-world, you may not be able >>>>>>>>>> to see the >>>>>>>>>> most recent metrics due to the process not responding to the >>>>>>>>>> metric >>>>>>>>>> querying services. >>>>>>>>>> - You may also look into the status of the JM process. If JM >>>>>>>>>> is under significant GC pressure, it could also happen that the >>>>>>>>>> heartbeat >>>>>>>>>> message from TM is not timely handled before the timeout check. >>>>>>>>>> - Is there any metrics monitoring the network condition >>>>>>>>>> between the JM and timeouted TM? Possibly any jitters? >>>>>>>>>> >>>>>>>>>> >>>>>>>>>> Thank you~ >>>>>>>>>> >>>>>>>>>> Xintong Song >>>>>>>>>> >>>>>>>>>> >>>>>>>>>> [1] >>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/config.html#heartbeat-timeout >>>>>>>>>> >>>>>>>>>> On Thu, Jun 25, 2020 at 11:15 PM Ori Popowski <ori....@gmail.com> >>>>>>>>>> wrote: >>>>>>>>>> >>>>>>>>>>> Hello, >>>>>>>>>>> >>>>>>>>>>> I'm running Flink 1.10 on EMR and reading from Kafka with 189 >>>>>>>>>>> partitions and I have parallelism of 189. >>>>>>>>>>> >>>>>>>>>>> Currently running with RocksDB, with checkpointing disabled. My >>>>>>>>>>> state size is appx. 500gb. >>>>>>>>>>> >>>>>>>>>>> I'm getting sporadic "Heartbeat of TaskManager timed out" errors >>>>>>>>>>> with no apparent reason. >>>>>>>>>>> >>>>>>>>>>> I check the container that gets the timeout for GC pauses, heap >>>>>>>>>>> memory, direct memory, mapped memory, offheap memory, CPU load, >>>>>>>>>>> network >>>>>>>>>>> load, total out-records, total in-records, backpressure, and >>>>>>>>>>> everything I >>>>>>>>>>> can think of. But all those metrics show that there's nothing >>>>>>>>>>> unusual, and >>>>>>>>>>> it has around average values for all those metrics. There are a lot >>>>>>>>>>> of >>>>>>>>>>> other containers which score higher. >>>>>>>>>>> >>>>>>>>>>> All the metrics are very low because every TaskManager runs on a >>>>>>>>>>> r5.2xlarge machine alone. >>>>>>>>>>> >>>>>>>>>>> I'm trying to debug this for days and I cannot find any >>>>>>>>>>> explanation for it. >>>>>>>>>>> >>>>>>>>>>> Can someone explain why it's happening? >>>>>>>>>>> >>>>>>>>>>> java.util.concurrent.TimeoutException: Heartbeat of TaskManager >>>>>>>>>>> with id container_1593074931633_0011_01_000127 timed out. >>>>>>>>>>> at org.apache.flink.runtime.jobmaster. 
>>>>>>>>>>> JobMaster$TaskManagerHeartbeatListener.notifyHeartbeatTimeout( >>>>>>>>>>> JobMaster.java:1147) >>>>>>>>>>> at org.apache.flink.runtime.heartbeat.HeartbeatMonitorImpl >>>>>>>>>>> .run(HeartbeatMonitorImpl.java:109) >>>>>>>>>>> at java.util.concurrent.Executors$RunnableAdapter.call( >>>>>>>>>>> Executors.java:511) >>>>>>>>>>> at java.util.concurrent.FutureTask.run(FutureTask.java:266) >>>>>>>>>>> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor >>>>>>>>>>> .handleRunAsync(AkkaRpcActor.java:397) >>>>>>>>>>> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor >>>>>>>>>>> .handleRpcMessage(AkkaRpcActor.java:190) >>>>>>>>>>> at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor >>>>>>>>>>> .handleRpcMessage(FencedAkkaRpcActor.java:74) >>>>>>>>>>> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor >>>>>>>>>>> .handleMessage(AkkaRpcActor.java:152) >>>>>>>>>>> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements >>>>>>>>>>> .scala:26) >>>>>>>>>>> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements >>>>>>>>>>> .scala:21) >>>>>>>>>>> at scala.PartialFunction$class.applyOrElse(PartialFunction >>>>>>>>>>> .scala:123) >>>>>>>>>>> at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements >>>>>>>>>>> .scala:21) >>>>>>>>>>> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction >>>>>>>>>>> .scala:170) >>>>>>>>>>> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction >>>>>>>>>>> .scala:171) >>>>>>>>>>> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction >>>>>>>>>>> .scala:171) >>>>>>>>>>> at akka.actor.Actor$class.aroundReceive(Actor.scala:517) >>>>>>>>>>> at akka.actor.AbstractActor.aroundReceive(AbstractActor >>>>>>>>>>> .scala:225) >>>>>>>>>>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) >>>>>>>>>>> at akka.actor.ActorCell.invoke(ActorCell.scala:561) >>>>>>>>>>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) >>>>>>>>>>> at akka.dispatch.Mailbox.run(Mailbox.scala:225) >>>>>>>>>>> at akka.dispatch.Mailbox.exec(Mailbox.scala:235) >>>>>>>>>>> at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask >>>>>>>>>>> .java:260) >>>>>>>>>>> at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask( >>>>>>>>>>> ForkJoinPool.java:1339) >>>>>>>>>>> at akka.dispatch.forkjoin.ForkJoinPool.runWorker( >>>>>>>>>>> ForkJoinPool.java:1979) >>>>>>>>>>> at akka.dispatch.forkjoin.ForkJoinWorkerThread.run( >>>>>>>>>>> ForkJoinWorkerThread.java:107) >>>>>>>>>>> >>>>>>>>>>> Thanks >>>>>>>>>>> >>>>>>>>>>
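For completeness, the two mitigations suggested earlier in the thread (raising `heartbeat.timeout` [1] and trying G1 string deduplication) would look roughly like this in the Flink configuration. The keys are standard Flink/JVM options, but the values here are illustrative, not recommendations:

    # flink-conf.yaml (illustrative values)
    heartbeat.timeout: 120000   # milliseconds; the default is 50000 (50 s)

    # pass the G1 string-deduplication flags Roman mentioned to the TaskManagers
    env.java.opts.taskmanager: "-XX:+UseStringDeduplication -XX:+PrintStringDeduplicationStatistics"

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/config.html#heartbeat-timeout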