I wouldn't want to jump to conclusions, but from what I can see, very large lists and vectors do not work well with flatten in Scala 2.11, each for its own reasons.

In any case, it's 100% not a Flink issue.
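To illustrate the arithmetic I suspect is at play (this snippet is mine, for
illustration only - it is not taken from the standard library):

    // JVM array sizes are Ints, so summing the chunk lengths of
    // ~2.5 billion elements as Ints wraps around to a negative number.
    val lengths = Seq(1500000000, 1000000000)  // 2.5 billion bytes in total
    val intTotal = lengths.sum                 // wraps to -1794967296
    val longTotal = lengths.map(_.toLong).sum  // 2500000000L, as expected
    // new Array[Byte](intTotal) would throw NegativeArraySizeException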
On Tue, Jul 7, 2020 at 10:10 AM Xintong Song <tonysong...@gmail.com> wrote:

> Thanks for the updates, Ori.
>
> I'm not familiar with Scala. Just curious: if what you suspect is true, is
> it a bug in Scala?
>
> Thank you~
>
> Xintong Song
>
>
> On Tue, Jul 7, 2020 at 1:41 PM Ori Popowski <ori....@gmail.com> wrote:
>
>> Hi,
>>
>> I just wanted to update that the problem is now solved!
>>
>> I suspect that Scala's flatten() method has a memory problem on very
>> large lists (> 2 billion elements). When using Scala Lists, the memory
>> seems to leak while the app keeps running, and when using Scala Vectors,
>> a weird IllegalArgumentException is thrown [1].
>>
>> I implemented my own flatten() method using Arrays and quickly ran into
>> NegativeArraySizeException, since the integer representing the array size
>> wrapped around at Integer.MaxValue and became negative. After I started
>> catching this exception, all my cluster problems were resolved: the
>> checkpoints, the heartbeat timeouts, and also the memory and CPU
>> utilization.
>>
>> I still need to confirm my suspicion about Scala's flatten(), though,
>> since I haven't "lab-tested" it.
>>
>> [1] https://github.com/NetLogo/NetLogo/issues/1830
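>>
>> For reference, my replacement was roughly of this shape (a sketch only:
>> the method name is mine, and the real code catches the
>> NegativeArraySizeException / oversized case instead of just failing fast):
>>
>>     // Flatten byte-array chunks into a single array, sized up front.
>>     // Summing the lengths as Long makes the Int overflow detectable.
>>     def flattenBytes(chunks: Seq[Array[Byte]]): Array[Byte] = {
>>       val total = chunks.foldLeft(0L)(_ + _.length)
>>       require(total <= Int.MaxValue, s"flattened size $total overflows Int")
>>       val out = new Array[Byte](total.toInt)
>>       var pos = 0
>>       for (chunk <- chunks) {
>>         System.arraycopy(chunk, 0, out, pos, chunk.length)
>>         pos += chunk.length
>>       }
>>       out
>>     }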
>>
>> On Sun, Jul 5, 2020 at 2:21 PM Ori Popowski <ori....@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> I initially thought this too, which is why my heap is almost 30GiB.
>>> However, I started to analyze the Java Flight Recorder files, and I
>>> suspect there's a memory leak in Scala's flatten() method.
>>> I changed the line that uses flatten(): instead of flatten() I'm just
>>> creating a byte array of the size flatten() would have returned, and I
>>> no longer have the heartbeat problem.
>>>
>>> So now my code is
>>>
>>>     val recordingData = Array.fill[Byte](recordingBytes.map(_.length).sum)(0)
>>>
>>> instead of
>>>
>>>     val recordingData = recordingBytes.flatten
>>>
>>> I attach a screenshot of Java Mission Control.
>>>
>>>
>>> On Fri, Jul 3, 2020 at 7:24 AM Xintong Song <tonysong...@gmail.com>
>>> wrote:
>>>
>>>> I agree with Roman's suggestion of increasing the heap size.
>>>>
>>>> It seems that the heap grows faster than it is freed. Thus the Full GC
>>>> is eventually triggered, taking more than 50s and causing the timeout.
>>>> However, even the Full GC frees only 2GB of space out of the 28GB max
>>>> size. That probably suggests that the max heap size is not sufficient.
>>>>
>>>>> 2020-07-01T10:15:12.869+0000: [Full GC (Allocation Failure)
>>>>> 28944M->26018M(28960M), 51.5256128 secs]
>>>>> [Eden: 0.0B(1448.0M)->0.0B(1448.0M) Survivors: 0.0B->0.0B Heap:
>>>>> 28944.6M(28960.0M)->26018.9M(28960.0M)], [Metaspace:
>>>>> 113556K->112729K(1150976K)]
>>>>> [Times: user=91.08 sys=0.06, real=51.53 secs]
>>>>
>>>> I would not be so sure about the memory leak. It could be a normal
>>>> pattern that memory keeps growing as more data is processed. E.g., from
>>>> the provided log, I see window operation tasks executed in the task
>>>> manager. Such operations might accumulate data until the window is
>>>> emitted.
>>>>
>>>> Maybe you can also take a look at the task manager log when the job
>>>> runs on Flink 1.9 without this problem, and see how the heap size
>>>> changes. As I mentioned before, it is possible that, with the same
>>>> configuration, Flink 1.10 has less heap space than Flink 1.9, due to
>>>> the memory model changes.
>>>>
>>>> Thank you~
>>>>
>>>> Xintong Song
>>>>
>>>>
>>>> On Thu, Jul 2, 2020 at 8:58 PM Ori Popowski <ori....@gmail.com> wrote:
>>>>
>>>>> Thank you very much for your analysis.
>>>>>
>>>>> When I said there was no memory leak, I meant that in the specific
>>>>> TaskManager I monitored in real time using JProfiler, everything
>>>>> looked fine. Unfortunately, this problem occurs in only one of the
>>>>> TaskManagers and you cannot anticipate which, so when you pick a TM
>>>>> to profile at random, everything looks fine.
>>>>>
>>>>> I'm running the job again with Java Flight Recorder now, and I hope
>>>>> I'll find the reason for the memory leak.
>>>>>
>>>>> Thanks!
>>>>>
>>>>> On Thu, Jul 2, 2020 at 3:42 PM Khachatryan Roman
>>>>> <khachatryan.ro...@gmail.com> wrote:
>>>>>
>>>>>> Thanks, Ori
>>>>>>
>>>>>> From the log, it looks like there IS a memory leak.
>>>>>>
>>>>>> At 10:12:53 there was the last "successful" GC, when 13GB was freed
>>>>>> in 0.4653809 secs:
>>>>>> [Eden: 17336.0M(17336.0M)->0.0B(2544.0M) Survivors: 40960.0K->2176.0M
>>>>>> Heap: 23280.3M(28960.0M)->10047.0M(28960.0M)]
>>>>>>
>>>>>> Then the heap grew from 10G to 28G, with the GC not being able to
>>>>>> free up enough space:
>>>>>> [Eden: 2544.0M(2544.0M)->0.0B(856.0M) Survivors: 2176.0M->592.0M
>>>>>> Heap: 12591.0M(28960.0M)->11247.0M(28960.0M)]
>>>>>> [Eden: 856.0M(856.0M)->0.0B(1264.0M) Survivors: 592.0M->184.0M Heap:
>>>>>> 12103.0M(28960.0M)->11655.0M(28960.0M)]
>>>>>> [Eden: 1264.0M(1264.0M)->0.0B(1264.0M) Survivors: 184.0M->184.0M
>>>>>> Heap: 12929.0M(28960.0M)->12467.0M(28960.0M)]
>>>>>> ...
>>>>>> [Eden: 1264.0M(1264.0M)->0.0B(1264.0M) Survivors: 184.0M->184.0M
>>>>>> Heap: 28042.6M(28960.0M)->27220.6M(28960.0M)]
>>>>>> [Eden: 1264.0M(1264.0M)->0.0B(1264.0M) Survivors: 184.0M->184.0M
>>>>>> Heap: 28494.5M(28960.0M)->28720.6M(28960.0M)]
>>>>>> [Eden: 224.0M(1264.0M)->0.0B(1448.0M) Survivors: 184.0M->0.0B Heap:
>>>>>> 28944.6M(28960.0M)->28944.6M(28960.0M)]
>>>>>>
>>>>>> Until 10:15:12, when the GC freed almost 4G - but it took 51 seconds
>>>>>> and the heartbeat timed out:
>>>>>> 2020-07-01T10:15:12.869+0000: [Full GC (Allocation Failure)
>>>>>> 28944M->26018M(28960M), 51.5256128 secs]
>>>>>> [Eden: 0.0B(1448.0M)->0.0B(1448.0M) Survivors: 0.0B->0.0B Heap:
>>>>>> 28944.6M(28960.0M)->26018.9M(28960.0M)], [Metaspace:
>>>>>> 113556K->112729K(1150976K)]
>>>>>> [Times: user=91.08 sys=0.06, real=51.53 secs]
>>>>>> 2020-07-01T10:16:04.395+0000: [GC concurrent-mark-abort]
>>>>>> 10:16:04.398 [flink-akka.actor.default-dispatcher-21] INFO
>>>>>> org.apache.flink.runtime.taskexecutor.TaskExecutor - The heartbeat of
>>>>>> JobManager with id bc59ba6a
>>>>>>
>>>>>> No substantial amount of memory was freed after that.
>>>>>>
>>>>>> If this memory usage pattern is expected, I'd suggest:
>>>>>> 1. increasing the heap size
>>>>>> 2. playing with the PrintStringDeduplicationStatistics and
>>>>>> UseStringDeduplication flags - probably string deduplication is
>>>>>> making G1 slower than CMS
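>>>>>>
>>>>>> For example (the flags are standard HotSpot G1 options; passing them
>>>>>> via env.java.opts.taskmanager in flink-conf.yaml is just one way to
>>>>>> set them):
>>>>>>
>>>>>>     env.java.opts.taskmanager: "-XX:+UseStringDeduplication -XX:+PrintStringDeduplicationStatistics"
>>>>>>
>>>>>> The statistics should show how much time G1 spends on deduplication,
>>>>>> so you can tell whether it contributes to the long pauses.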
>>>>>>
>>>>>> Regards,
>>>>>> Roman
>>>>>>
>>>>>>
>>>>>> On Thu, Jul 2, 2020 at 10:11 AM Ori Popowski <ori....@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> I'd be happy to :) Attached is the log of a TaskManager which timed
>>>>>>> out.
>>>>>>>
>>>>>>> Thanks!
>>>>>>>
>>>>>>> On Thu, Jul 2, 2020 at 4:21 AM Xintong Song <tonysong...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Maybe you can share the log and GC log of the problematic
>>>>>>>> TaskManager? Let's see if we can find any clue.
>>>>>>>>
>>>>>>>> Thank you~
>>>>>>>>
>>>>>>>> Xintong Song
>>>>>>>>
>>>>>>>>
>>>>>>>> On Wed, Jul 1, 2020 at 8:11 PM Ori Popowski <ori....@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> I've found out that sometimes one of my TaskManagers experiences a
>>>>>>>>> GC pause of 40-50 seconds, and I have no idea why.
>>>>>>>>> I profiled one of the machines using JProfiler and everything
>>>>>>>>> looks fine: no memory leaks, and memory usage is low.
>>>>>>>>> However, I cannot anticipate which of the machines will get the
>>>>>>>>> 40-50 second pause, and I also cannot profile all of them all the
>>>>>>>>> time.
>>>>>>>>>
>>>>>>>>> Any suggestions?
>>>>>>>>>
>>>>>>>>> On Mon, Jun 29, 2020 at 4:44 AM Xintong Song
>>>>>>>>> <tonysong...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> In Flink 1.10, there's a huge change in memory management
>>>>>>>>>> compared to previous versions. This could be related to your
>>>>>>>>>> observations, because with the same configuration, it is possible
>>>>>>>>>> that there's less JVM heap space (with more off-heap memory).
>>>>>>>>>> Please take a look at the migration guide [1].
>>>>>>>>>>
>>>>>>>>>> Thank you~
>>>>>>>>>>
>>>>>>>>>> Xintong Song
>>>>>>>>>>
>>>>>>>>>> [1]
>>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/memory/mem_migration.html
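>>>>>>>>>>
>>>>>>>>>> As a sketch of what this can look like (the values here are
>>>>>>>>>> illustrative, not recommendations), in 1.10 you can pin the task
>>>>>>>>>> heap and managed memory explicitly in flink-conf.yaml instead of
>>>>>>>>>> relying on the derived defaults:
>>>>>>>>>>
>>>>>>>>>>     taskmanager.memory.task.heap.size: 28g
>>>>>>>>>>     taskmanager.memory.managed.size: 4g
>>>>>>>>>>
>>>>>>>>>> That way the heap cannot silently shrink relative to a 1.9 setup
>>>>>>>>>> with the same total memory.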
>>>>>>>>>>
>>>>>>>>>> On Sun, Jun 28, 2020 at 10:12 PM Ori Popowski <ori....@gmail.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Thanks for the suggestions!
>>>>>>>>>>>
>>>>>>>>>>> > i recently tried 1.10 and see this error frequently. and i
>>>>>>>>>>> > dont have the same issue when running with 1.9.1
>>>>>>>>>>>
>>>>>>>>>>> I did downgrade to Flink 1.9, and there's certainly no change in
>>>>>>>>>>> the occurrence of the heartbeat timeouts.
>>>>>>>>>>>
>>>>>>>>>>> > - Probably the most straightforward way is to try increasing
>>>>>>>>>>> > the timeout to see if that helps. You can leverage the
>>>>>>>>>>> > configuration option `heartbeat.timeout`[1]. The default is
>>>>>>>>>>> > 50s.
>>>>>>>>>>> > - It might be helpful to share your configuration setup
>>>>>>>>>>> > (e.g., the TM resources, JVM parameters, timeout, etc.). Maybe
>>>>>>>>>>> > the easiest way is to share the beginning part of your JM/TM
>>>>>>>>>>> > logs, including the JVM parameters and all the loaded
>>>>>>>>>>> > configurations.
>>>>>>>>>>> > - You may want to look into the GC logs in addition to the
>>>>>>>>>>> > metrics. In case of a CMS GC stop-the-world, you may not be
>>>>>>>>>>> > able to see the most recent metrics, due to the process not
>>>>>>>>>>> > responding to the metric querying services.
>>>>>>>>>>> > - You may also look into the status of the JM process. If JM
>>>>>>>>>>> > is under significant GC pressure, it could also happen that
>>>>>>>>>>> > the heartbeat message from the TM is not handled in time
>>>>>>>>>>> > before the timeout check.
>>>>>>>>>>> > - Are there any metrics monitoring the network condition
>>>>>>>>>>> > between the JM and the timed-out TM? Possibly any jitters?
>>>>>>>>>>>
>>>>>>>>>>> Weirdly enough, I did manage to find a problem with the
>>>>>>>>>>> timed-out TaskManagers, which slipped past me the last time I
>>>>>>>>>>> checked: the timed-out TaskManager is always the one with the
>>>>>>>>>>> maximum GC time (young generation). I only see this now that I
>>>>>>>>>>> run with G1GC; with the previous GC it wasn't the case.
>>>>>>>>>>>
>>>>>>>>>>> Does anyone know what can cause high GC time and how to mitigate
>>>>>>>>>>> this?
>>>>>>>>>>>
>>>>>>>>>>> On Sun, Jun 28, 2020 at 5:04 AM Xintong Song
>>>>>>>>>>> <tonysong...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi Ori,
>>>>>>>>>>>>
>>>>>>>>>>>> Here are some suggestions from my side.
>>>>>>>>>>>>
>>>>>>>>>>>> - Probably the most straightforward way is to try increasing
>>>>>>>>>>>> the timeout to see if that helps. You can leverage the
>>>>>>>>>>>> configuration option `heartbeat.timeout`[1]. The default is
>>>>>>>>>>>> 50s.
>>>>>>>>>>>> - It might be helpful to share your configuration setup
>>>>>>>>>>>> (e.g., the TM resources, JVM parameters, timeout, etc.). Maybe
>>>>>>>>>>>> the easiest way is to share the beginning part of your JM/TM
>>>>>>>>>>>> logs, including the JVM parameters and all the loaded
>>>>>>>>>>>> configurations.
>>>>>>>>>>>> - You may want to look into the GC logs in addition to the
>>>>>>>>>>>> metrics. In case of a CMS GC stop-the-world, you may not be
>>>>>>>>>>>> able to see the most recent metrics, due to the process not
>>>>>>>>>>>> responding to the metric querying services.
>>>>>>>>>>>> - You may also look into the status of the JM process. If JM
>>>>>>>>>>>> is under significant GC pressure, it could also happen that
>>>>>>>>>>>> the heartbeat message from the TM is not handled in time
>>>>>>>>>>>> before the timeout check.
>>>>>>>>>>>> - Are there any metrics monitoring the network condition
>>>>>>>>>>>> between the JM and the timed-out TM? Possibly any jitters?
>>>>>>>>>>>>
>>>>>>>>>>>> Thank you~
>>>>>>>>>>>>
>>>>>>>>>>>> Xintong Song
>>>>>>>>>>>>
>>>>>>>>>>>> [1]
>>>>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/config.html#heartbeat-timeout
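>>>>>>>>>>>>
>>>>>>>>>>>> For instance (the value here is illustrative), the option takes
>>>>>>>>>>>> milliseconds, so in flink-conf.yaml:
>>>>>>>>>>>>
>>>>>>>>>>>>     heartbeat.timeout: 120000
>>>>>>>>>>>>
>>>>>>>>>>>> would raise the timeout from the default 50000 (50s) to 120s.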
>>>>>>>>>>>>
>>>>>>>>>>>> On Thu, Jun 25, 2020 at 11:15 PM Ori Popowski
>>>>>>>>>>>> <ori....@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hello,
>>>>>>>>>>>>>
>>>>>>>>>>>>> I'm running Flink 1.10 on EMR, reading from Kafka with 189
>>>>>>>>>>>>> partitions, and I have a parallelism of 189.
>>>>>>>>>>>>>
>>>>>>>>>>>>> I'm currently running with RocksDB, with checkpointing
>>>>>>>>>>>>> disabled. My state size is approx. 500GB.
>>>>>>>>>>>>>
>>>>>>>>>>>>> I'm getting sporadic "Heartbeat of TaskManager timed out"
>>>>>>>>>>>>> errors for no apparent reason.
>>>>>>>>>>>>>
>>>>>>>>>>>>> I check the container that gets the timeout for GC pauses,
>>>>>>>>>>>>> heap memory, direct memory, mapped memory, off-heap memory,
>>>>>>>>>>>>> CPU load, network load, total out-records, total in-records,
>>>>>>>>>>>>> backpressure, and everything else I can think of. But all
>>>>>>>>>>>>> those metrics show that there's nothing unusual, and the
>>>>>>>>>>>>> container has around-average values for all of them. There are
>>>>>>>>>>>>> a lot of other containers which score higher.
>>>>>>>>>>>>>
>>>>>>>>>>>>> All the metrics are very low because every TaskManager runs
>>>>>>>>>>>>> alone on an r5.2xlarge machine.
>>>>>>>>>>>>>
>>>>>>>>>>>>> I've been trying to debug this for days and I cannot find any
>>>>>>>>>>>>> explanation for it.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Can someone explain why it's happening?
>>>>>>>>>>>>>
>>>>>>>>>>>>> java.util.concurrent.TimeoutException: Heartbeat of
>>>>>>>>>>>>> TaskManager with id container_1593074931633_0011_01_000127
>>>>>>>>>>>>> timed out.
>>>>>>>>>>>>>   at org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.notifyHeartbeatTimeout(JobMaster.java:1147)
>>>>>>>>>>>>>   at org.apache.flink.runtime.heartbeat.HeartbeatMonitorImpl.run(HeartbeatMonitorImpl.java:109)
>>>>>>>>>>>>>   at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>>>>>>>>>>>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>>>>>>>>>>>   at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:397)
>>>>>>>>>>>>>   at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:190)
>>>>>>>>>>>>>   at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
>>>>>>>>>>>>>   at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
>>>>>>>>>>>>>   at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
>>>>>>>>>>>>>   at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
>>>>>>>>>>>>>   at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
>>>>>>>>>>>>>   at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
>>>>>>>>>>>>>   at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
>>>>>>>>>>>>>   at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>>>>>>>>>>>>>   at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>>>>>>>>>>>>>   at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
>>>>>>>>>>>>>   at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
>>>>>>>>>>>>>   at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
>>>>>>>>>>>>>   at akka.actor.ActorCell.invoke(ActorCell.scala:561)
>>>>>>>>>>>>>   at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
>>>>>>>>>>>>>   at akka.dispatch.Mailbox.run(Mailbox.scala:225)
>>>>>>>>>>>>>   at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
>>>>>>>>>>>>>   at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>>>>>>>>>>>   at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>>>>>>>>>>>   at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>>>>>>>>>>>   at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks