What looks a bit strange to me is that with a running job, the 
SystemProcessingTimeService should actually not be collected (since it is 
still in use)!

My guess is that something is indeed happening during that time frame (maybe 
job restarts?), and I would suggest checking your logs for anything suspicious.


When I did experiments with Beam pipelines on our platform [1], I also 
noticed that the standard fat jars that Beam creates include Flink runtime 
classes they shouldn't (at least if you are submitting to a separate Flink 
cluster). This can cause all sorts of problems, and I would recommend removing 
those classes from the fat jar as documented in [1].
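
As a quick sanity check (assuming your bundled pipeline jar is called 
something like pipeline-bundled.jar, which is just a placeholder name here), 
you can list the jar contents and grep for Flink runtime classes; after the 
exclusions described in [1] this should come back empty:

  jar tf pipeline-bundled.jar | grep 'org/apache/flink/runtime'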




Nico



[1] https://ververica.zendesk.com/hc/en-us/articles/360014323099

On Thursday, 10 September 2020 13:44:32 CEST Piotr Nowojski wrote:
> Hi Josson,
> 
> Thanks again for the detailed answer, and sorry that I cannot give you an
> immediate answer. I presume that the JVM args for 1.8 are the same?
> 
> Can you maybe post what exactly has crashed in your cases a) and b)?
> Re c), in the previously attached word document, it looks like Flink was
> running without problems for a couple of hours/minutes, everything was
> stable, no signs of growing memory consumption or an impending problem, until
> around 23:15, when the problem started, right? Has something else happened
> at that time, something that could explain the spike? A checkpoint? Job
> crash/restart? Load spike?
> 
> A couple of other random guesses:
> - have you been monitoring other memory pools for Flink 1.4 and 1.8, like
> metaspace? Growing metaspace size can sometimes cause problems. It
> shouldn't be the case here, as you configured -XX:MaxMetaspaceSize, but it
> might still be worth checking (see the example after this list)...
> - another random idea, have you tried upgrading the JDK? Maybe that would
> solve the problem?
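> 
> For instance (a sketch; <taskmanager-pid> is just a placeholder), something
> like this prints the metaspace capacity and usage in the MC/MU columns
> every 5 seconds:
> 
>   jstat -gc <taskmanager-pid> 5000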
> 
> Best regards,
> Piotrek
> 
> On Wed, 9 Sep 2020 at 19:53, Josson Paul <jossonp...@gmail.com> wrote:
> > Hi Piotr,
> > 
> >  *JVM start up for Flink 1.4*
> > 
> > *-------------------------------*
> > 
> > java -server \
> >   -XX:HeapDumpPath=/opt/maglev/srv/diagnostics/pipelineruntime-taskmgr-assurance-1-77d44cf64-z8gd4.heapdump \
> >   -Xmx6554m -Xms6554m -XX:MaxMetaspaceSize=512m \
> >   -XX:+HeapDumpOnOutOfMemoryError -XX:+UseG1GC -XX:CICompilerCount=4 \
> >   -XX:MaxGCPauseMillis=1000 -XX:+DisableExplicitGC -XX:ParallelGCThreads=4 \
> >   -Dsun.net.inetaddr.ttl=60 '-XX:OnOutOfMemoryError=kill -9 %p' \
> >   -Dio.netty.eventLoopThreads=3 \
> >   -Dlog4j.configurationFile=/opt/maglev/sw/apps/pipelineruntime/resources/log4j2.xml \
> >   -Dorg.apache.flink.shaded.netty4.io.netty.eventLoopThreads=3 \
> >   -Dnetworkaddress.cache.ttl=120 -Dnum.cores=3 \
> >   -XX:+UseStringDeduplication \
> >   -Djava.util.concurrent.ForkJoinPool.common.parallelism=3 -XX:ConcGCThreads=4 \
> >   -Djava.library.path=/usr/local/lib -Djava.net.preferIPv4Stack=true \
> >   -Dapp.dir=/opt/maglev/sw/apps/pipelineruntime -Dserver.name=pipelineruntime \
> >   -Dlog.dir=/opt/maglev/var/log/pipelineruntime \
> >   -cp /opt/maglev/sw/apps/javacontainer/resources:/opt/maglev/sw/apps/pipelineruntime/lib/*:/opt/maglev/sw/apps/pipelineruntime/resources:/opt/maglev/sw/apps/javacontainer/lib/* \
> >   com.cisco.maglev.MaglevServer start maglev
> > 
> >    1. taskmanager.memory.fraction = 0.7f (this came to about 4.5 GB, i.e.
> >    0.7 * 6554 MB. I didn't know at that time that we could set the memory
> >    fraction to zero because ours is a streaming job; it was picking up the
> >    default.)
> >    2. The network buffer pool was 646 MB on the heap (I think this was the
> >    default based on some calculations in Flink 1.4).
> >    3. The G1GC region size was 4 MB (default).
> > 
> > I tested this setup by reducing the JVM heap by *1GB.* It still worked
> > perfectly with some lags here and there.
> > 
> > *JVM start up for Flink 1.8*
> > *------------------------------------*
> > a) I started with the same configuration as above. The Kubernetes POD went
> > out of memory. At this point I realized that in Flink 1.8 the network buffer
> > pools moved to native memory. Based on calculations this came to 200MB of
> > native memory. I increased the overall POD memory to accommodate the
> > buffer pool change, keeping the *heap the same*.
> > 
> > b) Even after I modified the overall POD memory, the POD still crashed.
> > At this point I generated flame graphs to identify the CPU/malloc calls
> > (attached as part of the initial email). I realized that the CPU usage of
> > G1GC is significantly different from Flink 1.4. Now I made 2 changes:
> > 
> >    1. taskmanager.memory.fraction = 0.01f (this gives more heap to user
> >    code; see the sketch below)
> >    2. Increased CPU from 3 to 4 cores.
> >    
> >        The above changes helped the cluster hold on a little longer, but it
> > still crashed after some time.
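> > 
> > For reference, a minimal sketch of the relevant flink-conf.yaml entry for
> > this experiment (only the value discussed above, not the complete file):
> > 
> >   # keep Flink's managed memory minimal so the heap stays available for user code
> >   taskmanager.memory.fraction: 0.01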
> > 
> > c)  Now I made the below changes.
> > 
> >    1. I came across
> >    http://mail.openjdk.java.net/pipermail/hotspot-gc-use/2017-February/002622.html
> >    and changed the G1GC region size to *8MB* instead of the default 4MB
> >    (the resulting GC flags are sketched below).
> >    2. -XX:MaxGCPauseMillis=2000 (I even tried higher values in later
> >    experiments)
> >    3. Played around with G1RSetSparseRegionEntries
> >    
> >        This helped to avoid the POD going out of memory. But the Old Gen
> > 
> > heap issue was very evident now (Please see the attached word document).
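> > 
> > A rough sketch of the GC-related JVM flags at this point (the
> > G1RSetSparseRegionEntries value is left out here since we kept varying it):
> > 
> >   -XX:+UseG1GC -XX:G1HeapRegionSize=8m -XX:MaxGCPauseMillis=2000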
> > 
> > d) Allocated an additional *700 MB* of heap memory along with the above
> > changes. This also didn't help; it just prolonged the crash. Now I need
> > help from others on which direction to take this.
> > 
> > My worry is that even if I upgrade to Flink 1.11 this issue might still
> > persist.
> > 
> > I have attached screenshots from the heap dump to show you the difference
> > between Flink 1.4 and 1.8 in the way HeapKeyedStateBackend is created. Not
> > sure whether this change has something to do with the memory issue that I
> > am facing.
> > The file Flink-1.4.jpg is for 1.4 and Flink-1.8.jpg is for 1.8.
> > 
> > 
> > Thanks,
> > Josson
> > 
> > On Wed, Sep 9, 2020 at 5:44 AM Piotr Nowojski <pnowoj...@apache.org>
> > 
> > wrote:
> >> Hi Josson,
> >> 
> >> Thanks for getting back.
> >> 
> >> What are the JVM settings and in particular GC settings that you are
> >> using (G1GC?)?
> >> Could it also be that in 1.4 you were just slightly below the
> >> threshold of GC issues, while in 1.8 something uses a bit more memory,
> >> causing the GC issues to appear? Have you tried just increasing the heap
> >> size?
> >> Have you tried comparing, at job start-up, the usage and size of the
> >> JVM's memory pools between Flink 1.4 and 1.8? Maybe that can point us in
> >> the right direction.
> >> 
> >> > My understanding on back pressure is that it is not based on Heap
> >> 
> >> memory but based on how fast the Network buffers are filled. Is this
> >> correct?
> >> 
> >> > Does Flink use TCP connection to communicate between tasks if the tasks
> >> 
> >> are in the same Task manager?
> >> 
> >> No, local input channels are used then, but memory for network
> >> buffers is assigned to tasks regardless of the fraction of local input
> >> channels in the task. However, with just a single taskmanager and a
> >> parallelism of 4, the amount of memory used by the network stack should be
> >> insignificant, at least as long as you have a reasonably sized job graph
> >> (32KB * (2 * parallelism + 7) * number of tasks).
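> >> For example, assuming a job graph with 10 tasks at parallelism 4, that
> >> would be roughly 32KB * (2 * 4 + 7) * 10 = 32KB * 150, i.e. about 4.7MB.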
> >> 
> >> > What I noticed in Flink 1.4 is that it doesn't read data from Kafka if
> >> 
> >> there is not sufficient heap memory to process data. Somehow this is not
> >> happening in Flink 1.8 and it fills the heap soon enough not to get
> >> GCed/Finalized. Any change around this between Flink 1.4 and Flink 1.8?
> >> 
> >> No, there were no changes in this part as far as I remember. Tasks, when
> >> producing records, serialise them and put them into the network
> >> buffers. If there are no available network buffers, the task back
> >> pressures and stops processing new records.
> >> 
> >> Best regards,
> >> Piotrek
> >> 
> >> On Tue, 8 Sep 2020 at 21:51, Josson Paul <jossonp...@gmail.com> wrote:
> >>> Hi Piotr,
> >>> 
> >>>    2) SystemProcessingTimeService holds the HeapKeyedStateBackend, and
> >>> HeapKeyedStateBackend has a lot of objects, and that is filling the heap.
> >>> 
> >>>    3) I am not using the Flink Kafka connector, but we are using the
> >>> Apache Beam Kafka connector. There is a change in the Apache Beam version,
> >>> but the Kafka client we are using is the same as the one that was working
> >>> in the other cluster where Flink was 1.4.
> >>> 
> >>>   *There is no change in hardware/Java/Kafka/Kafka client/application
> >>> between the cluster that is working and the one that is not.*
> >>> 
> >>> I am aware of the memory changes and network buffer changes between 1.4
> >>> and 1.8.
> >>> 
> >>> Flink 1.4 had network buffers on the heap, and in 1.8 network buffers are
> >>> in native memory. I modified the Flink 1.8 code to put them back on the
> >>> heap, but the issue didn't get resolved.
> >>> 
> >>> Mine is a streaming job, so we set 'taskmanager.memory.fraction' very low
> >>> so that the heap is fully available for user data.
> >>> 
> >>> Flink 1.4 was not using credit-based flow control and Flink 1.8 uses
> >>> credit-based flow control. *Our setup has only 1 task manager and a
> >>> parallelism of 4*. According to this video
> >>> https://www.youtube.com/watch?v=AbqatHF3tZI&ab_channel=FlinkForward (
> >>> *16:21*), if tasks are in the same task manager, Flink doesn't use
> >>> credit-based flow control. So essentially there is no change between Flink
> >>> 1.4 and 1.8 in *our setup*. Still, I tried setting credit-based flow
> >>> control to false and testing my setup. The problem persists.
> >>> 
> >>> What I noticed in Flink 1.4 is that it doesn't read data from Kafka if
> >>> there is not sufficient heap memory to process data. Somehow this is not
> >>> happening in Flink 1.8 and it fills the heap soon enough not to get
> >>> GCed/Finalized. Any change around this between Flink 1.4 and Flink 1.8?
> >>> 
> >>> My understanding on back pressure is that it is not based on Heap memory
> >>> but based on how fast the Network buffers are filled. Is this correct?
> >>> Does Flink use TCP connection to communicate between tasks if the tasks
> >>> are in the same Task manager?
> >>> 
> >>> Thanks,
> >>> josson
> >>> 
> >>> On Thu, Sep 3, 2020 at 12:35 PM Piotr Nowojski <pnowoj...@apache.org>
> >>> 
> >>> wrote:
> >>>> Hi Josson,
> >>>> 
> >>>> 2. Are you sure that all, or the vast majority, of those objects are
> >>>> pointing towards SystemProcessingTimeService? And are those objects
> >>>> really the problem? Are they taking up that much memory?
> >>>> 3. It still could be Kafka's problem, as it's likely that between 1.4
> >>>> and 1.8.x we bumped Kafka dependencies.
> >>>> 
> >>>> Frankly, if that's not some other external dependency issue, I would
> >>>> expect the problem to lie somewhere else entirely. Flink's code
> >>>> relying on finalisation hasn't changed since 2015/2016. On the other
> >>>> hand, there were quite a few changes between 1.4 and 1.8.x, some of which
> >>>> affect memory usage. Have you read the release notes for versions 1.5,
> >>>> 1.6, 1.7 and 1.8? In particular, both 1.5 [1] and 1.8 [2] have memory
> >>>> related notes that could be addressed via configuration changes.
> >>>> 
> >>>> Thanks,
> >>>> Piotrek
> >>>> 
> >>>> [1]
> >>>> https://ci.apache.org/projects/flink/flink-docs-release-1.5/release-notes/flink-1.5.html
> >>>> [2]
> >>>> https://ci.apache.org/projects/flink/flink-docs-release-1.8/release-notes/flink-1.8.html
> >>>> 
> >>>> On Thu, 3 Sep 2020 at 18:50, Josson Paul <jossonp...@gmail.com> wrote:
> >>>>> 1) We are in the process of migrating to Flink 1.11, but it is going
> >>>>> to take a while before we can make everything work with the latest
> >>>>> version. Meanwhile, since this is happening in production, I am trying
> >>>>> to solve this.
> >>>>> 2) The Finalizer class is pointing to
> >>>>> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService.
> >>>>> This class has a finalize method. I have attached a spreadsheet
> >>>>> (*Object-explorer.csv*) to give you a high-level view.
> >>>>> 3) The difference between the working cluster and the non-working
> >>>>> cluster is only in Beam and Flink. Hardware, input message rate,
> >>>>> application jars, and Kafka are all the same between those 2 clusters.
> >>>>> The working cluster was on Flink 1.4 and Beam 2.4.0.
> >>>>> 
> >>>>> Any insights into this will help me to debug further
> >>>>> 
> >>>>> Thanks,
> >>>>> Josson
> >>>>> 
> >>>>> 
> >>>>> On Thu, Sep 3, 2020 at 3:34 AM Piotr Nowojski <pnowoj...@apache.org>
> >>>>> 
> >>>>> wrote:
> >>>>>> Hi,
> >>>>>> 
> >>>>>> Have you tried using a more recent Flink version? 1.8.x is no longer
> >>>>>> supported, and the latest versions might not have this issue anymore.
> >>>>>> 
> >>>>>> Secondly, have you tried backtracking those references to the
> >>>>>> Finalizers? Assuming that Finalizer is indeed the class causing
> >>>>>> problems.
> >>>>>> 
> >>>>>> Also, it may well be a non-Flink issue [1].
> >>>>>> 
> >>>>>> Best regards,
> >>>>>> Piotrek
> >>>>>> 
> >>>>>> [1] https://issues.apache.org/jira/browse/KAFKA-8546
> >>>>>> 
> >>>>>> On Thu, 3 Sep 2020 at 04:47, Josson Paul <jossonp...@gmail.com>
> >>>>>> 
> >>>>>> wrote:
> >>>>>>> Hi All,
> >>>>>>> 
> >>>>>>> *ISSUE*
> >>>>>>> ------
> >>>>>>> The Flink application runs for some time, then suddenly the CPU shoots
> >>>>>>> up to its peak, POD memory reaches its peak, the GC count increases,
> >>>>>>> and the Old-Gen spaces reach close to 100%. Full GC doesn't clean up
> >>>>>>> heap space. At this point I stopped sending data and cancelled the
> >>>>>>> Flink jobs. Still, the Old-Gen space doesn't come down. I took a heap
> >>>>>>> dump and can see a lot of objects in the java.lang.ref.Finalizer class.
> >>>>>>> I have attached the details in a word document. I do have the heap
> >>>>>>> dump, but it is close to 2GB of compressed size. Is it safe to upload
> >>>>>>> it somewhere and share it here?
> >>>>>>> 
> >>>>>>> This issue doesn't happen in Flink: 1.4.0 and Beam: release-2.4.0
> >>>>>>> 
> >>>>>>> *WORKING CLUSTER INFO* (Flink: 1.4.0 and Beam: release-2.4.0)
> >>>>>>> ----------------------------------------------------
> >>>>>>> 
> >>>>>>> The application reads from Kafka, does aggregations, and writes back
> >>>>>>> into Kafka. The application has 5-minute windows and uses Beam
> >>>>>>> constructs to build the pipeline. To read and write we use Beam
> >>>>>>> connectors.
> >>>>>>> 
> >>>>>>> Flink version: 1.4.0
> >>>>>>> Beam version: release-2.4.0
> >>>>>>> State backend: on the heap, with checkpointing to the distributed
> >>>>>>> file system.
> >>>>>>> 
> >>>>>>> No of task Managers: 1
> >>>>>>> Heap: 6.4 GB
> >>>>>>> CPU: 4 Cores
> >>>>>>> Standalone cluster deployment on a Kubernetes pod
> >>>>>>> 
> >>>>>>> *NOT WORKING CLUSTER INFO* (Flink version: 1.8.3 and Beam version:
> >>>>>>> release-2.15.0)
> >>>>>>> ----------
> >>>>>>> Application details are the same as above.
> >>>>>>> 
> >>>>>>> *No change in the application or the rate at which data is injected;
> >>>>>>> only the Flink and Beam versions changed.*
> >>>>>>> 
> >>>>>>> Flink version: 1.8.3
> >>>>>>> Beam version: release-2.15.0
> >>>>>>> State backend: on the heap, with checkpointing to the distributed
> >>>>>>> file system.
> >>>>>>> 
> >>>>>>> No of task Managers: 1
> >>>>>>> Heap: 6.5 GB
> >>>>>>> CPU: 4 Cores
> >>>>>>> 
> >>>>>>> Deployment: Standalone cluster deployment on a Kubernetes pod
> >>>>>>> 
> >>>>>>> My Observations
> >>>>>>> -------------
> >>>>>>> 
> >>>>>>> 1) The CPU flame graph shows that in the working version, the CPU time
> >>>>>>> spent on GC is lower compared to the non-working version (please see
> >>>>>>> the attached flame graphs: *CPU-flame-WORKING.svg* for the working
> >>>>>>> cluster and *CPU-flame-NOT-working.svg* for the non-working one).
> >>>>>>> 
> >>>>>>> 2) I have attached the flame graph for native memory malloc calls
> >>>>>>> from when the issue was happening. Please find the attached SVG image
> >>>>>>> (*malloc-NOT-working.svg*). The POD memory peaks when this issue
> >>>>>>> happens. To me, it looks like the GC process is requesting a lot of
> >>>>>>> native memory.
> >>>>>>> 
> >>>>>>> 3) When the issue is happening, the GC CPU usage is very high. Please
> >>>>>>> see the flame graph (*CPU-graph-at-issuetime.svg*).
> >>>>>>> 
> >>>>>>> Note: the SVG files can be opened in any browser and are clickable
> >>>>>>> once opened.
> >>>>>>> --
> >>>>>>> Thanks
> >>>>>>> Josson
> >>>>> 
> >>>>> --
> >>>>> Thanks
> >>>>> Josson
> >>> 
> >>> --
> >>> Thanks
> >>> Josson
> > 
> > --
> > Thanks
> > Josson



