Re: Dynamic configuration of Flink checkpoint interval

2021-05-30 Thread Senhong Liu
Hi all,

In fact, a very similar JIRA issue has already been created,
https://issues.apache.org/jira/browse/FLINK-18578, and I am working on it.
In the near future I will publish a FLIP and start a discussion about it.
We look forward to your participation.

Best,
Senhong Liu

JING ZHANG wrote on Mon, May 31, 2021 at 10:21 AM:

> Hi Kai,
>
> Happy to hear that.
> Would you please paste the JIRA link in the email after you create it.
> Maybe it could help other users who encounter the same problem. Thanks very
> much.
>
> Best regards,
> JING ZHANG
>
> Kai Fu wrote on Sun, May 30, 2021 at 11:19 PM:
>
>> Hi Jing,
>>
>> Yup, what you're describing is what I want. I also tried the approach you
>> suggested and it works. I'm going to take that approach for the moment and
>> create a Jira issue for this feature.
>>
>> On Sun, May 30, 2021 at 8:57 PM JING ZHANG  wrote:
>>
>>> Hi Kai,
>>>
>>> Are you looking for a way to hot-update the checkpoint interval, or to
>>> disable/enable checkpointing, without stopping and restarting the job?
>>> Unfortunately, that is not supported yet, AFAIK.
>>> You're very welcome to create an issue describing your needs in Flink's
>>> Jira <http://issues.apache.org/jira/browse/FLINK>.
>>> For now, you could use the following workaround (a minimal sketch follows
>>> the steps):
>>>   1. Set a much larger checkpoint interval and start your job.
>>>   2. Take a savepoint once the cold start is complete.
>>>   3. Set a normal checkpoint interval and restart the job from the
>>> savepoint.
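>>>
>>> A minimal sketch of what those steps could look like, assuming the interval
>>> is passed in as a job argument (the argument name "checkpoint.interval.ms"
>>> and the angle-bracket placeholders are illustrative, not fixed names):
>>>
>>> import org.apache.flink.api.java.utils.ParameterTool;
>>> import org.apache.flink.streaming.api.CheckpointingMode;
>>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>>
>>> public class CheckpointIntervalFromArgs {
>>>     public static void main(String[] args) throws Exception {
>>>         // Step 1: submit with e.g. --checkpoint.interval.ms 3600000 for the cold start.
>>>         // Step 2: once caught up, take a savepoint: bin/flink savepoint <jobId> <targetDir>
>>>         // Step 3: resubmit from that savepoint with the normal interval:
>>>         //         bin/flink run -s <savepointPath> job.jar --checkpoint.interval.ms 60000
>>>         ParameterTool params = ParameterTool.fromArgs(args);
>>>         long intervalMs = params.getLong("checkpoint.interval.ms", 60_000L);
>>>
>>>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>>         env.enableCheckpointing(intervalMs, CheckpointingMode.EXACTLY_ONCE);
>>>
>>>         env.fromElements(1, 2, 3).print();  // placeholder pipeline
>>>         env.execute("checkpoint-interval-from-args");
>>>     }
>>> }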
>>>
>>> Best regards,
>>> JING ZHANG
>>>
>>> Kai Fu wrote on Sun, May 30, 2021 at 7:13 PM:
>>>
>>>> Hi team,
>>>>
>>>> We would like to know whether Flink supports dynamic configuration of the
>>>> checkpoint interval. Our use case has a cold-start phase in which the
>>>> entire dataset is replayed from the beginning up to the most recent records.
>>>>
>>>> During the cold-start phase, resources are fully utilized and backpressure
>>>> is high on all upstream operators, so checkpoints constantly time out. Real
>>>> production traffic is far lower than that, and the currently provisioned
>>>> resources can handle it.
>>>>
>>>> We're wondering whether Flink could support dynamic checkpoint
>>>> configuration to skip checkpointing, or make it less frequent, during the
>>>> cold-start phase to speed up the process, and to make checkpointing normal
>>>> again once the cold start is complete.
>>>>
>>>> --
>>>> *Best wishes,*
>>>> *- Kai*
>>>>
>>>
>>
>> --
>> *Best wishes,*
>> *- Kai*
>>
>


Re: Questions about keyed streams

2021-07-23 Thread Senhong Liu
Hi Dan,

1) If the key doesn't change in the downstream operators and you want to avoid 
shuffling, DataStreamUtils#reinterpretAsKeyedStream may be helpful.

2) I am not sure whether you mean that the data is already partitioned in Kafka 
and you want to avoid the shuffle that keyBy() would introduce in Flink. One 
option is to partition your data in Kafka the same way Flink's keyBy() would 
partition it; after that, feel free to use DataStreamUtils#reinterpretAsKeyedStream 
(see the sketch below).
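
Here is a rough sketch of that idea. The assumptions below are mine, not something 
you stated: the Kafka topic has exactly as many partitions as the consumer's 
parallelism, maxParallelism is pinned to the same value on the producer and the 
consumer, and records are (userId, count) tuples.

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamUtils;
import org.apache.flink.streaming.api.datastream.KeyedStream;

public class KeyByCompatiblePartitioning {

    static final int MAX_PARALLELISM = 128; // must be identical on producer and consumer
    static final int PARALLELISM = 4;       // assumed equal to the Kafka partition count

    // Producer side: pick the Kafka partition the same way keyBy() would pick
    // the operator subtask for this key, so the data lands "pre-shuffled".
    static int kafkaPartitionFor(String userId) {
        return KeyGroupRangeAssignment.assignKeyToParallelOperator(
                userId, MAX_PARALLELISM, PARALLELISM);
    }

    // Consumer side: declare the (already partition-aligned) Kafka stream as
    // keyed by userId, so Flink skips the network shuffle that keyBy() adds.
    static KeyedStream<Tuple2<String, Long>, String> asKeyed(
            DataStream<Tuple2<String, Long>> partitionAlignedStream) {
        return DataStreamUtils.reinterpretAsKeyedStream(
                partitionAlignedStream,
                (KeySelector<Tuple2<String, Long>, String>) t -> t.f0,
                Types.STRING);
    }
}

Whether the shuffle is really avoided also depends on the Kafka partition-to-subtask 
assignment lining up with the key groups, so it is worth validating on a small 
setup first.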

If your use case is not what I described above, please provide us with more 
information.

Best,
Senhong

On Jul 22, 2021, 7:33 AM +0800, Dan Hill , wrote:
> Hi.
>
> 1) If I use the same key in downstream operators (my key is a user id), will 
> the rows stay on the same TaskManager machine?  I join in more info using the 
> user id as the key.  I'd like for these rows to stay on the same machine 
> rather than shuffle a bunch of user-specific info to multiple task manager 
> machines.
>
> 2) What are best practices for reducing the number of shuffles when there are 
> multiple Kafka topics with similar keys (user id)?  E.g., should I make sure 
> the same key writes to the same partition number and then manually assign which 
> Flink tasks get which Kafka partitions?