Re: Setting Timers in applyToKeyedState

2024-08-28 Thread Zakelly Lan
Hi Jose, You are right about using {{applyToKeyedState}} to register timers under {{KeyedBroadcastProcessFunction}}. I think the author overlooked this, and we could update the document. As for specifying a smaller keyset to apply function, I think this is a possible direction of optimization. Th

Re: Flink k8s operator unstable deployment

2024-08-28 Thread Naci Simsek
Hi Arthur,How you submit your job? Are you explicitly setting job id when submitting the job?Have you also tried without HA to see the behavior?Looks like the job is submitted with the same ID with the previous job, which the job result stored in HA does not let you submit it with the same job_id.B

Re: Setting Timers in applyToKeyedState

2024-08-28 Thread Jose Vargas Badilla via user
On a slight tangent, it seems to me that the DataStream v2 API only exposes the ability to apply a function to all partitions, not a subset, which the current applyToKeyedState does allow. Is that accurate? If so, adding that ability to the v2 API would be helpful. For example, there can be a proce

Setting Timers in applyToKeyedState

2024-08-28 Thread Jose Vargas Badilla via user
Hi, I recently learned that timers can be set in the KeyedStateFunction that is passed to KeyedBroadcastProcessFunction.Context#applyToKeyedState. The "trick" is to store a reference to the timerService that is available in processElement. This is behavior I have not seen explicitly documented be

Flink k8s operator unstable deployment

2024-08-28 Thread Arthur Catrisse via user
Hello, We are running into issues when deploying flink on kubernetes using the flink-kubernetes-operator with a FlinkDeployment Occasionally, when a *JobManager* gets rotated out (by karpenter in our case), the next JobManager is incapable of getting into a stable state and is stuck in a crash loo

Re: Get types from Row

2024-08-28 Thread Andrew Otto
I'm not sure if this helps with your need to vary the Sink's schema at runtime, but FWIW you can get the 'schema' of the input datastream via DataStream.getType