Hi Josson,

Thanks for getting back.

Which JVM settings are you using, and in particular which GC settings
(G1GC?)?
It could also be that in 1.4 you were just slightly below the threshold at
which GC problems start, while in 1.8 something uses a bit more memory and
pushes you over it. Have you tried simply increasing the heap size?
Have you tried comparing the usage and size of the JVM's memory pools at
job start-up between Flink 1.4 and 1.8? Maybe that can point us in the
right direction.
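
For the memory pool comparison, a minimal sketch using the standard
java.lang.management API could look like the following. The class name
MemoryPoolReport is only illustrative, and the snippet would have to run
inside the TaskManager JVM (or be replaced by a JMX connection to it) to say
anything about the job; run standalone it only reports on its own JVM.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryPoolMXBean;
import java.lang.management.MemoryUsage;
import java.util.ArrayList;
import java.util.List;

// Illustrative helper (not part of Flink): lists the JVM memory pools.
class MemoryPoolReport {

    /** Returns one line per memory pool: name, type, used, committed, max. */
    static List<String> snapshot() {
        List<String> lines = new ArrayList<>();
        for (MemoryPoolMXBean pool : ManagementFactory.getMemoryPoolMXBeans()) {
            MemoryUsage usage = pool.getUsage();
            if (usage == null) {
                continue; // the pool is no longer valid
            }
            // getMax() returns -1 when no maximum is defined for the pool.
            String max = usage.getMax() < 0
                    ? "undefined"
                    : (usage.getMax() >> 20) + " MB";
            lines.add(String.format(
                    "%-30s %-16s used=%d MB committed=%d MB max=%s",
                    pool.getName(), pool.getType(),
                    usage.getUsed() >> 20, usage.getCommitted() >> 20, max));
        }
        return lines;
    }

    public static void main(String[] args) {
        snapshot().forEach(System.out::println);
    }
}
```

Taking one snapshot shortly after job start-up on each cluster and comparing
them pool by pool should show where the extra memory goes.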

> My understanding on back pressure is that it is not based on Heap memory
> but based on how fast the Network buffers are filled. Is this correct?
> Does Flink use a TCP connection to communicate between tasks if the tasks
> are in the same Task manager?

No, tasks in the same TaskManager exchange data through local input
channels rather than TCP. Memory for network buffers is nevertheless
assigned to tasks regardless of what fraction of their input channels is
local. However, with just a single TaskManager and a parallelism of 4, the
amount of memory used by the network stack should be insignificant, at
least as long as you have a reasonably sized job graph
(32KB * (2 * parallelism + 7) * number of tasks).
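
To put numbers on the formula above, here is a rough worked example. It
assumes 32KB buffers as in the formula and a hypothetical job graph of 20
tasks (the real number of tasks is not stated in this thread); the class and
method names are only illustrative.

```java
// Illustrative arithmetic only: 32KB * (2 * parallelism + 7) * number of tasks.
class NetworkBufferEstimate {

    static final long BUFFER_SIZE_BYTES = 32 * 1024; // 32KB per buffer

    /** Buffers per task according to the formula: 2 * parallelism + 7. */
    static long buffersPerTask(int parallelism) {
        return 2L * parallelism + 7;
    }

    /** Estimated network buffer memory in bytes for the whole job graph. */
    static long estimateBytes(int parallelism, int numberOfTasks) {
        return BUFFER_SIZE_BYTES * buffersPerTask(parallelism) * numberOfTasks;
    }

    public static void main(String[] args) {
        int parallelism = 4;    // from this thread
        int numberOfTasks = 20; // hypothetical, not known from this thread
        long bytes = estimateBytes(parallelism, numberOfTasks);
        System.out.println(buffersPerTask(parallelism) + " buffers per task, "
                + bytes + " bytes in total");
    }
}
```

With a parallelism of 4 this gives 15 buffers per task and roughly 9.4 MB in
total for the assumed 20 tasks, which is small next to a 6.5 GB heap.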

> What I noticed in Flink 1.4 is that it doesn't read data from Kafka if
> there is not sufficient heap memory to process data. Somehow this is not
> happening in Flink 1.8 and it fills the heap soon enough not to get
> GCed/Finalized. Any change around this between Flink 1.4 and Flink 1.8?

No, as far as I remember there were no changes in this part. When tasks
produce records, they serialise them and put them into network buffers. If
no network buffers are available, the task is back pressured and stops
processing new records.
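
As a toy illustration of that mechanism (this is not Flink's actual
implementation, and all names are hypothetical): a fixed pool of buffers,
where the producer cannot emit once the pool is empty and can continue only
after the consumer recycles a buffer. A real task would block at that point
instead of getting null back.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Toy model of back pressure through a bounded buffer pool (not Flink code).
class BackPressureSketch {

    private final BlockingQueue<byte[]> freeBuffers;

    BackPressureSketch(int numBuffers, int bufferSize) {
        freeBuffers = new ArrayBlockingQueue<>(numBuffers);
        for (int i = 0; i < numBuffers; i++) {
            freeBuffers.add(new byte[bufferSize]);
        }
    }

    /**
     * Producer side: copies the serialised record into a free buffer.
     * Returns null when no buffer is free, i.e. the producer is back pressured.
     */
    byte[] tryEmit(byte[] serializedRecord) {
        byte[] buffer = freeBuffers.poll();
        if (buffer == null) {
            return null;
        }
        int length = Math.min(serializedRecord.length, buffer.length);
        System.arraycopy(serializedRecord, 0, buffer, 0, length);
        return buffer;
    }

    /** Consumer side: hands a processed buffer back to the pool. */
    void recycle(byte[] buffer) {
        freeBuffers.offer(buffer);
    }

    public static void main(String[] args) {
        BackPressureSketch pool = new BackPressureSketch(2, 16);
        byte[] first = pool.tryEmit("a".getBytes());
        pool.tryEmit("b".getBytes());
        System.out.println("third emit accepted: "
                + (pool.tryEmit("c".getBytes()) != null));
        pool.recycle(first); // the downstream consumer catches up
        System.out.println("emit after recycle accepted: "
                + (pool.tryEmit("c".getBytes()) != null));
    }
}
```

The point of the sketch is that the limit is the number of free buffers, not
the amount of free heap.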

Best regards,
Piotrek


On Tue, 8 Sep 2020 at 21:51, Josson Paul <jossonp...@gmail.com> wrote:

> Hi Piotr,
>    2) SystemProcessingTimeService holds the HeapKeyedStateBackend, and
> HeapKeyedStateBackend has a lot of objects, which is filling the heap.
>    3) I am not using the Flink Kafka connector; we are using the Apache
> Beam Kafka connector. There is a change in the Apache Beam version, but
> the Kafka client we are using is the same as the one that was working in
> the other cluster, where Flink was 1.4.
>   *There is no change in Hardware/Java/Kafka/Kafka Client/Application
> between the working and the non-working cluster*
>
> I am aware of the memory changes and network buffer changes between 1.4
> and 1.8.
>
> Flink 1.4 had network buffers on the heap, and in 1.8 the network buffers
> are in native memory. I modified the Flink 1.8 code to put them back on
> the heap, but that didn't resolve the issue.
>
> Mine is a streaming job, so we set 'taskmanager.memory.fraction' to a very
> small value so that the heap is fully available for user data.
>
> Flink 1.4 was not using credit-based flow control, and Flink 1.8 uses
> it. *Our setup has only 1 task manager and a parallelism of 4*.
> According to this video
> https://www.youtube.com/watch?v=AbqatHF3tZI&ab_channel=FlinkForward (
> *16:21*), if tasks are in the same task manager, Flink doesn't use
> credit-based flow control. So essentially there is no change between
> Flink 1.4 and 1.8 in *our setup*. I still tried disabling credit-based
> flow control and testing my setup. The problem persists.
>
> What I noticed in Flink 1.4 is that it doesn't read data from Kafka if
> there is not sufficient heap memory to process data. Somehow this is not
> happening in Flink 1.8 and it fills the heap soon enough not to get
> GCed/Finalized. Any change around this between Flink 1.4 and Flink 1.8?
>
> My understanding on back pressure is that it is not based on Heap memory
> but based on how fast the Network buffers are filled. Is this correct?
> Does Flink use a TCP connection to communicate between tasks if the tasks
> are in the same Task manager?
>
> Thanks,
> josson
>
> On Thu, Sep 3, 2020 at 12:35 PM Piotr Nowojski <pnowoj...@apache.org>
> wrote:
>
>> Hi Josson,
>>
>> 2. Are you sure that all/vast majority of those objects are pointing
>> towards SystemProcessingTimeService? And is this really the problem of
>> those objects? Are they taking that much of the memory?
>> 3. It still could be Kafka's problem, as it's likely that between 1.4 and
>> 1.8.x we bumped Kafka dependencies.
>>
>> Frankly, if it's not some other external dependency issue, I would
>> expect the problem to lie somewhere else entirely. Flink's code
>> relying on finalisation hasn't changed since 2015/2016. On the other
>> hand, there were quite a few changes between 1.4 and 1.8.x, and some of
>> them affected memory usage. Have you read the release notes for versions
>> 1.5, 1.6, 1.7 and 1.8? In particular, both 1.5 [1] and 1.8 [2] have
>> memory-related notes that could be addressed via configuration changes.
>>
>> Thanks,
>> Piotrek
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.5/release-notes/flink-1.5.html
>> [2]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.8/release-notes/flink-1.8.html
>>
>> On Thu, 3 Sep 2020 at 18:50, Josson Paul <jossonp...@gmail.com> wrote:
>>
>>> 1) We are in the process of migrating to Flink 1.11. But it is going to
>>> take a while before we can make everything work with the latest version.
>>> Meanwhile since this is happening in production I am trying to solve this.
>>> 2) The Finalizer class is pointing
>>> to org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService.
>>> This class has a finalize method. I have attached a spreadsheet (
>>> *Object-explorer.csv*) to give you a high-level view.
>>> 3) The difference between working cluster and NON working cluster is
>>> only on Beam and Flink. Hardware, Input message rate, Application jars,
>>> Kafka are all the same between those 2 clusters. Working cluster was with
>>> Flink 1.4 and Beam 2.4.0
>>>
>>> Any insights into this will help me to debug further
>>>
>>> Thanks,
>>> Josson
>>>
>>>
>>> On Thu, Sep 3, 2020 at 3:34 AM Piotr Nowojski <pnowoj...@apache.org>
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> Have you tried using a more recent Flink version? 1.8.x is no longer
>>>> supported, and latest versions might not have this issue anymore.
>>>>
>>>> Secondly, have you tried backtracking those references to the
>>>> Finalizers? Assuming that Finalizer is indeed the class causing problems.
>>>>
>>>> Also it may well be a non Flink issue [1].
>>>>
>>>> Best regards,
>>>> Piotrek
>>>>
>>>> [1] https://issues.apache.org/jira/browse/KAFKA-8546
>>>>
>>>> On Thu, 3 Sep 2020 at 04:47, Josson Paul <jossonp...@gmail.com> wrote:
>>>>
>>>>> Hi All,
>>>>>
>>>>> *ISSUE*
>>>>> ------
>>>>> The Flink application runs for some time, and then suddenly the CPU
>>>>> shoots up to its peak, POD memory reaches its peak, the GC count
>>>>> increases, and the Old-gen space gets close to 100%. Full GC doesn't
>>>>> clean up heap space. At this point I stopped sending data and
>>>>> cancelled the Flink jobs. Still, the Old-gen space doesn't come down.
>>>>> I took a heap dump and can see a lot of objects in the
>>>>> java.lang.Finalizer class. I have attached the details in a Word
>>>>> document. I do have the heap dump, but it is close to 2GB compressed.
>>>>> Is it safe to upload it somewhere and share it here?
>>>>>
>>>>> This issue doesn't happen in Flink: 1.4.0 and Beam: release-2.4.0
>>>>>
>>>>> *WORKING CLUSTER INFO* (Flink: 1.4.0 and Beam: release-2.4.0)
>>>>> ----------------------------------------------------
>>>>>
>>>>> Application reads from Kafka and does aggregations and writes into
>>>>> Kafka. Application has 5 minutes windows. Application uses Beam constructs
>>>>> to build the pipeline. To read and write we use Beam connectors.
>>>>>
>>>>> Flink version: 1.4.0
>>>>> Beam version: release-2.4.0
>>>>> Backend State: State backend is in the Heap and check pointing
>>>>> happening to the distributed File System.
>>>>>
>>>>> No of task Managers: 1
>>>>> Heap: 6.4 GB
>>>>> CPU: 4 Cores
>>>>> Standalone cluster deployment on a Kubernetes pod
>>>>>
>>>>> *NOT WORKING CLUSTER INFO* (Flink version: 1.8.3 and Beam version:
>>>>> release-2.15.0)
>>>>> ----------
>>>>> Application details are same as above
>>>>>
>>>>> *No change in application and the rate at which data is injected. But
>>>>> change in Flink and Beam versions*
>>>>>
>>>>>
>>>>> Flink version: 1.8.3
>>>>> Beam version: release-2.15.0
>>>>> Backend State: State backend is in the Heap and check pointing
>>>>> happening to the distributed File System.
>>>>>
>>>>> No of task Managers: 1
>>>>> Heap: 6.5 GB
>>>>> CPU: 4 Cores
>>>>>
>>>>> Deployment: Standalone cluster deployment on a Kubernetes pod
>>>>>
>>>>> My Observations
>>>>> -------------
>>>>>
>>>>> 1) The CPU flame graph shows that in the working version the CPU time
>>>>> spent on GC is lower than in the non-working version (please see the
>>>>> attached flame graphs: *CPU-flame-WORKING.svg* for the working cluster
>>>>> and *CPU-flame-NOT-working.svg*)
>>>>>
>>>>> 2) I have attached the flame graph for native memory MALLOC calls when
>>>>> the issue was happening. Please find the attached SVG image (
>>>>> *malloc-NOT-working.svg*). The POD memory peaks when this issue
>>>>> happens. For me, it looks like the GC process is requesting a lot of
>>>>> native memory.
>>>>>
>>>>> 3) When the issue is happening the GC cpu usage is very high. Please
>>>>> see the flame graph (*CPU-graph-at-issuetime.svg*)
>>>>>
>>>>> Note: SVG file can be opened using any browser and it is clickable
>>>>> while opened.
>>>>> --
>>>>> Thanks
>>>>> Josson
>>>>>
>>>>
>>>
>>> --
>>> Thanks
>>> Josson
>>>
>>
>
> --
> Thanks
> Josson
>
