Flink local mini cluster is causing a memory leak when triggered multiple times

2023-08-30 Thread Chandrashekar Sankarapu via user
Hi Team,

We have a data pipeline which is built using the Apache Beam SDK, and we use
the Apache Flink Runner to execute Beam pipelines. We use Flink's local
embedded execution mode for running the pipelines.

Currently, we are running into an issue wherein the batch pipeline can be
triggered multiple times, and each time the pipeline is triggered, Flink
creates a MiniCluster in local execution mode, runs the job, and destroys
the MiniCluster once the job is completed. When the batch job is triggered
multiple times, we observe that the application process's Resident Set Size
(RSS) memory keeps increasing with each run (by approximately the value set
for the parameter 'taskmanager.memory.network.max') and is not released,
eventually crashing the Docker container (container memory is limited) in
which this is deployed. However, when we check the JVM memory using tools
like JConsole, jcmd, etc., it does not show any increase and always stays
within the Xmx value set.

We analysed the heap dump of the application but it didn't show any memory
leaks.
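
For context, the limit in question is the one a local run would set roughly
like this (a minimal sketch using the Flink API directly; in a Beam pipeline
the equivalent values would typically be passed through the runner's pipeline
options):

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class LocalRunSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // RSS grows by roughly this value with each MiniCluster run.
        conf.setString("taskmanager.memory.network.max", "64mb");
        // Each execute() against a local environment spins up (and should
        // tear down) a fresh MiniCluster.
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironment(conf);
        env.fromElements(1, 2, 3).print();
        env.execute("local-run");
    }
}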

Has anyone faced this issue? Any pointers are appreciated.

Thanks,
Chandra


RE: Rate Limit / Throttle Data to Send

2023-08-30 Thread Schwalbe Matthias
Hi Patricia,

What you are trying to implement can be achieved out of the box with windowing.

I assume these packets of 100 events are not per key but global.
In that case use non-keyed windowing [1] with a count trigger (100) [3]; maybe
also add a processing-time trigger in case it takes too long to collect all
100 events. Then create the output with a process window function [2], as
sketched below.
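
A minimal sketch of that pipeline (assuming the Flink 1.17 DataStream API;
VendorEvent and the 30-second timeout are placeholders):

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeoutTrigger;
import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.util.Collector;

DataStream<VendorEvent> events = ...; // kafkasource -> mapToVendorPojo

events
    .windowAll(GlobalWindows.create())            // non-keyed windowing [1]
    .trigger(PurgingTrigger.of(                   // purge contents after each fire
        ProcessingTimeoutTrigger.of(              // flush early if 100 take too long
            CountTrigger.of(100), Duration.ofSeconds(30))))
    .process(new ProcessAllWindowFunction<VendorEvent, List<VendorEvent>, GlobalWindow>() {
        @Override
        public void process(Context ctx, Iterable<VendorEvent> in,
                            Collector<List<VendorEvent>> out) {
            List<VendorEvent> batch = new ArrayList<>();
            in.forEach(batch::add);
            out.collect(batch);                   // one batch of <= 100 events
        }
    });

The PurgingTrigger wrapper makes each firing also clear the window contents,
so batches don't accumulate in the global window between emissions.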

I hope this helps

Thias


[1] 
https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/datastream/operators/windows/#keyed-vs-non-keyed-windows
[2] 
https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/datastream/operators/windows/#processwindowfunction
[3] 
https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/datastream/operators/windows/#built-in-and-custom-triggers


From: patricia lee 
Sent: Wednesday, August 30, 2023 6:54 AM
To: user@flink.apache.org
Subject: Rate Limit / Throttle Data to Send

Hi,

I have a requirement to send data to a third party with a limited number of
elements, with the flow below.

kafkasource
mapToVendorPojo
processfunction
sinkToVendor

My implementation is that I continuously add elements to my list state
(ListState) in a ProcessFunction, and once it reaches 100 in size I emit
the data and start collecting again for the next set of 100.

if (rateConfig == Iterables.size(appEventState.get())) {
    List<AppEvent> holder = new ArrayList<>(); // AppEvent: element type, name assumed
    appEventState.get().forEach(e -> holder.add(e));
    collector.collect(holder);
    appEventState.clear();
}

The problem I am getting is that the "if" condition above never matches,
because the appEventState size is always only 0 or 1. The rateConfig is set
to 20.

What am I missing?

Thanks,
Patricia



Semantics of purging with global windows

2023-08-30 Thread Alexis Sarda-Espinosa
Hello,

According to the javadoc of TriggerResult.PURGE, "All elements in the
window are cleared and the window is discarded, without evaluating
the window function or emitting any elements."
However, I've noticed that using a GlobalWindow (with a custom trigger)
followed by an AggregateFunction will call the function's add() even when
the trigger result is PURGE.
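
A stripped-down version of the setup I mean (a sketch; Event and MyAggregate
are placeholders, and stream is a DataStream<Event>):

import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;

stream
    .windowAll(GlobalWindows.create())
    .trigger(new Trigger<Event, GlobalWindow>() {
        @Override
        public TriggerResult onElement(Event e, long ts, GlobalWindow w, TriggerContext ctx) {
            // MyAggregate.add() has already run for `e` by the time this returns,
            // because the window operator adds to window state before consulting
            // the trigger.
            return TriggerResult.PURGE;
        }
        @Override
        public TriggerResult onProcessingTime(long time, GlobalWindow w, TriggerContext ctx) {
            return TriggerResult.CONTINUE;
        }
        @Override
        public TriggerResult onEventTime(long time, GlobalWindow w, TriggerContext ctx) {
            return TriggerResult.CONTINUE;
        }
        @Override
        public void clear(GlobalWindow w, TriggerContext ctx) { }
    })
    .aggregate(new MyAggregate()); // add() still observes every element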

It seems to me that this has been the behavior for a very long time:

https://github.com/apache/flink/commit/6cd8ceb10c841827cf89b74ecf5a0495a6933d53#diff-6d18531a35cddca6e5995c40c7a564fd711b998d567c4e167a401f76ca29a2bbR295-R299

Is that really necessary? I'm guessing that operator deals with all types
of windows, so I'm not sure how that affects other window types.

Regards,
Alexis.


Blue green deployment with Flink Apache Operator

2023-08-30 Thread Nicolas Fraison via user
Hi,

From https://issues.apache.org/jira/browse/FLINK-29199 it seems that
blue-green deployment is not supported and will not be anytime soon.

I'd like to know whether some of you have built a custom mechanism on top of
this operator to support blue-green deployment, and whether you have any
advice on implementing it.

-- 

Nicolas Fraison (he/him)


Access to collector in the process function

2023-08-30 Thread Oscar Perez via user
Hi!
We would like to use hexagonal architecture in our design and treat the
collector as an output port when sending events from the use case.

For that, we would like to call an interface from the use case that
effectively sends the event, ultimately via out.collect.

The problem is that to instantiate the use case we need to inject the
collector as a dependency, and we don't have access to the collector at the
process function class level, only at the processElement method level.

Is there any way to access the collector from the process function class,
in the open method?

Regards,
Oscar


Job graph

2023-08-30 Thread Nikolaos Paraskakis
Hello folks,

I am trying to get the job graph of a running Flink job, using the Flink
libraries. For now, I have the RestClusterClient and the job IDs. Please tell
me how to get the job graph.

Thank you.

Re: Blue green deployment with Flink Apache Operator

2023-08-30 Thread Gyula Fóra
Hey!

I don't know if anyone has implemented this, but one way to approach
this problem (and this may not be the right way, just an idea :) ) is to
add a new Custom Resource type that sits on top of the FlinkDeployment /
FlinkSessionJob resources, with a small controller for it.

This new custom resource, BlueGreenDeployment, would be somewhat similar to
how a ReplicaSet relates to a Pod in Kubernetes. It would create a new
FlinkDeployment and delete the old one once the new one reaches a healthy
running state.

Adding a new CR allows us to avoid overcomplicating the existing
resource/controller loop and simply leverage it. If you prototype something
along these lines, please feel free to share it, and then we can discuss
whether we want to incorporate something like this into the operator repo in
the future :)

Cheers,
Gyula



Enable RocksDB in FlinkDeployment with flink-kubernetes-operator

2023-08-30 Thread Tony Chen
Hi Flink Community,

Does the flink-kubernetes-operator support RocksDB as the state backend for
FlinkDeployment?

We have some Flink applications that have large states, and we were able to
deal with these large states in the past with RocksDB. If there is no
support for RocksDB, are there any recommendations on how we can decrease
the size of these states?

Thanks,
Tony



Re: Enable RocksDB in FlinkDeployment with flink-kubernetes-operator

2023-08-30 Thread Yaroslav Tkachenko
Hey Tony,

Pretty much all Flink configuration is supported, including the RocksDB
state backend.



Re: Enable RocksDB in FlinkDeployment with flink-kubernetes-operator

2023-08-30 Thread Gyula Fóra
Hi!

RocksDB is supported, and every other state backend as well.

You can simply set this in your config like before :)
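
For example, in the FlinkDeployment spec that could look roughly like this
(a sketch; the exact keys depend on your Flink version, and the checkpoint
location is a placeholder):

spec:
  flinkConfiguration:
    state.backend: rocksdb
    state.backend.incremental: "true"
    state.checkpoints.dir: s3://my-bucket/checkpoints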

Cheers
Gyula



Re: Enable RocksDB in FlinkDeployment with flink-kubernetes-operator

2023-08-30 Thread Tony Chen
We used to have a Persistent Volume (PV), attached to the pod, for storing
the RocksDB data while using the GoogleCloudPlatform operator. For the
Apache flink-kubernetes-operator, do the pods need a PV attached to them to
use RocksDB? If not, do you have recommendations on memory configuration
for these pods?

I will also need to go through the documentation more on memory
configuration:
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/ops/state/state_backends/



Re: Uneven TM Distribution of Flink on YARN

2023-08-30 Thread Lu Niu
No, we don't use yarn.taskmanager.node-label.

Best
Lu

On Tue, Aug 29, 2023 at 12:17 AM Geng Biao  wrote:

> Maybe you can check if you have set yarn.taskmanager.node-label for some
> flink jobs?
>
> Best,
> Biao Geng
>
> Sent from Outlook for iOS
> --
> *From:* Chen Zhanghao 
> *Sent:* Tuesday, August 29, 2023 12:14:53 PM
> *To:* Lu Niu ; Weihua Hu 
> *Cc:* Kenan Kılıçtepe ; user 
> *Subject:* Re: Uneven TM Distribution of Flink on YARN
>
> CCing @Weihua Hu  , who is an expert on this. Do
> you have any ideas on the phenomenon here?
>
> Best,
> Zhanghao Chen
> --
> *From:* Lu Niu 
> *Sent:* Tuesday, August 29, 2023 12:11:35 PM
> *To:* Chen Zhanghao 
> *Cc:* Kenan Kılıçtepe ; user 
> *Subject:* Re: Uneven TM Distribution of Flink on YARN
>
> Thanks for your reply.
>
> The interesting fact is that we also manage Spark on YARN; however, only
> the Flink cluster is having this issue. I wonder whether there is a
> difference in the implementation on the Flink side.
>
> Best
> Lu
>
> On Mon, Aug 28, 2023 at 8:38 PM Chen Zhanghao 
> wrote:
>
> Hi Lu Niu,
>
> TM distribution on YARN nodes is managed by the YARN RM and is out of
> Flink's scope. On the other hand, cluster.evenly-spread-out-slots forces
> even distribution of tasks among Flink TMs and has nothing to do with your
> concern. Also, that config currently only supports standalone-mode Flink
> clusters and does not take effect for a Flink cluster on YARN.
>
> Best,
> Zhanghao Chen
> --
> *From:* Lu Niu 
> *Sent:* August 29, 2023 4:30
> *To:* Kenan Kılıçtepe 
> *Cc:* user 
> *Subject:* Re: Uneven TM Distribution of Flink on YARN
>
> Thanks for the reply. We've already set cluster.evenly-spread-out-slots =
> true
>
> Best
> Lu
>
> On Mon, Aug 28, 2023 at 1:23 PM Kenan Kılıçtepe 
> wrote:
>
> Have you checked config param cluster.evenly-spread-out-slots ?
>
>
> On Mon, Aug 28, 2023 at 10:31 PM Lu Niu  wrote:
>
> Hi, Flink users
>
> We have recently observed that the allocation of Flink TaskManagers in our
> YARN cluster is not evenly distributed. We would like to hear your thoughts
> on this matter.
>
> 1. Our setup includes Flink version 1.15.1 and Hadoop 2.10.0.
> 2. The uneven distribution looks like this: out of a 370-node YARN cluster,
> 16 nodes have either 0 or 1 vCore available, while 110 nodes have more
> than 10 vCores available.
>
> Is such behavior expected? If not, is there a fix provided in Flink?
> Thanks!
>
> Best
> Lu
>
>


Re: Enable RocksDB in FlinkDeployment with flink-kubernetes-operator

2023-08-30 Thread Yaroslav Tkachenko
It depends on your requirements. Personally, I don't use PVs and, instead,
mount a volume from a host with a fast instance-level SSD.
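
In that setup you would typically also point RocksDB's working directory at
the mounted disk, e.g. with the following key (path is a placeholder):

state.backend.rocksdb.localdir: /mnt/instance-ssd/rocksdb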



Re: Enable RocksDB in FlinkDeployment with flink-kubernetes-operator

2023-08-30 Thread Gyula Fóra
I agree with Yaroslav; generally speaking, PVs are not necessary or even
recommended for RocksDB, because the state doesn't need to be shared or
recovered later anyway.
It's usually faster and cheaper to go with instance-level SSDs.

Gyula



Re: flink k8s operator - problem with patching seession cluster

2023-08-30 Thread Krzysztof Chmielewski
Just want to bring this up in case it was missed among the other
messages/queries :)

TL;DR:
How do I add a TM to a Flink session cluster via the Java K8s client if the
session cluster has running jobs?

Thanks,
Krzysztof

On Fri, Aug 25, 2023 at 23:48, Krzysztof Chmielewski <
krzysiek.chmielew...@gmail.com> wrote:

> Hi community,
> I have a use case where I would like to add an extra TM to a running
> Flink session cluster that has Flink jobs deployed. Session cluster
> creation, job submission, and cluster patching are done using the Flink
> K8s operator Java API. The details of this are presented here [1].
>
> I would like to ask what the recommended path is to add a TM to an
> existing session cluster that currently runs a number of Flink jobs,
> using the Java API. For simplicity, let's assume that I don't want to
> resume jobs from a savepoint, just redeploy them.
>
> When executing the steps from [1], I'm facing an issue where session jobs
> are not redeployed on the patched session cluster, although kubectl shows
> that there is a FlinkSessionJob submitted to K8s.
>
> Additionally, when I try to delete the FlinkSessionJob via kubectl, the
> Flink K8s operator throws an exception described in [1]. In fact, the
> state of that Flink deployment requires a few steps to clean up after
> the patch.
>
>
> [1]
> https://github.com/kristoffSC/FlinkK8sControllerPoC/blob/ExceptionInOperator-clusterRestart/doc/K8sException_1.MD
>


Re: Unsubscribe (退订)

2023-08-30 Thread liu ron
Please send an email to user-unsubscr...@flink.apache.org if you want to
unsubscribe from user@flink.apache.org; you can refer to [1][2] for more
details.

[1]
https://flink.apache.org/zh/community/#%e9%82%ae%e4%bb%b6%e5%88%97%e8%a1%a8
[2] https://flink.apache.org/community.html#mailing-lists

Best,
Ron

喻凯 wrote on Wed, Aug 30, 2023 at 14:17:

>
>


Re: Job graph

2023-08-30 Thread liu ron
Hi, Nikolaos

As far as I know, JobGraph is a relatively low-level concept; currently we
don't expose it directly to users, and there is no direct RESTful API to get
it from the JobManager. Why do you need the JobGraph, and what is your real
need?
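
That said, if the JSON execution plan is enough for your purposes,
RestClusterClient can fetch the job details, which include the plan. A rough
sketch (assumes `client` is your already-configured RestClusterClient
instance; method names may vary slightly across Flink versions):

import org.apache.flink.api.common.JobID;
import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo;

JobDetailsInfo details =
        client.getJobDetails(JobID.fromHexString("<job-id>")).get();
String jsonPlan = details.getJsonPlan(); // JSON form of the job's plan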

Best,
Ron



Re: Access to collector in the process function

2023-08-30 Thread liu ron
Hi, Oscar

The Collector object is created in the open() method of ProcessOperator [1],
so we can't get it in the ProcessFunction's open method. One possible way is
to change the source code and place it in ProcessFunction#Context.

[1]
https://github.com/apache/flink/blob/aa8d93ea239f5be79066b7e5caad08d966c86ab2/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/ProcessOperator.java#L57
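
If changing the source isn't an option, one workaround (just a sketch, with
hypothetical names such as UseCase, Event, and Output) is to bind the port
lazily: keep a field in the ProcessFunction and point it at the real
Collector inside processElement before invoking the use case:

import java.util.function.Consumer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

public class UseCaseFunction extends ProcessFunction<UseCaseFunction.Event, UseCaseFunction.Output> {
    private transient Collector<Output> currentOut; // rebound on every element
    private transient UseCase useCase;

    @Override
    public void open(Configuration parameters) {
        // The use case depends only on a Consumer "output port".
        useCase = new UseCase(event -> currentOut.collect(event));
    }

    @Override
    public void processElement(Event value, Context ctx, Collector<Output> out) {
        currentOut = out;      // bind the port to the real collector
        useCase.handle(value); // emits via out.collect under the hood
    }

    // Hypothetical use case from the hexagonal design.
    static class UseCase {
        private final Consumer<Output> port;
        UseCase(Consumer<Output> port) { this.port = port; }
        void handle(Event e) {
            // domain logic elided; eventually calls port.accept(output)
        }
    }

    // Placeholder domain types for the sketch.
    public static class Event {}
    public static class Output {}
}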


Best,
Ron
