Re: Tuning rocksdb configuration

2024-07-26 Thread Zakelly Lan
Hi Banupriya,

Sometimes an SST file will not be compacted and may stay referenced for a long
time. That depends on how RocksDB picks files for compaction. It can
happen when some range of keys is never touched after some point in time,
since RocksDB only takes care of the files or key ranges that get large.
Typically you don't need to worry about this, unless the checkpoint
size keeps growing for a long time.
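
If a very old file ever does become a problem, one thing you could experiment with is RocksDB's periodic compaction, which rewrites SST files that compaction hasn't touched for a given age. This is only a sketch; whether the FRocksDB version bundled with your Flink exposes `setPeriodicCompactionSeconds` is an assumption you should verify:

```
import java.util.Collection;

import org.apache.flink.contrib.streaming.state.RocksDBOptionsFactory;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;

// Sketch: ask RocksDB to rewrite SST files untouched by compaction for ~7 days,
// so very old files eventually get picked up and deleted entries are dropped.
public class PeriodicCompactionOptionsFactory implements RocksDBOptionsFactory {

    @Override
    public DBOptions createDBOptions(DBOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
        return currentOptions; // no DB-level changes
    }

    @Override
    public ColumnFamilyOptions createColumnOptions(
            ColumnFamilyOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
        // assumption: this RocksDB option is exposed by the bundled FRocksDB
        return currentOptions.setPeriodicCompactionSeconds(7L * 24 * 60 * 60);
    }
}
```

It would be installed with `backend.setRocksDBOptions(new PeriodicCompactionOptionsFactory())` on an `EmbeddedRocksDBStateBackend` before passing the backend to `env.setStateBackend(...)`.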


Best,
Zakelly

On Fri, Jul 26, 2024 at 2:49 PM banu priya  wrote:

> Hi Zakelly,
>
> Thanks a lot for your reply.
>
> I have one more query,
>
> Inside the checkpoint's chk-X directory there is a _metadata file that
> contains a list of the other .sst files. In my chk-17000 directory it still
> refers to the very old 00014.sst (the latest is 225.sst). Why is that?
> Compaction has happened, and that's why 00001 to 00013 are not present. Why
> hasn't it compacted the very old file yet? 1. Do I need to change any other
> RocksDB property? Or 2. does it mean my source events are still coming to the
> same key and keeping that state?
>
> The window fires every 2s, so I don't need the data for a long time.
>
> Thanks
> Banupriya
>
> On Fri, 26 Jul, 2024, 11:46 am Zakelly Lan,  wrote:
>
>> Hi Banu,
>>
>> I'm trying to answer your question in brief:
>>
>> 1. Yes, when the memtable reaches the value you configured, a flush will
>> be triggered. And no, SST files have a different format from memtables; the
>> size is smaller than 64MB, IIUC.
>>
>> 2. Typically you don't need to change this value. If it is set to 2, then
>> while one write buffer is being flushed to storage, new writes can continue
>> in the other write buffer. Increase this when flushing is too slow.
>>
>> 3. IIUC, the bloom filter helps during point queries, and window processing
>> requires point queries. So enabling it would help.
>>
>> 4. I'd suggest not setting this to 0. It only affects whether the
>> checkpoint data is stored inline in the metadata file. Maybe the checkpoint
>> size will be a little different, but it has nothing to do with
>> throughput.
>>
>>
>> Best,
>> Zakelly
>>
>> On Thu, Jul 25, 2024 at 3:25 PM banu priya  wrote:
>>
>>> Hi All,
>>>
>>> I have a Flink job with an RMQ source, filters, a tumbling window (uses
>>> processing time, fires every 2s), an aggregator, and an RMQ sink. I enabled
>>> incremental RocksDB checkpoints every 10s, with the minimum pause between
>>> checkpoints set to 5s. My checkpoint size keeps increasing, so I am planning
>>> to tune some RocksDB configuration.
>>>
>>> Following are my queries. Can someone help me choose the correct values?
>>>
>>> 1. state.backend.rocksdb.writebuffer.size = 64mb:
>>>
>>> Does it mean that once a write buffer (memtable) reaches 64 MB it will be
>>> flushed to disk as an .sst file? Will the .sst file also be 64 MB in size?
>>>
>>> 2. state.backend.rocksdb.writebuffer.count = 2.
>>>
>>> My job runs with a parallelism of 15 and 3 TaskManagers (so 5 slots
>>> per TaskManager). For a single RocksDB folder, how can I choose the correct
>>> buffer count?
>>>
>>> 3. Do I need to enable the bloom filter?
>>>
>>> 4. state.storage.fs.memory-threshold is 0 in my job. Does it have any
>>> effect on TaskManager throughput or checkpoint size?
>>>
>>> Thanks
>>>
>>> Banu
>>>
>>


Re: Tuning rocksdb configuration

2024-07-26 Thread Zakelly Lan
Hi Banu,

I'm trying to answer your question in brief:

1. Yes, when the memtable reaches the value you configured, a flush will be
triggered. And no, SST files have a different format from memtables; the size
is smaller than 64MB, IIUC.

2. Typically you don't need to change this value. If it is set to 2, then while
one write buffer is being flushed to storage, new writes can continue in the
other write buffer. Increase this when flushing is too slow.

3. IIUC, the bloom filter helps during point queries, and window processing
requires point queries. So enabling it would help.

4. I'd suggest not setting this to 0. It only affects whether the
checkpoint data is stored inline in the metadata file. Maybe the checkpoint
size will be a little different, but it has nothing to do with
throughput.
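
For reference, a minimal sketch of setting these knobs programmatically. The option keys are the documented Flink ones; the values simply mirror the job described below, so treat them as placeholders:

```
import org.apache.flink.configuration.Configuration;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RocksDbTuningSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // 1. flush a memtable to an SST file once it reaches 64 MB
        conf.setString("state.backend.rocksdb.writebuffer.size", "64mb");
        // 2. two write buffers: one can flush while the other keeps taking writes
        conf.setString("state.backend.rocksdb.writebuffer.count", "2");
        // 3. bloom filters speed up the point lookups that window processing performs
        conf.setString("state.backend.rocksdb.use-bloom-filter", "true");

        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment(conf);
        env.setStateBackend(new EmbeddedRocksDBStateBackend(true)); // incremental checkpoints
        env.enableCheckpointing(10_000);                            // every 10s
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(5_000);

        env.fromSequence(0, 100).print(); // placeholder pipeline
        env.execute("rocksdb-tuning-sketch");
    }
}
```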


Best,
Zakelly

On Thu, Jul 25, 2024 at 3:25 PM banu priya  wrote:

> Hi All,
>
> I have a Flink job with an RMQ source, filters, a tumbling window (uses
> processing time, fires every 2s), an aggregator, and an RMQ sink. I enabled
> incremental RocksDB checkpoints every 10s, with the minimum pause between
> checkpoints set to 5s. My checkpoint size keeps increasing, so I am planning
> to tune some RocksDB configuration.
>
> Following are my queries. Can someone help me choose the correct values?
>
> 1. state.backend.rocksdb.writebuffer.size = 64mb:
>
> Does it mean that once a write buffer (memtable) reaches 64 MB it will be
> flushed to disk as an .sst file? Will the .sst file also be 64 MB in size?
>
> 2. state.backend.rocksdb.writebuffer.count = 2.
>
> My job runs with a parallelism of 15 and 3 TaskManagers (so 5 slots per
> TaskManager). For a single RocksDB folder, how can I choose the correct
> buffer count?
>
> 3. Do I need to enable the bloom filter?
>
> 4. state.storage.fs.memory-threshold is 0 in my job. Does it have any
> effect on TaskManager throughput or checkpoint size?
>
> Thanks
>
> Banu
>


Re: How to enable RocksDB native metrics?

2024-04-07 Thread Zakelly Lan
Hi Lei,

You can enable it by some configurations listed in:
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/config/#rocksdb-native-metrics
 (RocksDB Native Metrics)


Best,
Zakelly

On Sun, Apr 7, 2024 at 4:59 PM Zakelly Lan  wrote:

> Hi Lei,
>
> You can enable it by some configurations listed in:
> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/config/#rocksdb-native-metrics
>  (RocksDB Native Metrics)
>
>
> Best,
> Zakelly
>
> On Sun, Apr 7, 2024 at 4:47 PM Lei Wang  wrote:
>
>>
>> I am using big state and want to do some performance tuning. How can I
>> enable RocksDB native metrics?
>>
>> I am using Flink 1.14.4
>>
>> Thanks,
>> Lei
>>
>


Re: [ANNOUNCE] Donation Flink CDC into Apache Flink has Completed

2024-03-20 Thread Zakelly Lan
Congratulations!


Best,
Zakelly

On Thu, Mar 21, 2024 at 12:05 PM weijie guo 
wrote:

> Congratulations! Well done.
>
>
> Best regards,
>
> Weijie
>
>
> Feng Jin  wrote on Thu, Mar 21, 2024 at 11:40:
>
>> Congratulations!
>>
>>
>> Best,
>> Feng
>>
>>
>> On Thu, Mar 21, 2024 at 11:37 AM Ron liu  wrote:
>>
>> > Congratulations!
>> >
>> > Best,
>> > Ron
>> >
>> > Jark Wu  wrote on Thu, Mar 21, 2024 at 10:46:
>> >
>> > > Congratulations and welcome!
>> > >
>> > > Best,
>> > > Jark
>> > >
>> > > On Thu, 21 Mar 2024 at 10:35, Rui Fan <1996fan...@gmail.com> wrote:
>> > >
>> > > > Congratulations!
>> > > >
>> > > > Best,
>> > > > Rui
>> > > >
>> > > > On Thu, Mar 21, 2024 at 10:25 AM Hang Ruan 
>> > > wrote:
>> > > >
>> > > > > Congratulations!
>> > > > >
>> > > > > Best,
>> > > > > Hang
>> > > > >
>> > > > > Lincoln Lee  wrote on Thu, Mar 21, 2024 at 09:54:
>> > > > >
>> > > > >>
>> > > > >> Congrats, thanks for the great work!
>> > > > >>
>> > > > >>
>> > > > >> Best,
>> > > > >> Lincoln Lee
>> > > > >>
>> > > > >>
>> > > > >> Peter Huang  wrote on Wed, Mar 20, 2024 at 22:48:
>> > > > >>
>> > > > >>> Congratulations
>> > > > >>>
>> > > > >>>
>> > > > >>> Best Regards
>> > > > >>> Peter Huang
>> > > > >>>
>> > > > >>> On Wed, Mar 20, 2024 at 6:56 AM Huajie Wang > >
>> > > > wrote:
>> > > > >>>
>> > > > 
>> > > >  Congratulations
>> > > > 
>> > > > 
>> > > > 
>> > > >  Best,
>> > > >  Huajie Wang
>> > > > 
>> > > > 
>> > > > 
>> > > >  Leonard Xu  wrote on Wed, Mar 20, 2024 at 21:36:
>> > > > 
>> > > > > Hi devs and users,
>> > > > >
>> > > > > We are thrilled to announce that the donation of Flink CDC as
>> a
>> > > > > sub-project of Apache Flink has completed. We invite you to
>> > explore
>> > > > the new
>> > > > > resources available:
>> > > > >
>> > > > > - GitHub Repository: https://github.com/apache/flink-cdc
>> > > > > - Flink CDC Documentation:
>> > > > > https://nightlies.apache.org/flink/flink-cdc-docs-stable
>> > > > >
>> > > > > After Flink community accepted this donation[1], we have
>> > completed
>> > > > > software copyright signing, code repo migration, code cleanup,
>> > > > website
>> > > > > migration, CI migration and github issues migration etc.
>> > > > > Here I am particularly grateful to Hang Ruan, Zhongqaing Gong,
>> > > > > Qingsheng Ren, Jiabao Sun, LvYanquan, loserwang1024 and other
>> > > > contributors
>> > > > > for their contributions and help during this process!
>> > > > >
>> > > > >
>> > > > > For all previous contributors: The contribution process has
>> > > slightly
>> > > > > changed to align with the main Flink project. To report bugs
>> or
>> > > > suggest new
>> > > > > features, please open tickets on
>> > > > > Apache Jira (https://issues.apache.org/jira). Note that we
>> will
>> > > no
>> > > > > longer accept GitHub issues for these purposes.
>> > > > >
>> > > > >
>> > > > > Welcome to explore the new repository and documentation. Your
>> > > > feedback
>> > > > > and contributions are invaluable as we continue to improve
>> Flink
>> > > CDC.
>> > > > >
>> > > > > Thanks everyone for your support and happy exploring Flink
>> CDC!
>> > > > >
>> > > > > Best,
>> > > > > Leonard
>> > > > > [1]
>> > > https://lists.apache.org/thread/cw29fhsp99243yfo95xrkw82s5s418ob
>> > > > >
>> > > > >
>> > > >
>> > >
>> >
>>
>


Re: [ANNOUNCE] Apache Flink 1.19.0 released

2024-03-18 Thread Zakelly Lan
Congratulations!

Thanks Lincoln, Yun, Martijn and Jing for driving this release.
Thanks everyone involved.


Best,
Zakelly

On Mon, Mar 18, 2024 at 5:05 PM weijie guo 
wrote:

> Congratulations!
>
> Thanks release managers and all the contributors involved.
>
> Best regards,
>
> Weijie
>
>
> Leonard Xu  wrote on Mon, Mar 18, 2024 at 16:45:
>
>> Congratulations, thanks release managers and all involved for the great
>> work!
>>
>>
>> Best,
>> Leonard
>>
>> > On Mar 18, 2024, at 4:32 PM, Jingsong Li  wrote:
>> >
>> > Congratulations!
>> >
>> > On Mon, Mar 18, 2024 at 4:30 PM Rui Fan <1996fan...@gmail.com> wrote:
>> >>
>> >> Congratulations, thanks for the great work!
>> >>
>> >> Best,
>> >> Rui
>> >>
>> >> On Mon, Mar 18, 2024 at 4:26 PM Lincoln Lee 
>> wrote:
>> >>>
>> >>> The Apache Flink community is very happy to announce the release of
>> Apache Flink 1.19.0, which is the first release for the Apache Flink 1.19
>> series.
>> >>>
>> >>> Apache Flink® is an open-source stream processing framework for
>> distributed, high-performing, always-available, and accurate data streaming
>> applications.
>> >>>
>> >>> The release is available for download at:
>> >>> https://flink.apache.org/downloads.html
>> >>>
>> >>> Please check out the release blog post for an overview of the
>> improvements for this bugfix release:
>> >>>
>> https://flink.apache.org/2024/03/18/announcing-the-release-of-apache-flink-1.19/
>> >>>
>> >>> The full release notes are available in Jira:
>> >>>
>> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12353282
>> >>>
>> >>> We would like to thank all contributors of the Apache Flink community
>> who made this release possible!
>> >>>
>> >>>
>> >>> Best,
>> >>> Yun, Jing, Martijn and Lincoln
>>
>>


Re: Unaligned checkpoint blocked by long Async operation

2024-03-17 Thread Zakelly Lan
I agree. I also created https://issues.apache.org/jira/browse/FLINK-34704 for
tracking and further discussion.


Best,
Zakelly

On Fri, Mar 15, 2024 at 2:59 PM Gyula Fóra  wrote:

> Posting this to dev as well...
>
> Thanks Zakelly,
> Sounds like a solution could be to add a new different version of yield
> that would actually yield to the checkpoint barrier too. That way operator
> implementations could decide whether any state modification may or may not
> have happened and can optionally allow checkpoint to be taken in the
> "middle of record  processing".
>
> Gyula
>
> On Fri, Mar 15, 2024 at 3:49 AM Zakelly Lan  wrote:
>
>> Hi Gyula,
>>
>> Processing checkpoint halfway through `processElement` is problematic.
>> The current element will not be included in the input in-flight data, and
>> we cannot assume it has taken effect on the state by user code. So the best
>> way is to treat `processElement` as an 'atomic' operation. I guess that's
>> why the priority of the cp barrier is set low.
>> However, the AsyncWaitOperator is a special case where we know the
>> element blocked at `addToWorkQueue` has not started triggering the
>> userFunction. Thus I'd suggest putting the element in the queue when the cp
>> barrier comes, and taking a snapshot of the whole queue afterwards. The
>> problem will be solved. But this approach also involves some code
>> modifications on the mailbox executor.
>>
>>
>> Best,
>> Zakelly
>>
>> On Thu, Mar 14, 2024 at 9:15 PM Gyula Fóra  wrote:
>>
>>> Thank you for the detailed analysis Zakelly.
>>>
>>> I think we should consider whether yield should process checkpoint
>>> barriers because this puts quite a serious limitation on the unaligned
>>> checkpoints in these cases.
>>> Do you know what is the reason behind the current priority setting? Is
>>> there a problem with processing the barrier here?
>>>
>>> Gyula
>>>
>>> On Thu, Mar 14, 2024 at 1:22 PM Zakelly Lan 
>>> wrote:
>>>
>>>> Hi Gyula,
>>>>
>>>> Well I tried your example in local mini-cluster, and it seems the
>>>> source can take checkpoints but it will block in the following
>>>> AsyncWaitOperator. IIUC, the unaligned checkpoint barrier should wait until
>>>> the current `processElement` finishes its execution. In your example, the
>>>> element queue of `AsyncWaitOperator` will end up full and `processElement`
>>>> will be blocked at `addToWorkQueue`. Even though it will call
>>>> `mailboxExecutor.yield();`, it still leaves the checkpoint barrier
>>>> unprocessed since the priority of the barrier is -1, lower than the one
>>>> `yield()` should handle. I verified this using single-step debugging.
>>>>
>>>> And if one element could finish its async io, the cp barrier can be
>>>> processed afterwards. For example:
>>>> ```
>>>> env.getCheckpointConfig().enableUnalignedCheckpoints();
>>>> env.getCheckpointConfig().setCheckpointInterval(10000);  // 10s interval
>>>> env.getConfig().setParallelism(1);
>>>> AsyncDataStream.orderedWait(
>>>>         env.fromSequence(Long.MIN_VALUE, Long.MAX_VALUE).shuffle(),
>>>>         new AsyncFunction<Long, Long>() {
>>>>             boolean first = true;
>>>>
>>>>             @Override
>>>>             public void asyncInvoke(Long aLong, ResultFuture<Long> resultFuture) {
>>>>                 if (first) {
>>>>                     Executors.newSingleThreadExecutor().execute(() -> {
>>>>                         try {
>>>>                             Thread.sleep(20000); // process after 20s, only for the first one.
>>>>                         } catch (Throwable e) {}
>>>>                         LOG.info("Complete one");
>>>>                         resultFuture.complete(Collections.singleton(1L));
>>>>                     });
>>>>                     first = false;
>>>>                 }
>>>>             }
>>>>         },
>>>>         24,
>>>>         TimeUnit.HOURS,
>>>>         1)
>>>>         .print();
>>>> ```
>>>> Checkpoint 1 then finishes normally after the "Complete one" log prints.
>

Re: Unaligned checkpoint blocked by long Async operation

2024-03-14 Thread Zakelly Lan
Hi Gyula,

Processing a checkpoint halfway through `processElement` is problematic. The
current element would not be included in the input in-flight data, and we
cannot assume the user code has already applied it to the state. So the best
approach is to treat `processElement` as an 'atomic' operation. I guess that's
why the priority of the cp barrier is set low.
However, the AsyncWaitOperator is a special case where we know the element
blocked at `addToWorkQueue` has not started triggering the userFunction.
Thus I'd suggest putting the element in the queue when the cp barrier
arrives, and taking a snapshot of the whole queue afterwards. That would solve
the problem, but this approach also involves some code modifications to
the mailbox executor.


Best,
Zakelly

On Thu, Mar 14, 2024 at 9:15 PM Gyula Fóra  wrote:

> Thank you for the detailed analysis Zakelly.
>
> I think we should consider whether yield should process checkpoint
> barriers because this puts quite a serious limitation on the unaligned
> checkpoints in these cases.
> Do you know what is the reason behind the current priority setting? Is
> there a problem with processing the barrier here?
>
> Gyula
>
> On Thu, Mar 14, 2024 at 1:22 PM Zakelly Lan  wrote:
>
>> Hi Gyula,
>>
>> Well I tried your example in local mini-cluster, and it seems the source
>> can take checkpoints but it will block in the following AsyncWaitOperator.
>> IIUC, the unaligned checkpoint barrier should wait until the current
>> `processElement` finishes its execution. In your example, the element queue
>> of `AsyncWaitOperator` will end up full and `processElement` will be
>> blocked at `addToWorkQueue`. Even though it will call
>> `mailboxExecutor.yield();`, it still leaves the checkpoint barrier
>> unprocessed since the priority of the barrier is -1, lower than the one
>> `yield()` should handle. I verified this using single-step debugging.
>>
>> And if one element could finish its async io, the cp barrier can be
>> processed afterwards. For example:
>> ```
>> env.getCheckpointConfig().enableUnalignedCheckpoints();
>> env.getCheckpointConfig().setCheckpointInterval(10000);  // 10s interval
>> env.getConfig().setParallelism(1);
>> AsyncDataStream.orderedWait(
>>         env.fromSequence(Long.MIN_VALUE, Long.MAX_VALUE).shuffle(),
>>         new AsyncFunction<Long, Long>() {
>>             boolean first = true;
>>
>>             @Override
>>             public void asyncInvoke(Long aLong, ResultFuture<Long> resultFuture) {
>>                 if (first) {
>>                     Executors.newSingleThreadExecutor().execute(() -> {
>>                         try {
>>                             Thread.sleep(20000); // process after 20s, only for the first one.
>>                         } catch (Throwable e) {}
>>                         LOG.info("Complete one");
>>                         resultFuture.complete(Collections.singleton(1L));
>>                     });
>>                     first = false;
>>                 }
>>             }
>>         },
>>         24,
>>         TimeUnit.HOURS,
>>         1)
>>         .print();
>> ```
>> Checkpoint 1 then finishes normally after the "Complete one" log prints.
>>
>> I guess the users have no means to solve this problem, we might optimize
>> this later.
>>
>>
>> Best,
>> Zakelly
>>
>> On Thu, Mar 14, 2024 at 5:41 PM Gyula Fóra  wrote:
>>
>>> Hey all!
>>>
>>> I encountered a strange and unexpected behaviour when trying to use
>>> unaligned checkpoints with AsyncIO.
>>>
>>> If the async operation queue is full and backpressures the pipeline
>>> completely, then unaligned checkpoints cannot be completed. To me this
>>> sounds counterintuitive because one of the benefits of the AsyncIO would be
>>> that we can simply checkpoint the queue and not have to wait for the
>>> completion.
>>>
>>> To repro you can simply run:
>>>
>>> AsyncDataStream.orderedWait(
>>>         env.fromSequence(Long.MIN_VALUE, Long.MAX_VALUE).shuffle(),
>>>         new AsyncFunction<Long, Long>() {
>>>             @Override
>>>             public void asyncInvoke(Long aLong, ResultFuture<Long> resultFuture) {}
>>>         },
>>>         24,
>>>         TimeUnit.HOURS,
>>>         1)
>>>         .print();
>>>
>>> This pipeline will completely backpressure the source and checkpoints do
>>> not progress even though they are unaligned. Already the source cannot take
>>> a checkpoint it seems which for me is surprising because this is using the
>>> new source interface.
>>>
>>> Does anyone know why this happens and if there may be a solution?
>>>
>>> Thanks
>>> Gyula
>>>
>>


Re: Unaligned checkpoint blocked by long Async operation

2024-03-14 Thread Zakelly Lan
Hi Gyula,

Well, I tried your example in a local mini-cluster, and it seems the source
can take checkpoints, but the barrier blocks at the following AsyncWaitOperator.
IIUC, the unaligned checkpoint barrier has to wait until the current
`processElement` finishes its execution. In your example, the element queue
of `AsyncWaitOperator` ends up full and `processElement` is
blocked at `addToWorkQueue`. Even though it calls
`mailboxExecutor.yield();`, the checkpoint barrier is still left
unprocessed, since the priority of the barrier is -1, lower than what
`yield()` handles. I verified this with single-step debugging.

And if one element could finish its async io, the cp barrier can be
processed afterwards. For example:
```
env.getCheckpointConfig().enableUnalignedCheckpoints();
env.getCheckpointConfig().setCheckpointInterval(10000);  // 10s interval
env.getConfig().setParallelism(1);
AsyncDataStream.orderedWait(
        env.fromSequence(Long.MIN_VALUE, Long.MAX_VALUE).shuffle(),
        new AsyncFunction<Long, Long>() {
            boolean first = true;

            @Override
            public void asyncInvoke(Long aLong, ResultFuture<Long> resultFuture) {
                if (first) {
                    Executors.newSingleThreadExecutor().execute(() -> {
                        try {
                            Thread.sleep(20000); // process after 20s, only for the first one.
                        } catch (Throwable e) {}
                        LOG.info("Complete one");
                        resultFuture.complete(Collections.singleton(1L));
                    });
                    first = false;
                }
            }
        },
        24,
        TimeUnit.HOURS,
        1)
        .print();
```
Checkpoint 1 then finishes normally after the "Complete one" log prints.

I guess users have no means to solve this problem themselves; we might optimize
this later.


Best,
Zakelly

On Thu, Mar 14, 2024 at 5:41 PM Gyula Fóra  wrote:

> Hey all!
>
> I encountered a strange and unexpected behaviour when trying to use
> unaligned checkpoints with AsyncIO.
>
> If the async operation queue is full and backpressures the pipeline
> completely, then unaligned checkpoints cannot be completed. To me this
> sounds counterintuitive because one of the benefits of the AsyncIO would be
> that we can simply checkpoint the queue and not have to wait for the
> completion.
>
> To repro you can simply run:
>
> AsyncDataStream.orderedWait(
>         env.fromSequence(Long.MIN_VALUE, Long.MAX_VALUE).shuffle(),
>         new AsyncFunction<Long, Long>() {
>             @Override
>             public void asyncInvoke(Long aLong, ResultFuture<Long> resultFuture) {}
>         },
>         24,
>         TimeUnit.HOURS,
>         1)
>         .print();
>
> This pipeline will completely backpressure the source and checkpoints do
> not progress even though they are unaligned. Already the source cannot take
> a checkpoint it seems which for me is surprising because this is using the
> new source interface.
>
> Does anyone know why this happens and if there may be a solution?
>
> Thanks
> Gyula
>


Re: Question about time-based operators with RocksDB backend

2024-03-04 Thread Zakelly Lan
Hi Gabriele,

Quick answer: You can use the built-in window operators which have been
integrated with state backends including RocksDB.
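
For example, a minimal sketch (the names and numbers are illustrative):

```
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class RocksDbWindowSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // window state transparently lives in RocksDB once the backend is set
        env.setStateBackend(new EmbeddedRocksDBStateBackend(true));

        env.fromSequence(0, 1_000_000)
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Long>forMonotonousTimestamps()
                                .withTimestampAssigner((v, ts) -> v)) // value as event time (ms)
                .keyBy(v -> v % 1000)
                .window(TumblingEventTimeWindows.of(Time.hours(1)))
                // incremental aggregation: only one accumulator per key/window is stored
                .aggregate(new AggregateFunction<Long, Long, Long>() {
                    @Override public Long createAccumulator() { return 0L; }
                    @Override public Long add(Long value, Long acc) { return acc + value; }
                    @Override public Long getResult(Long acc) { return acc; }
                    @Override public Long merge(Long a, Long b) { return a + b; }
                })
                .print();

        env.execute("rocksdb-window-sketch");
    }
}
```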


Thanks,
Zakelly

On Tue, Mar 5, 2024 at 10:33 AM Zhanghao Chen 
wrote:

> Hi Gabriele,
>
> I'd recommend extending the existing window functions whenever possible, as
> Flink will automatically cover state management for you, with no need to be
> concerned with state backend details. Incremental aggregation to reduce
> state size is also available out of the box if your usage can be satisfied by
> the reduce/aggregate function pattern, which is important for large windows.
>
> Best,
> Zhanghao Chen
> --
> *From:* Gabriele Mencagli 
> *Sent:* Monday, March 4, 2024 19:38
> *To:* user@flink.apache.org 
> *Subject:* Question about time-based operators with RocksDB backend
>
>
> Dear Flink Community,
>
> I am using Flink with the DataStream API and operators implemented using
> RichedFunctions. I know that Flink provides a set of window-based operators
> with time-based semantics and tumbling/sliding windows.
>
> By reading the Flink documentation, I understand that there is the
> possibility to change the memory backend utilized for storing the in-flight
> state of the operators. For example, using RocksDB for this purpose to cope
> with a larger-than-memory state. If I am not wrong, to transparently change
> the backend (e.g., from in-memory to RocksDB) we have to use a proper API
> to access the state. For example, the Keyed State API with different
> abstractions such as ValueState, ListState, etc., as reported here.
>
> My question is related to the utilization of time-based window operators
> with the RocksDB backend. Suppose for example very large temporal windows
> with a huge number of keys in the stream. I am wondering if there is a
> possibility to use the built-in window operators of Flink (e.g., with an
> AggregateFunction or a more generic ProcessWindowFunction, as here)
> transparently with RocksDB support as a state back-end, or if I have to
> develop the window operator in a raw manner using the Keyed State API
> (e.g., ListState, AggregateState) for this purpose by implementing the
> underlying window logic manually in the code of RichedFunction of the
> operator (e.g., a FlatMap).
> Thanks for your support,
>
> --
> Gabriele Mencagli
>
>


Re: Preparing keyed state before snapshot

2024-02-20 Thread Zakelly Lan
Hi Lorenzo,

I think the most convenient way is to modify the code of the state backend,
adding a k-v cache as you want.

Otherwise, IIUC, there's no public interface to get the keyContext. But you
may try something hacky: use the passed-in `Context` instance
in processElement, and leverage Java reflection to get
the KeyedProcessOperator instance, on which you can call setCurrentKey().
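
For illustration only, a rough sketch of that reflection trick. The synthetic outer-instance field name ("this$0") is an assumption about how javac compiles the non-static `Context` inner class, so treat this as fragile and version-dependent:

```
// inside processElement(...) of a KeyedProcessFunction:
// the Context is a non-static inner class of KeyedProcessOperator, so its
// synthetic outer-instance field points at the enclosing operator
java.lang.reflect.Field outer = ctx.getClass().getDeclaredField("this$0"); // assumption
outer.setAccessible(true);
Object operator = outer.get(ctx);

// AbstractStreamOperator implements KeyContext, which exposes setCurrentKey
org.apache.flink.streaming.api.operators.KeyContext keyContext =
        (org.apache.flink.streaming.api.operators.KeyContext) operator;
keyContext.setCurrentKey(myKey); // switch the keyed-state scope to myKey
```

Here `ctx` and `myKey` stand for the process function's `Context` argument and the key whose state you want to write.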


Best,
Zakelly

On Wed, Feb 21, 2024 at 1:05 AM Lorenzo Nicora 
wrote:

> Thanks Zakelly,
>
> I'd need to do something similar, with a map containing my
> non-serializable "state", similar to the kvCache in FastTop1Fucntion.
>
> But I am not sure I understand how I can set the keyed state for a
> specific key, in snapshotState().
> FastTop1Function seems to rely on keyContext set via setKeyContext(). This
> method is not part of the API. I see it's set specifically for
> AbstractTopNFuction in StreamExecRank.
> How can I do something similar without modifying the Flink runtime?
>
> Lorenzo
>
>
> On Sun, 18 Feb 2024 at 03:42, Zakelly Lan  wrote:
>
>> Hi Lorenzo,
>>
>> It is not recommended to do this with keyed state. However, there is
>> an example in the Flink code (FastTop1Function#snapshotState) [1] of setting
>> keys during snapshotState().
>>
>> Hope this helps.
>>
>> [1]
>> https://github.com/apache/flink/blob/050503c65f5c5c18bb573748ccbf5aecce4ec1a5/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/FastTop1Function.java#L165
>>
>> Best,
>> Zakelly
>>
>> On Sat, Feb 17, 2024 at 1:48 AM Lorenzo Nicora 
>> wrote:
>>
>>> Hi Thias
>>>
>>> I considered CheckpointedFunction.
>>> In snapshotState() I would have to update the state of each key,
>>> extracting the in-memory "state" of each key and putting it in the state
>>> with state.update(...) .
>>> This must happen per key,
>>> But snapshotState() has no visibility of the keys. And I have no way of
>>> selectively accessing the state of a specific key to update it.
>>> Unless I am missing something
>>>
>>> Thanks
>>> Lorenzo
>>>
>>>
>>> On Fri, 16 Feb 2024 at 07:21, Schwalbe Matthias <
>>> matthias.schwa...@viseca.ch> wrote:
>>>
>>>> Good morning Lorenzo,
>>>>
>>>>
>>>>
>>>> You may want to implement
>>>> org.apache.flink.streaming.api.checkpoint.CheckpointedFunction interface in
>>>> your KeyedProcessFunction.
>>>>
>>>> Btw. By the time initializeState(…) is called, the state backend is
>>>> fully initialized and can be read and written to (which is not the case for
>>>> when the open(…) function is called.
>>>>
>>>> In initializeState(…) you also get access to state of different
>>>> operator key.
>>>>
>>>> SnapshotState(…) is called as part of the (each) checkpoint in order to
>>>> store data.
>>>>
>>>>
>>>>
>>>> Sincere greetings
>>>>
>>>>
>>>>
>>>> Thias
>>>>
>>>>
>>>>
>>>> *From:* Lorenzo Nicora 
>>>> *Sent:* Thursday, February 15, 2024 7:50 PM
>>>> *To:* Flink User Group 
>>>> *Subject:* Preparing keyed state before snapshot
>>>>
>>>>
>>>>
>>>> Hello everyone,
>>>>
>>>>
>>>>
>>>> I have a convoluted problem.
>>>>
>>>>
>>>>
>>>> I am implementing a KeyedProcessFunction that keeps some
>>>> non-serializable "state" in memory, in a transient Map (key = stream key,
>>>> value = the non-serializable "state").
>>>>
>>>>
>>>>
>>>> I can extract a serializable representation to put in Flink state, and
>>>> I can load my in-memory "state" from the Flink state. But these operations
>>>> are expensive.
>>>>
>>>>
>>>>
>>>> Initializing the in-memory "state" is relatively easy. I do it lazily,
>>>> in processElement(), on the first record for the key.
>>>>
>>>>
>>>>
>>>> The problem is saving the in-memory "state" to Flink state.
>>>>
>>>> I need to do it only before the state snapshot. But
>>>> KeyedProcessFunction has no entrypoint called before the state snapshot.
>>>>

Re: Impact of RocksDB backend on the Java heap

2024-02-18 Thread Zakelly Lan
Hi Alexis,

Assuming a bulk load of a batch of sequential keys performs better than
accessing them one by one, the main question is whether we really need to
access all the keys that were bulk-loaded into the cache. In other words, the
cache hit rate is the key issue. If the rate is high, then even if a single
key-value is large and loading is slow, it is still worth loading
them in advance. In the case of timers and iteration (which I missed in my last
mail), the cache is almost guaranteed to hit. Thus a cache is introduced to
enhance performance here.


Best,
Zakelly

On Sun, Feb 18, 2024 at 7:42 PM Alexis Sarda-Espinosa <
sarda.espin...@gmail.com> wrote:

> Hi Zakelly,
>
> thanks for the information, that's interesting. Would you say that reading
> a subset from RocksDB is fast enough to be pretty much negligible, or could
> it be a bottleneck if the state of each key is "large"? Again assuming the
> number of distinct partition keys is large.
>
> Regards,
> Alexis.
>
> On Sun, 18 Feb 2024, 05:02 Zakelly Lan,  wrote:
>
>> Hi Alexis,
>>
>> Flink does need some heap memory to bridge requests to RocksDB and gather
>> the results. In most cases, this memory is discarded immediately (and
>> eventually collected by GC). In the case of timers, Flink does cache a
>> limited subset of key-values on the heap to improve performance.
>>
>> In general you don't need to consider its heap consumption, since it is
>> minor.
>>
>>
>> Best,
>> Zakelly
>>
>> On Fri, Feb 16, 2024 at 4:43 AM Asimansu Bera 
>> wrote:
>>
>>> Hello Alexis,
>>>
>>> I don't think data in RocksDB resides in JVM even with function calls.
>>>
>>> For more details, check the link below:
>>>
>>> https://github.com/facebook/rocksdb/wiki/RocksDB-Overview#3-high-level-architecture
>>>
>>> RocksDB has three main components - memtable, sstfile and WAL(not used
>>> in Flink as Flink uses checkpointing). When TM starts with statebackend as
>>> RocksDB,TM has its own RocksDB instance and the state is managed as column
>>> Family by that TM. Any changes of state go into memtable --> sst-->
>>> persistent store. When read, data goes to the buffers and cache of RocksDB.
>>>
>>> In the case of RocksDB as state backend, JVM still holds threads stack
>>> as for high degree of parallelism, there are many
>>> stacks maintaining separate thread information.
>>>
>>> Hope this helps!!
>>>
>>>
>>>
>>>
>>>
>>> On Thu, Feb 15, 2024 at 11:21 AM Alexis Sarda-Espinosa <
>>> sarda.espin...@gmail.com> wrote:
>>>
>>>> Hi Asimansu
>>>>
>>>> The memory RocksDB manages is outside the JVM, yes, but the mentioned
>>>> subsets must be bridged to the JVM somehow so that the data can be exposed
>>>> to the functions running inside Flink, no?
>>>>
>>>> Regards,
>>>> Alexis.
>>>>
>>>>
>>>> On Thu, 15 Feb 2024, 14:06 Asimansu Bera, 
>>>> wrote:
>>>>
>>>>> Hello Alexis,
>>>>>
>>>>> RocksDB resides off-heap and outside of JVM. The small subset of data
>>>>> ends up on the off-heap in the memory.
>>>>>
>>>>> For more details, check the following link:
>>>>>
>>>>> https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/deployment/memory/mem_setup_tm/#managed-memory
>>>>>
>>>>> I hope this addresses your inquiry.
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Thu, Feb 15, 2024 at 12:52 AM Alexis Sarda-Espinosa <
>>>>> sarda.espin...@gmail.com> wrote:
>>>>>
>>>>>> Hello,
>>>>>>
>>>>>> Most info regarding RocksDB memory for Flink focuses on what's needed
>>>>>> independently of the JVM (although the Flink process configures its 
>>>>>> limits
>>>>>> and so on). I'm wondering if there are additional special considerations
>>>>>> with regards to the JVM heap in the following scenario.
>>>>>>
>>>>>> Assuming a key used to partition a Flink stream and its state has a
>>>>>> high cardinality, but that the state of each key is small, when Flink
>>>>>> prepares the state to expose to a user function during a call (with a 
>>>>>> given
>>>>>> partition key), I guess it loads only the required subset from RocksDB, 
>>>>>> but
>>>>>> does this small subset end (temporarily) up on the JVM heap? And if it
>>>>>> does, does it stay "cached" in the JVM for some time or is it immediately
>>>>>> discarded after the user function completes?
>>>>>>
>>>>>> Maybe this isn't even under Flink's control, but I'm curious.
>>>>>>
>>>>>> Regards,
>>>>>> Alexis.
>>>>>>
>>>>>


Re: Impact of RocksDB backend on the Java heap

2024-02-17 Thread Zakelly Lan
Hi Alexis,

Flink does need some heap memory to bridge requests to RocksDB and gather
the results. In most cases, this memory is discarded immediately (and eventually
collected by GC). In the case of timers, Flink does cache a limited subset of
key-values on the heap to improve performance.

In general you don't need to consider its heap consumption, since it is
minor.


Best,
Zakelly

On Fri, Feb 16, 2024 at 4:43 AM Asimansu Bera 
wrote:

> Hello Alexis,
>
> I don't think data in RocksDB resides in JVM even with function calls.
>
> For more details, check the link below:
>
> https://github.com/facebook/rocksdb/wiki/RocksDB-Overview#3-high-level-architecture
>
> RocksDB has three main components - memtable, sstfile and WAL(not used in
> Flink as Flink uses checkpointing). When TM starts with statebackend as
> RocksDB,TM has its own RocksDB instance and the state is managed as column
> Family by that TM. Any changes of state go into memtable --> sst-->
> persistent store. When read, data goes to the buffers and cache of RocksDB.
>
> In the case of RocksDB as state backend, JVM still holds threads stack as
> for high degree of parallelism, there are many stacks maintaining separate
> thread information.
>
> Hope this helps!!
>
>
>
>
>
> On Thu, Feb 15, 2024 at 11:21 AM Alexis Sarda-Espinosa <
> sarda.espin...@gmail.com> wrote:
>
>> Hi Asimansu
>>
>> The memory RocksDB manages is outside the JVM, yes, but the mentioned
>> subsets must be bridged to the JVM somehow so that the data can be exposed
>> to the functions running inside Flink, no?
>>
>> Regards,
>> Alexis.
>>
>>
>> On Thu, 15 Feb 2024, 14:06 Asimansu Bera, 
>> wrote:
>>
>>> Hello Alexis,
>>>
>>> RocksDB resides off-heap and outside of JVM. The small subset of data
>>> ends up on the off-heap in the memory.
>>>
>>> For more details, check the following link:
>>>
>>> https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/deployment/memory/mem_setup_tm/#managed-memory
>>>
>>> I hope this addresses your inquiry.
>>>
>>>
>>>
>>>
>>> On Thu, Feb 15, 2024 at 12:52 AM Alexis Sarda-Espinosa <
>>> sarda.espin...@gmail.com> wrote:
>>>
 Hello,

 Most info regarding RocksDB memory for Flink focuses on what's needed
 independently of the JVM (although the Flink process configures its limits
 and so on). I'm wondering if there are additional special considerations
 with regards to the JVM heap in the following scenario.

 Assuming a key used to partition a Flink stream and its state has a
 high cardinality, but that the state of each key is small, when Flink
 prepares the state to expose to a user function during a call (with a given
 partition key), I guess it loads only the required subset from RocksDB, but
 does this small subset end (temporarily) up on the JVM heap? And if it
 does, does it stay "cached" in the JVM for some time or is it immediately
 discarded after the user function completes?

 Maybe this isn't even under Flink's control, but I'm curious.

 Regards,
 Alexis.

>>>


Re: Preparing keyed state before snapshot

2024-02-17 Thread Zakelly Lan
Hi Lorenzo,

It is not recommended to do this with keyed state. However, there is an
example in the Flink code (FastTop1Function#snapshotState) [1] of setting keys
during snapshotState().

Hope this helps.

[1]
https://github.com/apache/flink/blob/050503c65f5c5c18bb573748ccbf5aecce4ec1a5/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/FastTop1Function.java#L165
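
Reduced to a sketch, the pattern there looks like the following. `MyValue` is a placeholder type, and `keyContext` is the `KeyContext` handle that Flink injects only for its internal operators, which is exactly the part with no public API:

```
// fields of a CheckpointedFunction that keeps an expensive in-memory "state":
private transient Map<Object, MyValue> kvCache;   // stream key -> in-memory value
private transient ValueState<MyValue> state;      // the real keyed Flink state
private transient KeyContext keyContext;          // injected by the operator (internal API)

@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
    for (Map.Entry<Object, MyValue> entry : kvCache.entrySet()) {
        keyContext.setCurrentKey(entry.getKey()); // scope keyed state to this key
        state.update(entry.getValue());           // flush the cached value into Flink state
    }
}
```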

Best,
Zakelly

On Sat, Feb 17, 2024 at 1:48 AM Lorenzo Nicora 
wrote:

> Hi Thias
>
> I considered CheckpointedFunction.
> In snapshotState() I would have to update the state of each key,
> extracting the in-memory "state" of each key and putting it in the state
> with state.update(...) .
> This must happen per key,
> But snapshotState() has no visibility of the keys. And I have no way of
> selectively accessing the state of a specific key to update it.
> Unless I am missing something
>
> Thanks
> Lorenzo
>
>
> On Fri, 16 Feb 2024 at 07:21, Schwalbe Matthias <
> matthias.schwa...@viseca.ch> wrote:
>
>> Good morning Lorenzo,
>>
>>
>>
>> You may want to implement
>> org.apache.flink.streaming.api.checkpoint.CheckpointedFunction interface in
>> your KeyedProcessFunction.
>>
>> Btw. By the time initializeState(…) is called, the state backend is fully
>> initialized and can be read and written to (which is not the case for when
>> the open(…) function is called.
>>
>> In initializeState(…) you also get access to state of different operator
>> key.
>>
>> SnapshotState(…) is called as part of the (each) checkpoint in order to
>> store data.
>>
>>
>>
>> Sincere greetings
>>
>>
>>
>> Thias
>>
>>
>>
>> *From:* Lorenzo Nicora 
>> *Sent:* Thursday, February 15, 2024 7:50 PM
>> *To:* Flink User Group 
>> *Subject:* Preparing keyed state before snapshot
>>
>>
>>
>> Hello everyone,
>>
>>
>>
>> I have a convoluted problem.
>>
>>
>>
>> I am implementing a KeyedProcessFunction that keeps some non-serializable
>> "state" in memory, in a transient Map (key = stream key, value = the
>> non-serializable "state").
>>
>>
>>
>> I can extract a serializable representation to put in Flink state, and I
>> can load my in-memory "state" from the Flink state. But these operations
>> are expensive.
>>
>>
>>
>> Initializing the in-memory "state" is relatively easy. I do it lazily, in
>> processElement(), on the first record for the key.
>>
>>
>>
>> The problem is saving the in-memory "state" to Flink state.
>>
>> I need to do it only before the state snapshot. But KeyedProcessFunction
>> has no entrypoint called before the state snapshot.
>>
>> I cannot use CheckpointedFunction.snapshotState(), because it does not
>> work for keyed state.
>>
>>
>>
>> Any suggestions?
>>
>>
>>
>> Note that I cannot use operator state nor a broadcast state.
>>
>> Processing is keyed. Every processed record modifies the in-memory
>> "state" of that key. If the job rescale, the state of the key must follow
>> the partition.
>>
>>
>>
>>
>>
>> Regards
>>
>> Lorenzo
>>
>


Re: Redis as a State Backend

2024-01-30 Thread Zakelly Lan
And I found some previous discussion, FYI:
1. https://issues.apache.org/jira/browse/FLINK-3035
2. https://www.mail-archive.com/dev@flink.apache.org/msg10666.html

Hope this helps.

Best,
Zakelly

On Tue, Jan 30, 2024 at 4:08 PM Zakelly Lan  wrote:

> Hi Chirag
>
> That's an interesting idea. IIUC, storing key-values can be simply
> implemented for Redis, but supporting checkpoint and recovery is relatively
> challenging. Flink's checkpoint should be consistent among all stateful
> operators at the same time. For an *embedded* and *file-based* key value
> store like RocksDB, it is easier to implement by uploading files of
> specific time asynchronously.
>
> Moreover, if you want to store your state basically in memory, then why not
> use the HashMapStateBackend? It saves the overhead of serialization and
> deserialization and may achieve better performance than Redis, I
> guess.
>
>
> Best,
> Zakelly
>
> On Tue, Jan 30, 2024 at 2:15 PM Chirag Dewan via user <
> user@flink.apache.org> wrote:
>
>> Hi,
>>
>> I was looking at the FLIP-254: Redis Streams Connector and I was
>> wondering if Flink ever considered Redis as a state backend? And if yes,
>> why was it discarded compared to RocksDB?
>>
>> If someone can point me towards any deep dives on why RocksDB is a better
>> fit as a state backend, it would be helpful.
>>
>> Thanks,
>> Chirag
>>
>


Re: Redis as a State Backend

2024-01-30 Thread Zakelly Lan
Hi Chirag

That's an interesting idea. IIUC, storing key-values could easily be
implemented for Redis, but supporting checkpointing and recovery is relatively
challenging. Flink's checkpoints should be consistent across all stateful
operators at the same point in time. For an *embedded* and *file-based*
key-value store like RocksDB, this is easier to implement by asynchronously
uploading the files of a specific time.

Moreover, if you want to store your state basically in memory, then why not
use the HashMapStateBackend? It saves the overhead of serialization and
deserialization and may achieve better performance than Redis, I
guess.
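
A sketch of that alternative (the checkpoint path is a placeholder):

```
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class HeapStateSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // working state stays on the JVM heap: no serialization on every access
        env.setStateBackend(new HashMapStateBackend());
        // durability comes from checkpoints written to a DFS, not from the backend itself
        env.enableCheckpointing(60_000);
        env.getCheckpointConfig().setCheckpointStorage("hdfs:///flink/checkpoints"); // placeholder

        env.fromSequence(0, 100).print(); // placeholder pipeline
        env.execute("heap-state-sketch");
    }
}
```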


Best,
Zakelly

On Tue, Jan 30, 2024 at 2:15 PM Chirag Dewan via user 
wrote:

> Hi,
>
> I was looking at the FLIP-254: Redis Streams Connector and I was
> wondering if Flink ever considered Redis as a state backend? And if yes,
> why was it discarded compared to RocksDB?
>
> If someone can point me towards any deep dives on why RocksDB is a better
> fit as a state backend, it would be helpful.
>
> Thanks,
> Chirag
>


Re: Why calling ListBucket for each file in a checkpoint

2024-01-21 Thread Zakelly Lan
Are you accessing the S3 API with the Presto implementation? If so, you may
read the code of `com.facebook.presto.hive.s3.PrestoS3FileSystem#create`
and find that it checks the existence of the target path first, during which
`getFileStatus` and `listPrefix` are called. There is no option to disable this.


Best,
Zakelly

On Fri, Jan 19, 2024 at 9:32 PM Evgeniy Lyutikov 
wrote:

> Hi all!
> I'm trying to understand the logic of saving checkpoint files and from the
> exchange dump with ceph I see the following requests
>
> HEAD
> /checkpoints/example-job//shared/9701fae2-0de3-4d6c-b08b-0a92fb7285c9
> HTTP/1.1
> HTTP/1.1 404 Not Found
> HEAD
> /checkpoints/example-job//shared/9701fae2-0de3-4d6c-b08b-0a92fb7285c9/
> HTTP/1.1
> HTTP/1.1 404 Not Found
> GET
> /checkpoints/?prefix=example-job%2F%2Fshared%2F9701fae2-0de3-4d6c-b08b-0a92fb7285c9%2F=%2F=1=url
> HTTP/1.1
> HTTP/1.1 200 OK
> <?xml version="1.0" encoding="UTF-8"?>
> <ListBucketResult xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
>   <Name>checkpoints</Name>
>   <Prefix>example-job//shared/9701fae2-0de3-4d6c-b08b-0a92fb7285c9/</Prefix>
>   <MaxKeys>1</MaxKeys>
>   <Delimiter>/</Delimiter>
>   <IsTruncated>false</IsTruncated>
> </ListBucketResult>
> PUT
> /checkpoints/example-job//shared/9701fae2-0de3-4d6c-b08b-0a92fb7285c9
> HTTP/1.1
>
> It’s not entirely clear why, after checking for the presence of an object
> and a directory with HEAD requests, it additionally issues a ListBucket GET
> request. Can this be disabled using options, or does the code require
> modification?
> We have our own Ceph cluster and frequent ListBucket requests heavily load
> the index pool.
> Flink 1.17.1, presto backend for checkpoint.
>
>
>
>


Re: java.lang.UnsupportedOperationException: A serializer has already been registered for the state; re-registration is not allowed.

2024-01-18 Thread Zakelly Lan
Glad to hear this!


Best,
Zakelly

On Fri, Jan 19, 2024 at 9:22 AM Konstantinos Karavitis 
wrote:

> I would like again to thank you as we managed to fix this strange issue we
> had by moving all the state initializations into the open method of
> ProcessFunction!
>
> On Thu, Jan 18, 2024 at 11:53 PM Konstantinos Karavitis <
> kkaravi...@gmail.com> wrote:
>
>> Thank you very much Zakelly for taking the time to answer to my question.
>> I appreciate it a lot.
>> Unfortunately, I cannot share the source code as it is confidential and
>> owned by the company that I co-operate with.
>> But, yes you are right that inside the code, I can see that the state
>> initialization happens inside the AbstractProcessFunction#processElement
>>  method.
>>
>>  Thank you very much,
>> Kostas
>>
>> On Thu, Jan 18, 2024 at 1:17 PM Zakelly Lan 
>> wrote:
>>
>>> Hi,
>>>
>>> Could you please share the code of state initialization (getting state
>>> from a state descriptor)? It seems you are creating a state in
>>> #processElement?
>>>
>>>
>>> Best,
>>> Zakelly
>>>
>>> On Thu, Jan 18, 2024 at 2:25 PM Zakelly Lan 
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> Could you please share the code of state initialization (getting state
>>>> from a state descriptor)? It seems you are creating a state in
>>>> #processElement?
>>>>
>>>>
>>>> Best,
>>>> Zakelly
>>>>
>>>> On Thu, Jan 18, 2024 at 3:47 AM Konstantinos Karavitis <
>>>> kkaravi...@gmail.com> wrote:
>>>>
>>>>> Have you ever met the following error when a flink application
>>>>> restarts and tries to restore the state from RocksDB?
>>>>>
>>>>>
>>>>> *Caused by: java.lang.UnsupportedOperationException: A serializer has
>>>>> already been registered for the state; re-registration is not allowed.
>>>>> at
>>>>> org.apache.flink.runtime.state.StateSerializerProvider$LazilyRegisteredStateSerializerProvider.registerNewSerializerForRestoredState(StateSerializerProvider.java:302)*
>>>>>
>>>>> May that be a potential bug of a race condition where the namespace
>>>>> serializer is being registered by more than one place concurrently?
>>>>>
>>>>> Here's also the full stack trace
>>>>>
>>>>> at
>>>>> org.apache.flink.streaming.api.operators.co.CoBroadcastWithKeyedOperator.processElement1(CoBroadcastWithKeyedOperator.java:125)
>>>>> at
>>>>> org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory.processRecord1(StreamTwoInputProcessorFactory.java:217)
>>>>> at
>>>>> org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory.lambda$create$0(StreamTwoInputProcessorFactory.java:183)
>>>>> at
>>>>> org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory$StreamTaskNetworkOutput.emitRecord(StreamTwoInputProcessorFactory.java:266)
>>>>> at
>>>>> org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
>>>>> at
>>>>> org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
>>>>> at
>>>>> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>>>>> at
>>>>> org.apache.flink.streaming.runtime.io.StreamMultipleInputProcessor.processInput(StreamMultipleInputProcessor.java:85)
>>>>> at
>>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:542)
>>>>> at
>>>>> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
>>>>> at
>>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:831)
>>>>> at
>>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:780)
>>>>> at
>>>>> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
>>>>> at
>>>>> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:9

Re: Re:Re: RocksDB incremental checkpoint size keeps growing

2024-01-17 Thread Zakelly Lan
The image didn't come through, so I can't see it; could you copy the text
information instead? Also, does your ProcessWindowFunction access state? If so,
did you implement the clear() method?
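
For reference, a minimal sketch of what I mean; the types are illustrative. Per-window state obtained via `ctx.windowState()` must be cleared in `clear()`, otherwise it survives the window and the checkpoint keeps growing:

```
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class MyWindowFunction extends ProcessWindowFunction<Long, String, Long, TimeWindow> {
    private final ValueStateDescriptor<Long> countDesc =
            new ValueStateDescriptor<>("count", Long.class);

    @Override
    public void process(Long key, Context ctx, Iterable<Long> elements, Collector<String> out)
            throws Exception {
        ValueState<Long> count = ctx.windowState().getState(countDesc);
        long n = 0;
        for (Long ignored : elements) {
            n++;
        }
        count.update(n);
        out.collect(key + " -> " + n);
    }

    @Override
    public void clear(Context ctx) {
        // release the per-window state when the window is purged
        ctx.windowState().getState(countDesc).clear();
    }
}
```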

On Thu, Jan 18, 2024 at 3:01 PM fufu  wrote:

> Looking at HDFS, the files under shared are much larger than the chk-xxx
> directories.
>
>
>
> On 2024-01-18 14:49:14, "fufu"  wrote:
>
> It is a DataStream job. The window operator itself has no TTL set, while the
> other operators do. In the Flink UI I can see the size of the window operator
> growing continuously, by about 600~800 MB per day. Take the picture below as
> an example: the checkpoint with ID 313 is almost 10 MB larger than the one
> with ID 304, and as long as the job runs it keeps growing like this. I am
> looking at the checkpoint files and RocksDB files now~
>
> On 2024-01-18 10:56:51, "Zakelly Lan"  wrote:
>
> >Hi, could you provide some more details? For example: it is a DataStream
> >job, right? Is State TTL set? Did you observe the gradual growth through
> >checkpoint monitoring, and what order of magnitude is the total size? Which
> >files are the largest among the checkpoint files or in the local RocksDB
> >directory?
> >
> >On Wed, Jan 17, 2024 at 4:09 PM fufu  wrote:
> >
> >> I have a Flink job on Flink 1.14.6 with a window that combines an
> >> incremental (AggregateFunction) + full (ProcessWindowFunction) function.
> >> While the job is running, the state of this operator keeps growing, by a
> >> few hundred MB per day. How should I troubleshoot this? It uses event
> >> time, watermarks are emitted normally, and all the other operators are
> >> fine; only this operator keeps growing, which is very strange. I found a
> >> similar article online:
> >> https://blog.csdn.net/RL_LEEE/article/details/123864487 and would like to
> >> try it, but I don't know how to set the manifest size and couldn't find
> >> the corresponding parameter. Could the community please advise, or is
> >> there another solution? Thanks!
> ,想尝试下,但不知道manifest大小如何设置,没有找到对应的参数,
> >> 请社区指导下,或者有没有别的解决方案?感谢社区!
>


Re: java.lang.UnsupportedOperationException: A serializer has already been registered for the state; re-registration is not allowed.

2024-01-17 Thread Zakelly Lan
Hi,

Could you please share the code of state initialization (getting state from
a state descriptor)? It seems you are creating a state in #processElement?
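
Creating the state handle once in `open()` is the usual pattern — a minimal sketch with illustrative types:

```
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class CountFunction extends KeyedProcessFunction<String, String, Long> {
    private transient ValueState<Long> count;

    @Override
    public void open(Configuration parameters) {
        // register the state once when the task starts, not per element
        count = getRuntimeContext().getState(
                new ValueStateDescriptor<>("count", Long.class));
    }

    @Override
    public void processElement(String value, Context ctx, Collector<Long> out) throws Exception {
        Long current = count.value();
        long next = current == null ? 1L : current + 1L;
        count.update(next);
        out.collect(next);
    }
}
```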


Best,
Zakelly

On Thu, Jan 18, 2024 at 2:25 PM Zakelly Lan  wrote:

> Hi,
>
> Could you please share the code of state initialization (getting state
> from a state descriptor)? It seems you are creating a state in
> #processElement?
>
>
> Best,
> Zakelly
>
> On Thu, Jan 18, 2024 at 3:47 AM Konstantinos Karavitis <
> kkaravi...@gmail.com> wrote:
>
>> Have you ever met the following error when a flink application restarts
>> and tries to restore the state from RocksDB?
>>
>>
>> *Caused by: java.lang.UnsupportedOperationException: A serializer has
>> already been registered for the state; re-registration is not allowed.
>> at
>> org.apache.flink.runtime.state.StateSerializerProvider$LazilyRegisteredStateSerializerProvider.registerNewSerializerForRestoredState(StateSerializerProvider.java:302)*
>>
>> May that be a potential bug of a race condition where the namespace
>> serializer is being registered by more than one place concurrently?
>>
>> Here's also the full stack trace
>>
>> at
>> org.apache.flink.streaming.api.operators.co.CoBroadcastWithKeyedOperator.processElement1(CoBroadcastWithKeyedOperator.java:125)
>> at
>> org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory.processRecord1(StreamTwoInputProcessorFactory.java:217)
>> at
>> org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory.lambda$create$0(StreamTwoInputProcessorFactory.java:183)
>> at
>> org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory$StreamTaskNetworkOutput.emitRecord(StreamTwoInputProcessorFactory.java:266)
>> at
>> org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
>> at
>> org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
>> at
>> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>> at
>> org.apache.flink.streaming.runtime.io.StreamMultipleInputProcessor.processInput(StreamMultipleInputProcessor.java:85)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:542)
>> at
>> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:831)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:780)
>> at
>> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
>> at
>> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:914)
>> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
>> at java.base/java.lang.Thread.run(Unknown Source)
>>
>> *Caused by: java.lang.UnsupportedOperationException: A serializer has
>> already been registered for the state; re-registration is not allowed.
>> at
>> org.apache.flink.runtime.state.StateSerializerProvider$LazilyRegisteredStateSerializerProvider.registerNewSerializerForRestoredState(StateSerializerProvider.java:302)*
>> Caused by: java.lang.UnsupportedOperationException: A serializer has
>> already been registered for the state; re-registration is not allowed.
>> *at
>> org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo.updateNamespaceSerializer(RegisteredKeyValueStateBackendMetaInfo.java:132)*
>> at
>> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:734)
>> at
>> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:667)
>> at
>> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createOrUpdateInternalState(RocksDBKeyedStateBackend.java:883)
>> at
>> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createOrUpdateInternalState(RocksDBKeyedStateBackend.java:870)
>> at
>> org.apache.flink.runtime.state.KeyedStateFactory.createOrUpdateInternalState(KeyedStateFactory.java:47)
>> at
>> org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:73)
>> at
>> org.apache.flink.runtime.state.AbstractKeye

Re: RocksDB incremental checkpoint size keeps growing

2024-01-17 Thread Zakelly Lan
Hi, could you provide some details? For example: is it a DataStream job? Is State
TTL configured? Did you observe the growth via checkpoint monitoring, and what is the total size? Which files are largest among the checkpoint files or in the local RocksDB directory?

On Wed, Jan 17, 2024 at 4:09 PM fufu  wrote:

>
> I have a Flink job on version 1.14.6 with a window that combines an incremental (AggregateFunction) and a full (ProcessWindowFunction) aggregation. While the job runs, this operator's state keeps growing by several hundred MB per day. How should I troubleshoot this? I'm using event time and the watermarks advance normally; all the other operators are fine, only this one keeps growing, which is very strange. I found a similar article online:
> https://blog.csdn.net/RL_LEEE/article/details/123864487 and want to try it, but I don't know how to set the manifest size and couldn't find the corresponding parameter.
> Could the community offer some guidance, or is there another solution? Thanks!


Re: flink-checkpoint issue

2024-01-11 Thread Zakelly Lan
>
>
> JM log; there is no trigger record for checkpoint 25548:
> 2023-12-31 18:39:10.664 [jobmanager-future-thread-20] INFO
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed
> checkpoint 25546 for job d12f3c6e836f56fb23d96e31737ff0b3 (411347921 bytes
> in 50128 ms).
> 2023-12-31 18:40:10.681 [Checkpoint Timer] INFO
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering
> checkpoint 25547 (type=CHECKPOINT) @ 1704019210665 for job
> d12f3c6e836f56fb23d96e31737ff0b3.
> 2023-12-31 18:50:10.681 [Checkpoint Timer] INFO
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Checkpoint
> 25547 of job d12f3c6e836f56fb23d96e31737ff0b3 expired before completing.
> 2023-12-31 18:50:10.698 [flink-akka.actor.default-dispatcher-3] INFO
> org.apache.flink.runtime.jobmaster.JobMaster - Trying to recover from a
> global failure.
> org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint tolerable
> failure threshold.
>  at
> org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleCheckpointException(CheckpointFailureManager.java:90)
>  at
> org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleJobLevelCheckpointException(CheckpointFailureManager.java:65)
>  at
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:1760)
>  at
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:1733)
>  at
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.access$600(CheckpointCoordinator.java:93)
>  at
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator$CheckpointCanceller.run(CheckpointCoordinator.java:1870)
>  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>  at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>  at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>  at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>  at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>  at java.lang.Thread.run(Thread.java:748)
>
>
>
>
> Under the checkpoint path:
> 25546: normal
> 25547: missing
> 25548: present, but the directory is empty
>
>
>
>
> Manually restoring the job from 25548 failed with an exception that the _metadata file could not be found
>
>
> 吴先生
> 15951914...@163.com
> ---- Replied Message ----
> | From | Xuyang |
> | Date | 2024-01-11 14:55 |
> | To |  |
> | Subject | Re: Reply: flink-checkpoint issue |
> Hi, your images are broken; you could host them on an image service, or just paste the log as text.
>
>
>
>
> --
>
> Best!
> Xuyang
>
>
>
>
> On 2024-01-11 13:40:43, "吴先生" <15951914...@163.com> wrote:
>
> JM log around the time the checkpoint failed; there is no trigger record for 25548:
>
>
> Automatic recovery failed:
>
>
> TM log:
>
>
> Checkpoint file path; the 25548 directory is empty:
>
>
> 吴先生
> 15951914...@163.com
> ---- Replied Message ----
> | From | Zakelly Lan |
> | Date | 2024-01-10 18:20 |
> | To |  |
> | Subject | Re: flink-checkpoint issue |
> Hi,
> If convenient, please paste the jobmanager log; there should be some clues in it.
>
>
> On Wed, Jan 10, 2024 at 5:55 PM 吴先生 <15951914...@163.com> wrote:
>
> Flink version: 1.12
> Checkpoint storage: HDFS
>
>
> Symptom: checkpoint N failed for some reason, which caused the job to restart, and the restart also failed. The chk path for N does not exist on HDFS, but why does a chk path for N+1 appear, and why is that directory empty?
>
>
>


Re: flink-checkpoint issue

2024-01-10 Thread Zakelly Lan
Hi,
If convenient, please paste the jobmanager log; there should be some clues in it.


On Wed, Jan 10, 2024 at 5:55 PM 吴先生 <15951914...@163.com> wrote:

> Flink version: 1.12
> Checkpoint storage: HDFS
>
> Symptom: checkpoint N failed for some reason, which caused the job to restart, and the restart also failed. The chk path for N does not exist on HDFS, but why does a chk path for N+1 appear, and why is that directory empty?
>
>


Re: keyby mapState use question

2023-12-10 Thread Zakelly Lan
Hi,

This should not happen. I guess the `onTimer` and `processElement` you are
testing are triggered under different keyby keys. Note that the keyed
states are partitioned by the keyby key first, so if querying or setting
the state, you are only manipulating the specific partition which does not
affect other partitions. Please check the partitioned keys (keyby key) are
the same under those two functions using `ctx.getCurrentKey()`. Hope
this helps.


Best,
Zakelly

On Thu, Dec 7, 2023 at 4:48 PM Jake.zhang  wrote:

> Hi all:
>
> KeyBy process function
>
> public class EventKeyedBroadcastProcessFunction
>         extends KeyedBroadcastProcessFunction<String, String, String, String> {
>
>     private transient MapState<String, String> mapState;
>
>     @Override
>     public void open(Configuration parameters) throws Exception {
>         // Initialize the map state handle from a descriptor.
>         mapState = getRuntimeContext().getMapState(
>                 new MapStateDescriptor<>("map-state", String.class, String.class));
>     }
>
>     @Override
>     public void processElement(String value, ReadOnlyContext ctx,
>                                Collector<String> out) throws Exception {
>         // Can't see the value set in onTimer(): keyed state is partitioned
>         // by the keyBy key, so compare ctx.getCurrentKey() in both methods.
>         String v = mapState.get("someKey");
>     }
>
>     @Override
>     public void processBroadcastElement(String value, Context ctx,
>                                         Collector<String> out) throws Exception {
>     }
>
>     @Override
>     public void onTimer(long timestamp, OnTimerContext ctx,
>                         Collector<String> out) throws Exception {
>         // Sets the map state key/value first -- under this timer's key.
>         mapState.put("someKey", "someValue");
>     }
> }
>
> Why can't the processElement function see the value that the onTimer function set?
>
> thanks.
>
>


Re: Unsubscribe from user list.

2023-10-18 Thread Zakelly Lan
Hi Lijuan,

Please send an email to user-unsubscr...@flink.apache.org if you want to
unsubscribe from user@flink.apache.org.


Best,
Zakelly

On Thu, Oct 19, 2023 at 6:23 AM Hou, Lijuan via user
 wrote:
>
> Hi team,
>
>
>
> Could you please remove this email from the subscription list? I have another 
> email (juliehou...@gmail.com) subscribed as well. I can use that email to 
> receive flink emails.
>
>
>
> Thank you!
>
>
>
> Best,
>
> Lijuan


Re: Problems with the state.backend.fs.memory-threshold parameter

2023-10-13 Thread Zakelly Lan
Hi rui,

The 'state.backend.fs.memory-threshold' option configures the threshold
below which state is stored as part of the metadata, rather than in
separate files. As a result, the JM uses its own memory to merge small
checkpoint files and write them into one file. FLIP-306 [1][2] has been
proposed to merge small checkpoint files without consuming JM memory.
This feature is being worked on and is targeted for the next minor
release (1.19).


Best,
Zakelly

[1] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-306%3A+Unified+File+Merging+Mechanism+for+Checkpoints
[2] https://issues.apache.org/jira/browse/FLINK-32070
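
For reference, a sketch of the relevant flink-conf.yaml entry (the value shown
is the default in recent releases; newer versions spell the key
state.storage.fs.memory-threshold):

state.backend.fs.memory-threshold: 20kb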

On Fri, Oct 13, 2023 at 6:28 PM rui chen  wrote:
>
> We found that for some tasks, the JM memory continued to increase. I set
> the parameter of state.backend.fs.memory-threshold to 0, and the JM memory
> would no longer increase, but many small files might be written in this
> way. Does the community have any optimization plan for this area?


Re: updating keyed state in open method.

2023-09-07 Thread Zakelly Lan
Hi,

You cannot access keyed state within #open(). It can only be
accessed under a keyed context (i.e., while a key is selected during
processing of an element, e.g. in #processElement).
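
A minimal sketch of the distinction (a hypothetical KeyCounter; names are
illustrative): the state handle may be created in open(), but value() and
update() must wait for a keyed context such as processElement():

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class KeyCounter extends KeyedProcessFunction<Integer, Integer, String> {

    private transient ValueState<Long> counter;

    @Override
    public void open(Configuration parameters) {
        // OK: obtaining the handle does not touch any particular key.
        counter = getRuntimeContext().getState(
                new ValueStateDescriptor<>("counter", Long.class));
        // NOT OK here: counter.value() -- no key is set yet.
    }

    @Override
    public void processElement(Integer value, Context ctx, Collector<String> out)
            throws Exception {
        // OK: a key is selected, so this reads and writes that key's partition.
        long c = counter.value() == null ? 0L : counter.value();
        counter.update(c + 1);
        out.collect(ctx.getCurrentKey() + " -> " + (c + 1));
    }
}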

Best,
Zakelly

On Thu, Sep 7, 2023 at 4:55 PM Krzysztof Chmielewski
 wrote:
>
> Hi,
> I'm having a problem with my toy flink job where I would like to access a 
> ValueState of a keyed stream. The Job setup can be found here [1], it is 
> fairly simple
>
> env
> .addSource(new CheckpointCountingSource(100, 60))
> .keyBy(value -> value)
> .process(new KeyCounter())
> .addSink(new ConsoleSink());
>
> As you can see I'm using a keyBy, and KeyCounter extends
> KeyedProcessFunction.
> It seems that keyed state cannot be updated from the RichFunction::open() method.
> Is that intended?
>
> When I ran this example I have an exception that says:
>
> Caused by: java.lang.NullPointerException: No key set. This method should not 
> be called outside of a keyed context.
> at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:76)
> at 
> org.apache.flink.runtime.state.heap.StateTable.checkKeyNamespacePreconditions(StateTable.java:270)
> at org.apache.flink.runtime.state.heap.StateTable.get(StateTable.java:260)
> at org.apache.flink.runtime.state.heap.StateTable.get(StateTable.java:143)
> at 
> org.apache.flink.runtime.state.heap.HeapValueState.value(HeapValueState.java:72)
> at org.example.KeyCounter.open(KeyCounter.java:26)
>
>
> [1] 
> https://github.com/kristoffSC/FlinkSimpleStreamingJob/blob/KeyBayIssue/src/main/java/org/example/DataStreamJob.java


Re: Broadcast state and job restarts

2022-10-28 Thread Zakelly Lan
Hi Alexis,

Broadcast state is a type of Operator State, which is included
in savepoints and checkpoints and won't be lost.
Please refer to
https://stackoverflow.com/questions/62509773/flink-broadcast-state-rocksdb-state-backend/62510423#62510423

Best,
Zakelly

On Fri, Oct 28, 2022 at 4:41 AM Alexis Sarda-Espinosa
 wrote:
>
> Hello,
>
> The documentation for broadcast state specifies that it is always kept in 
> memory. My assumptions based on this statement are:
>
> 1. If a job restarts in the same Flink cluster (i.e. using a restart 
> strategy), the tasks' attempt number increases and the broadcast state is 
> restored since it's not lost from memory.
> 2. If the whole Flink cluster is restarted with a savepoint, broadcast state 
> will not be restored and I need to write my application with this in mind.
>
> Are these correct?
>
> Regards,
> Alexis.
>


Re: Limiting backpressure during checkpoints

2022-10-25 Thread Zakelly Lan
Hi Robin,

You said that during the checkpoint async phase the CPU is stable at
100%, which is pretty strange to me. Normally the cpu usage of the
taskmanager process could exceed 100%, depending on what all the
threads are doing. I'm wondering if there is any scheduling mechanism
controlling the CPU usage of a process in your setup, such as
leveraging cgroups in YARN or Kubernetes. In this case, the uploading
thread may preempt cpu resources from the task processing thread.
The second thing that might help is, you may check the io utilization
during the checkpoint. The uploading thread keeps reading from the
local disk and writing to the remote, which may affect the io and
state access latency, especially when the state size is large.
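
If CPU or IO contention from the async upload is confirmed, two knobs worth
experimenting with in flink-conf.yaml (the values are placeholders, not
recommendations):

execution.checkpointing.min-pause: 5min
state.backend.rocksdb.checkpoint.transfer.thread.num: 2

The first guarantees quiet time between checkpoints; the second reduces the
number of threads transferring RocksDB files during incremental checkpoints,
trading a longer async phase for less CPU and IO taken from record processing.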

Best,
Zakelly

On Tue, Oct 25, 2022 at 12:10 AM Robin Cassan via user
 wrote:
>
> Hello Yuan Mei! Thanks a lot for your answer :)
>
> About the CPU usage, it is pretty stable at 80% normally. Every 15 minutes we 
> trigger a checkpoint, and during this time it is stable at 100%
> I am starting to wonder if CPU is the real limiting factor, because when 
> checking the Flink UI I see that most of the checkpoint duration is async. I 
> do not know how the async phase affects backpressure, but it does look like 
> the upload to S3 phase is causing the backpressure. The sync phase is quite 
> short as well.
> Looking at this article 
> https://flink.apache.org/2022/05/23/latency-part2.html it seems we already 
> are in the most efficient configuration (at-least-once, non-concurrent 
> checkpointing, rocksdb on local NVME SSDs...), I don't see an obvious 
> quick-win apart from scaling up the full cluster.
>
> Reducing the state size will be a big challenge but even then it would not 
> guarantee consistent latency, same for less frequent checkpoints.
> For now it looks like our only option to achieve real-time computation would 
> be to not use Flink (or at least, not include these computations inside a job 
> with a big state that is checkpointed). Thanks again for the insight, and if 
> you happen to have any information on how we could prevent the async phase of 
> checkpoints to add backpressure on our stream I would be very interested!
>
> Le mer. 19 oct. 2022 à 10:57, Yuan Mei  a écrit :
>>
>> Hey Robin,
>>
>> Thanks for sharing the detailed information.  May I ask, when you are saying 
>> "CPU usage is around 80% when checkpoints aren't running, and capped at 100% 
>> when they are", do you see zigzag patterns of CPU usage, or is it kept 
>> capped at 100% of CPU?
>>
>> I think one possibility is that the sync phase of the checkpoint (the write
>> buffer flush during the sync phase) triggers a RocksDB compaction; we have
>> seen this happen on Ververica services as well.
>>
>> At this moment, maybe you can try to make the checkpoint less frequent 
>> (increase the checkpoint interval) to reduce the frequency of compaction. 
>> Please let me know whether this helps.
>>
>> In long term, I think we probably need to separate the compaction process 
>> from the internal db and control/schedule the compaction process ourselves 
>> (compaction takes a good amount of CPU and reduces TPS).
>>
>> Best.
>> Yuan
>>
>>
>>
>> On Thu, Oct 13, 2022 at 11:39 PM Robin Cassan via user 
>>  wrote:
>>>
>>> Hello all, hope you're well :)
>>> We are attempting to build a Flink job with minimal and stable latency (as 
>>> much as possible) that consumes data from Kafka. Currently our main 
>>> limitation happens when our job checkpoints the RocksDB state: backpressure 
>>> is applied on the stream, causing latency. I am wondering if there are ways 
>>> to configure Flink so that the checkpointing process affects the flow of 
>>> data as little as possible?
>>>
>>> In our case, backpressure seems to arise from CPU consumption, because:
>>> - CPU usage is around 80% when checkpoints aren't running, and capped at 
>>> 100% when they are
>>> - checkpoint alignment time is very low, using unaligned checkpoints 
>>> doesn't appear to help with backpressure
>>> - network (async) part of the checkpoint should in theory not cause 
>>> backpressure since resources would be used for the main stream during async 
>>> waits, but I might be wrong
>>>
>>> What we would really like to achieve is isolating the compute resource used 
>>> for checkpointing from the ones used for task slots. Which would of course 
>>> mean that we need to oversize our cluster for having resources available 
>>> for checkpointing even when it's not running, but also that we would get 
>>> longer checkpoints compared to today where checkpoints seem to use CPU 
>>> cores attributed to task slots. We are ok with that to some degree, but we 
>>> don't know how to achieve this isolation. Do you have any clue?
>>>
>>> Lastly, we currently have nodes with 8 cores but allocate 6 task slots, and 
>>> we have set the following settings:
>>>
>>> state.backend.rocksdb.thread.num: 6
>>> state.backend.rocksdb.writebuffer.count: 6
>>>
>>>
>>> Thanks all for 

Re: Flink RocksDB Performance

2021-07-16 Thread Zakelly Lan
Hi Li Jim,
The filesystem state backend performs much better than RocksDB (by multiple
times), but it is only suitable for small state. RocksDB consumes more CPU on
background tasks, cache management, serialization/deserialization and
compression/decompression. In most cases, the performance of RocksDB will
meet the need.
For tuning, please check
https://ci.apache.org/projects/flink/flink-docs-master/docs/ops/state/large_state_tuning/#tuning-rocksdb
Moreover, you could leverage some tools such as the async-profiler(
https://github.com/jvm-profiling-tools/async-profiler) to figure out which
part consumes the most CPU.
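
For example, a few RocksDB-related options that often come up in this kind of
tuning (illustrative values only, to be validated against your workload):

state.backend.rocksdb.thread.num: 4
state.backend.rocksdb.writebuffer.size: 64mb
state.backend.rocksdb.block.cache-size: 256mb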

On Fri, Jul 16, 2021 at 3:19 PM Li Jim  wrote:

> Hello everyone,
> I am using the Flink 1.13.1 CEP library and doing some stress testing.
> My message rate is about 16000 records per second.
> I find that it can't process more than 16000 records per second because the
> CPU usage is up to 100% (say 800%, because I allocated 8 vcores to a
> taskmanager).
> I tried switching to the filesystem state backend; it got faster and CPU usage dropped.
> I understand this may be because of the serialization/deserialization cost in
> RocksDB, but for certain reasons we must use RocksDB as the state backend.
> Any suggestions for optimizing this?
>
>
>
>
>