Support for ConfigMap for Runtime Arguments in Flink Kubernetes Operator

2024-02-19 Thread arjun s
Hi team,

I am currently in the process of deploying Flink on Kubernetes using the
Flink Kubernetes Operator and have encountered a scenario where I need to
pass runtime arguments to my Flink application from a properties file.
Given the dynamic nature of Kubernetes environments and the need for
flexibility in configuration management, I was wondering if the Flink
Kubernetes Operator supports the use of Kubernetes ConfigMaps for this
purpose. Specifically, I am interested in understanding:

1. How can I use a ConfigMap to pass runtime arguments or configurations
stored in a properties file to a Flink job deployed using the Kubernetes
operator?
2. Are there best practices or recommended approaches for managing
application-specific configurations, such as database connections or other
external resource settings, using ConfigMaps with the Flink Kubernetes
Operator?
3. If direct support for ConfigMaps is not available or limited, could you
suggest any workarounds or alternative strategies that align with Flink's
deployment model on Kubernetes?

I appreciate any guidance or documentation you could provide on this
matter, as it would greatly assist in streamlining our deployment process
and maintaining configuration flexibility in our Flink applications.

Thank you for your time and support. I look forward to your response.
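
[Editor's note: one common pattern is to mount the ConfigMap into the pods
via the FlinkDeployment's podTemplate and point the job's arguments at the
mounted file. The sketch below is abbreviated (image, resources, and other
required fields omitted), and the ConfigMap name, mount path, and
properties file name are hypothetical:]

```yaml
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: my-flink-job                           # hypothetical name
spec:
  podTemplate:
    spec:
      containers:
        - name: flink-main-container           # operator's main container name
          volumeMounts:
            - name: app-config
              mountPath: /opt/flink/conf-app   # hypothetical mount path
      volumes:
        - name: app-config
          configMap:
            name: my-app-properties            # hypothetical ConfigMap
  job:
    jarURI: local:///opt/flink/usrlib/my-job.jar
    args: ["--configFile", "/opt/flink/conf-app/app.properties"]
```

The application then reads the mounted properties file itself (e.g. via
ParameterTool.fromPropertiesFile in main), so updating the ConfigMap and
restarting the job picks up new settings without rebuilding the image.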


Re: Impact of RocksDB backend on the Java heap

2024-02-19 Thread Alexis Sarda-Espinosa
Hi Zakelly,

Yeah, that makes sense to me. I was just curious about whether reading could
be a bottleneck or not, but I imagine user-specific logic would be better
than a generic cache from Flink that might have a low hit rate.

Thanks again,
Alexis.

On Mon, 19 Feb 2024, 07:29 Zakelly Lan,  wrote:

> Hi Alexis,
>
> Assuming the bulk load for a batch of sequential keys performs better than
> accessing them one by one, the main question becomes whether we really need
> to access all the keys that were bulk-loaded into the cache. In other
> words, the cache hit rate is the key issue. If the rate is high, then even
> if a single key-value is large and loading is slow, it is still worth
> loading them in advance. In the case of timers and iteration (which I
> missed in the last mail), the cache is almost guaranteed to hit, so a cache
> is introduced to enhance performance there.
>
>
> Best,
> Zakelly
>
> On Sun, Feb 18, 2024 at 7:42 PM Alexis Sarda-Espinosa <
> sarda.espin...@gmail.com> wrote:
>
>> Hi Zakelly,
>>
>> thanks for the information, that's interesting. Would you say that
>> reading a subset from RocksDB is fast enough to be pretty much negligible,
>> or could it be a bottleneck if the state of each key is "large"? Again
>> assuming the number of distinct partition keys is large.
>>
>> Regards,
>> Alexis.
>>
>> On Sun, 18 Feb 2024, 05:02 Zakelly Lan,  wrote:
>>
>>> Hi Alexis,
>>>
>>> Flink does need some heap memory to bridge requests to RocksDB and
>>> gather the results. In most cases, this memory is discarded immediately
>>> (eventually collected by GC). In the case of timers, Flink does cache a
>>> limited subset of key-values on the heap to improve performance.
>>>
>>> In general you don't need to consider its heap consumption since it is
>>> minor.
>>>
>>>
>>> Best,
>>> Zakelly
>>>
>>> On Fri, Feb 16, 2024 at 4:43 AM Asimansu Bera 
>>> wrote:
>>>
 Hello Alexis,

 I don't think the data in RocksDB resides in the JVM, even during function calls.

 For more details, check the link below:

 https://github.com/facebook/rocksdb/wiki/RocksDB-Overview#3-high-level-architecture

 RocksDB has three main components: the memtable, SST files, and the WAL
 (not used in Flink, since Flink uses checkpointing). When a TM starts with
 RocksDB as the state backend, the TM has its own RocksDB instance, and
 state is managed as column families by that TM. Any state change goes
 memtable --> SST --> persistent store. On reads, data goes through
 RocksDB's buffers and cache.

 In the case of RocksDB as the state backend, the JVM still holds the
 thread stacks; with a high degree of parallelism, there are many stacks
 maintaining separate thread information.

 Hope this helps!!





 On Thu, Feb 15, 2024 at 11:21 AM Alexis Sarda-Espinosa <
 sarda.espin...@gmail.com> wrote:

> Hi Asimansu
>
> The memory RocksDB manages is outside the JVM, yes, but the mentioned
> subsets must be bridged to the JVM somehow so that the data can be exposed
> to the functions running inside Flink, no?
>
> Regards,
> Alexis.
>
>
> On Thu, 15 Feb 2024, 14:06 Asimansu Bera, 
> wrote:
>
>> Hello Alexis,
>>
>> RocksDB resides off-heap, outside of the JVM. The small subset of data
>> being read ends up off-heap in memory.
>>
>> For more details, check the following link:
>>
>> https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/deployment/memory/mem_setup_tm/#managed-memory
>>
>> I hope this addresses your inquiry.
>>
>>
>>
>>
>> On Thu, Feb 15, 2024 at 12:52 AM Alexis Sarda-Espinosa <
>> sarda.espin...@gmail.com> wrote:
>>
>>> Hello,
>>>
>>> Most info regarding RocksDB memory for Flink focuses on what's needed
>>> independently of the JVM (although the Flink process configures its
>>> limits and so on). I'm wondering if there are additional special
>>> considerations with regard to the JVM heap in the following scenario.
>>>
>>> Assuming a key used to partition a Flink stream and its state has a high
>>> cardinality, but the state of each key is small: when Flink prepares the
>>> state to expose to a user function during a call (with a given partition
>>> key), I guess it loads only the required subset from RocksDB, but does
>>> this small subset end up (temporarily) on the JVM heap? And if it does,
>>> does it stay "cached" in the JVM for some time, or is it immediately
>>> discarded after the user function completes?
>>>
>>> Maybe this isn't even under Flink's control, but I'm curious.
>>>
>>> Regards,
>>> Alexis.
>>>
>>


CompletableFuture in a flatMap operator

2024-02-19 Thread Lasse Nedergaard
Hi. 

I have a case where I would like to collect objects from a
CompletableFuture in a flatMap function.
I run into a problem where I get an exception about a buffer pool that
no longer exists when I collect the objects after some time. I can see that
if, for testing, I don't return from the function (creating a for loop with
a thread sleep, or waiting for the future), it works.
Can anyone explain what's going on behind the scenes and, if possible, give
any hints for a working solution?

Thanks in advance 

Med venlig hilsen / Best regards
Lasse Nedergaard



Re: CompletableFuture in a flatMap operator

2024-02-19 Thread Ken Krugler
Is there some reason why you can’t use an AsyncFunction?

https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/functions/async/AsyncFunction.html

Note that when dealing with event time and exactly once, an AsyncFunction 
provides required support for proper execution.

See 
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/operators/asyncio/
 for more details.

— Ken


> On Feb 19, 2024, at 12:20 PM, Lasse Nedergaard 
>  wrote:
> 
> Hi. 
> 
> I have a case where I would like to collect objects from a
> CompletableFuture in a flatMap function.
> I run into a problem where I get an exception about a buffer pool that
> no longer exists when I collect the objects after some time. I can see
> that if, for testing, I don't return from the function (creating a for
> loop with a thread sleep, or waiting for the future), it works.
> Can anyone explain what's going on behind the scenes and, if possible,
> give any hints for a working solution?
> 
> Thanks in advance 
> 
> Med venlig hilsen / Best regards
> Lasse Nedergaard
> 


--
Ken Krugler
http://www.scaleunlimited.com
Custom big data solutions
Flink & Pinot





New Relic alerts for Flink job submission failure and recovery

2024-02-19 Thread elakiya udhayanan
Hi Team,

I would like to know the possibilities of configuring New Relic alerts
for a Flink job: whenever the job is submitted, fails, or recovers
from a failure.
In our case, we have configured the Flink environment as a Kubernetes pod
running on an EKS cluster and the application code is created as a jar and
submitted as a job either from the UI or using the CLI commands.

We have added the following metrics configuration in flink-conf.yaml, but
we are unsure about the next steps; any help is appreciated. Thanks in
advance.

metrics.reporters: prom
metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter
metrics.reporter.prom.factory.class: org.apache.flink.metrics.prometheus.PrometheusReporterFactory
metrics.reporter.prom.host: localhost
metrics.reporter.prom.port: 9250-9260
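
[Editor's note: New Relic itself is not configured in flink-conf.yaml; with
the Prometheus reporter above, one option is to scrape the exposed endpoint
(e.g. with New Relic's Prometheus integration) and alert on job-level
metrics. Below is a sketch of Prometheus-style alert rules; the metric names
follow Flink's PrometheusReporter naming, but exact names, thresholds, and
the scrape setup should be verified against your deployment:]

```yaml
groups:
  - name: flink-job-alerts
    rules:
      # Fires when no job is running on the JobManager (submission failed
      # or the job died); resolves automatically once a job recovers.
      - alert: FlinkNoRunningJobs
        expr: flink_jobmanager_numRunningJobs < 1
        for: 2m
        labels:
          severity: critical
        annotations:
          summary: "No running Flink jobs on {{ $labels.instance }}"
      # Fires when the job has restarted within the last 5 minutes,
      # which covers failure-and-recovery cycles.
      - alert: FlinkJobRestarted
        expr: increase(flink_jobmanager_job_numRestarts[5m]) > 0
        labels:
          severity: warning
```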


Thanks,
Elakiya


Re: CompletableFuture in a flatMap operator

2024-02-19 Thread Lasse Nedergaard
Hi

In my case I have to query an external system. The system returns n number
of rows in a page, and I have to call the system until there is no more
data. I could use AsyncFunction if I buffered all records and output them
at the end, but I want to "stream" and don't have enough memory to hold all
the data. AsyncFunction only has one future to return, so I can't see any
way to handle n number of results from the external service.

Med venlig hilsen / Best regards
Lasse Nedergaard

> Den 19. feb. 2024 kl. 21.31 skrev Ken Krugler :
>
> Is there some reason why you can't use an AsyncFunction?
>
> https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/functions/async/AsyncFunction.html
>
> Note that when dealing with event time and exactly once, an AsyncFunction
> provides required support for proper execution.
>
> See
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/operators/asyncio/
> for more details.
>
> — Ken
>
> --
> Ken Krugler
> http://www.scaleunlimited.com
> Custom big data solutions
> Flink & Pinot
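
[Editor's note: the buffer-pool exception described above is likely caused
by calling the collector from the future's callback thread after the task
has released its output buffers. A minimal pure-Java sketch of the pattern
Lasse describes — join each page's future on the calling (task) thread and
emit page by page, so nothing is buffered and no foreign thread emits. The
paged client, page size, and all names here are hypothetical; Flink itself
is left out:]

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

public class PagedFetch {

    // Hypothetical async client: returns one page of results per call,
    // and an empty page once there is no more data.
    static CompletableFuture<List<String>> fetchPage(int page) {
        List<String> rows = new ArrayList<>();
        if (page < 3) {                                  // pretend there are 3 pages
            for (int i = 0; i < 2; i++) {
                rows.add("page" + page + "-row" + i);
            }
        }
        return CompletableFuture.supplyAsync(() -> rows);
    }

    // Inside a flatMap, joining each page's future on the calling thread
    // means the collector (here: the Consumer) is only ever invoked from
    // the task thread, never from the future's callback thread.
    static void fetchAll(Consumer<String> out) {
        int page = 0;
        while (true) {
            List<String> rows = fetchPage(page).join();  // block on calling thread
            if (rows.isEmpty()) {
                break;                                   // no more data
            }
            rows.forEach(out);                           // emit page by page
            page++;
        }
    }

    public static void main(String[] args) {
        List<String> collected = new ArrayList<>();
        fetchAll(collected::add);
        System.out.println(collected);
    }
}
```

The trade-off is that the task thread blocks while a page is in flight;
that is exactly what keeps emission on the correct thread, at the cost of
the parallelism an AsyncFunction would otherwise provide.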