[jira] [Created] (FLINK-35784) The cp file-merging directory not properly registered in SharedStateRegistry

2024-07-08 Thread Zakelly Lan (Jira)
Zakelly Lan created FLINK-35784:
---

 Summary: The cp file-merging directory not properly registered in 
SharedStateRegistry
 Key: FLINK-35784
 URL: https://issues.apache.org/jira/browse/FLINK-35784
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Checkpointing
Affects Versions: 1.20.0
Reporter: Zakelly Lan
Assignee: Zakelly Lan


The {{OperatorSubtaskState}} only registers keyed state with the 
{{SharedStateRegistry}}. However, the handles of the file-merging directories are 
wrapped in {{FileMergingOperatorStreamStateHandle}}, which is an 
{{OperatorStreamStateHandle}}. That means {{#registerSharedStates}} is 
never called on them, so the registry will never delete the directories.
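
To illustrate the gap, here is a minimal, self-contained sketch. It is a toy model and not Flink code: {{Registry}}, {{Handle}}, {{NamedHandle}} and {{SubtaskState}} are hypothetical stand-ins, and only the method name {{registerSharedStates}} is borrowed from the description above. It merely shows that a handle stored as operator state is never seen by a registry when registration walks keyed state only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

public class RegistrationGapSketch {

    /** Hypothetical stand-in for a shared state registry. */
    static final class Registry {
        final Set<String> registered = new TreeSet<>();
    }

    /** Hypothetical handle type; real Flink handles are far richer. */
    interface Handle {
        void registerSharedStates(Registry registry);
    }

    static final class NamedHandle implements Handle {
        private final String name;

        NamedHandle(String name) {
            this.name = name;
        }

        @Override
        public void registerSharedStates(Registry registry) {
            registry.registered.add(name);
        }
    }

    /** Hypothetical stand-in for a per-subtask state container. */
    static final class SubtaskState {
        final List<Handle> keyedState = new ArrayList<>();
        final List<Handle> operatorState = new ArrayList<>();

        void register(Registry registry, boolean includeOperatorState) {
            for (Handle h : keyedState) {
                h.registerSharedStates(registry);
            }
            if (includeOperatorState) {
                for (Handle h : operatorState) {
                    h.registerSharedStates(registry);
                }
            }
        }
    }

    /** Builds one keyed handle and one directory handle, then registers them. */
    static Set<String> registeredNames(boolean includeOperatorState) {
        SubtaskState state = new SubtaskState();
        state.keyedState.add(new NamedHandle("keyed-sst-file"));
        // The directory handle travels as operator state in this toy model.
        state.operatorState.add(new NamedHandle("file-merging-dir"));
        Registry registry = new Registry();
        state.register(registry, includeOperatorState);
        return registry.registered;
    }

    public static void main(String[] args) {
        System.out.println(registeredNames(false)); // [keyed-sst-file]
        System.out.println(registeredNames(true));  // [file-merging-dir, keyed-sst-file]
    }
}
```

With keyed-only registration the directory entry never reaches the registry, so nothing can later decide to delete it. How the actual fix wires this up is not stated here.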



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35778) Escape URI reserved characters when creating file-merging directories

2024-07-07 Thread Zakelly Lan (Jira)
Zakelly Lan created FLINK-35778:
---

 Summary: Escape URI reserved characters when creating file-merging 
directories
 Key: FLINK-35778
 URL: https://issues.apache.org/jira/browse/FLINK-35778
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Checkpointing
Affects Versions: 1.20.0
Reporter: Zakelly Lan
Assignee: Zakelly Lan
 Fix For: 1.20.0


Currently, the file-merging manager for checkpoint files creates 
directories based on the TM resource ID, job ID and operator IDs. In some 
cases, however, these IDs include characters that are reserved in URIs, so we 
should apply a simple escape to those IDs.
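
One possible shape of such an escape, as a minimal sketch only: the class name {{DirNameEscaper}}, the method {{escape}} and the sample ID are made up for illustration, and percent-encoding everything outside the RFC 3986 unreserved set is an assumption, not necessarily what the fix does.

```java
import java.nio.charset.StandardCharsets;

public class DirNameEscaper {

    /**
     * Percent-encodes every byte that is not an RFC 3986 "unreserved"
     * character (letters, digits, '-', '.', '_', '~'), so the result can be
     * used as a single path segment.
     */
    static String escape(String id) {
        StringBuilder sb = new StringBuilder();
        for (byte b : id.getBytes(StandardCharsets.UTF_8)) {
            int c = b & 0xFF;
            boolean unreserved =
                    (c >= 'A' && c <= 'Z')
                            || (c >= 'a' && c <= 'z')
                            || (c >= '0' && c <= '9')
                            || c == '-' || c == '.' || c == '_' || c == '~';
            if (unreserved) {
                sb.append((char) c);
            } else {
                sb.append('%').append(String.format("%02X", c));
            }
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        // Hypothetical TM resource ID containing ':', '/' and '#'.
        System.out.println(escape("host:6122/tm#1")); // host%3A6122%2Ftm%231
    }
}
```

The encoding is reversible, so two different IDs can never map to the same directory name.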





Re: [DISCUSS] Release flink-shaded 19.0

2024-06-27 Thread Zakelly Lan
Hi Sergey,

FYI, we also hit a regression in stringRead and stringWrite when upgrading
JMH [1]. I guess it is caused by the JIT. I'd suggest accepting it if the JIT
is this sensitive to 'unrelated' changes in the codebase, since it probably
won't be a problem in production environments.

I recommend we do not overemphasize these results. How about going ahead
with the upgrade first and testing the performance, then we can discuss if
there is indeed a concern.


[1] https://issues.apache.org/jira/browse/FLINK-35629

Best,
Zakelly

On Thu, Jun 27, 2024 at 6:21 PM Sergey Nuyanzin  wrote:

> Thanks for starting this discussion Dawid
>
> In general I'm +1 for the release
>
> IIRC there is one blocker for flink-shaded[1] which was the reason of
> downgrading it from 18 to 17[2]
>
> I would suggest to resolve it first
>
> [1] https://issues.apache.org/jira/browse/FLINK-34234
> [2] https://issues.apache.org/jira/browse/FLINK-34148
>
> On Thu, Jun 27, 2024 at 11:42 AM Dawid Wysakowicz
>  wrote:
> >
> > Hi,
> > I'd like to propose releasing flink-shaded 19.0. I suggest it has a very
> > limited scope and includes just the shading of
> > com.jayway.jsonpath:json-path [1].
> >
> > This would let us fix FLINK-35696[2]
> >
> > If there are no objections, I'd prepare a release.
> >
> > [1] https://github.com/apache/flink-shaded/commit/b23e1a811fcacbc5f53993297304131246bb5d04
> > [2] https://issues.apache.org/jira/browse/FLINK-35696
> >
> > Best,
> > Dawid
>
>
>
> --
> Best regards,
> Sergey
>


Re: [2.0] How to handle on-going feature development in Flink 2.0?

2024-06-26 Thread Zakelly Lan
+1 for a preview before the formal release. It would help us find issues in
advance.


Best,
Zakelly

On Wed, Jun 26, 2024 at 4:44 PM Jingsong Li  wrote:

> +1 to release a preview version.
>
> Best,
> Jingsong
>
> On Wed, Jun 26, 2024 at 10:12 AM Jark Wu  wrote:
> >
> > I also think this should not block new feature development.
> > Having "nice-to-have" and "must-have" tags on the FLIPs is a good idea.
> >
> > For the downstream projects, I think we need to release a 2.0 preview
> > version one or two months before the formal release. This can leave some
> > time for the downstream projects to integrate and provide feedback, so we
> > can fix the problems (e.g. unexpected breaking changes, Java versions)
> > before 2.0.
> >
> > Best,
> > Jark
> >
> > On Wed, 26 Jun 2024 at 09:39, Xintong Song 
> wrote:
> >
> > > I also don't think we should block new feature development until 2.0. From
> > > my understanding, the new major release is no different from the regular
> > > minor releases for new features.
> > >
> > > I think tracking new features, either as nice-to-have items or in a
> > > separate list, is necessary. It helps us understand what's going on in the
> > > release cycle, and what to announce and promote. Maybe we should start a
> > > discussion on updating the 2.0 item list, to 1) collect new items that were
> > > proposed / initiated after the original list was created and 2) remove
> > > some items that are no longer suitable. I'll discuss this with the other
> > > release managers first.
> > >
> > > For the connectors and operators, I think it depends on whether they depend
> > > on any deprecated APIs or internal implementations of Flink. Ideally,
> > > all @Public APIs and @PublicEvolving APIs that we plan to change / remove
> > > should have been deprecated in 1.19 and 1.20 respectively. That means if
> > > the connectors and operators only use non-deprecated @Public
> > > and @PublicEvolving APIs in 1.20, hopefully there should not be any
> > > problems upgrading to 2.0.
> > >
> > > Best,
> > >
> > > Xintong
> > >
> > >
> > >
> > > On Wed, Jun 26, 2024 at 5:20 AM Becket Qin 
> wrote:
> > >
> > > > Thanks for the question, Matthias.
> > > >
> > > > My two cents, I don't think we are blocking new feature development. My
> > > > understanding is that the community will just prioritize removing
> > > > deprecated APIs in the 2.0 dev cycle. Because of that, it is possible that
> > > > some new feature development may slow down a little bit since some
> > > > contributors may be working on the must-have features for 2.0. But policy
> > > > wise, I don't see a reason to block the new feature development for the 2.0
> > > > release feature plan[1].
> > > >
> > > > Process wise, I like your idea of adding the new features as nice-to-have
> > > > in the 2.0 feature list.
> > > >
> > > > Re: David,
> > > > Given it is a major version bump, it is possible that some of the
> > > > downstream projects (e.g. connectors, Paimon, etc) will have to see if a
> > > > major version bump is also needed there. And it is probably going to be
> > > > decisions made on a per-project basis.
> > > > Regarding the Java version specifically, this is probably worth a separate
> > > > discussion. According to a recent report[2] on the state of Java, it might
> > > > be a little early to drop support for Java 11. We can discuss this
> > > > separately.
> > > >
> > > > Thanks,
> > > >
> > > > Jiangjie (Becket) Qin
> > > >
> > > > [1] https://cwiki.apache.org/confluence/display/FLINK/2.0+Release
> > > > [2] https://newrelic.com/sites/default/files/2024-04/new-relic-state-of-the-java-ecosystem-report-2024-04-30.pdf
> > > >
> > > > On Tue, Jun 25, 2024 at 4:58 AM David Radley <
> david_rad...@uk.ibm.com>
> > > > wrote:
> > > >
> > > > > Hi,
> > > > > I think this is a great question. I am not sure if this has been covered
> > > > > elsewhere, but it would be good to be clear how this affects the
> > > > > connectors and operator repos, with potentially v1 and v2 oriented new
> > > > > features. I suspect this will be a connector by connector investigation.
> > > > > I am thinking of connectors with Hadoop ecosystem dependencies (e.g.
> > > > > Paimon), which may not work nicely with Java 17.
> > > > >
> > > > >  Kind regards, David.
> > > > >
> > > > >
> > > > > From: Matthias Pohl 
> > > > > Date: Tuesday, 25 June 2024 at 09:57
> > > > > To: dev@flink.apache.org 
> > > > > Cc: Xintong Song , martijnvis...@apache.org
> <
> > > > > martijnvis...@apache.org>, imj...@gmail.com ,
> > > > > becket@gmail.com 
> > > > > Subject: [EXTERNAL] [2.0] How to handle on-going feature development in
> > > > > Flink 2.0?
> > > > > Hi 2.0 release managers,
> > > > > With the 1.20 release branch being cut [1], master is now referring to
> > > > > 2.0-SNAPSHOT. I remember that, initially, the community had

Re: [VOTE] FLIP-444: Native file copy support

2024-06-26 Thread Zakelly Lan
+1 (binding)


Best,
Zakelly

On Wed, Jun 26, 2024 at 3:54 PM Yuepeng Pan  wrote:

> +1 (non-binding)
>
> Best regards,
> Yuepeng Pan
>
> At 2024-06-26 15:27:17, "Piotr Nowojski"  wrote:
> >Thanks for pointing this out Zakelly. After the discussion on the dev
> >mailing list, I have updated the `PathsCopyingFileSystem` to merge its
> >functionalities with `DuplicatingFileSystem`, but I had forgotten to
> >mention that `DuplicatingFileSystem` will be removed and replaced with
> >`PathsCopyingFileSystem`.
> >
> >Vote can be resumed.
> >
> >Best,
> >Piotrek
> >
> >On Tue, 25 Jun 2024 at 18:57, Piotr Nowojski wrote:
> >
> >> Oops, I must have forgotten to update the FLIP as we discussed. I will fix
> >> it tomorrow and the vote period will be extended.
> >>
> >> Best,
> >> Piotrek
> >>
> >> On Tue, 25 Jun 2024 at 13:56, Zakelly Lan wrote:
> >>
> >>> Hi Piotrek,
> >>>
> >>> I don't see any statement about removing or renaming the
> >>> `DuplicatingFileSystem` in the FLIP. Shall we do that as mentioned in the
> >>> discussion thread?
> >>>
> >>>
> >>> Best,
> >>> Zakelly
> >>>
> >>> On Tue, Jun 25, 2024 at 4:58 PM Piotr Nowojski 
> >>> wrote:
> >>>
> >>> > Hi all,
> >>> >
> >>> > I would like to start a vote for the FLIP-444 [1]. The discussion thread
> >>> > is here [2].
> >>> >
> >>> > The vote will be open for at least 72 hours.
> >>> >
> >>> > Best,
> >>> > Piotrek
> >>> >
> >>> > [1] https://cwiki.apache.org/confluence/x/rAn9EQ
> >>> > [2] https://lists.apache.org/thread/lkwmyjt2bnmvgx4qpp82rldwmtd4516c
> >>> >
> >>>
> >>
>


Re: [VOTE] FLIP-444: Native file copy support

2024-06-25 Thread Zakelly Lan
Hi Piotrek,

I don't see any statement about removing or renaming the
`DuplicatingFileSystem` in the FLIP. Shall we do that as mentioned in the
discussion thread?


Best,
Zakelly

On Tue, Jun 25, 2024 at 4:58 PM Piotr Nowojski  wrote:

> Hi all,
>
> I would like to start a vote for the FLIP-444 [1]. The discussion thread is
> here [2].
>
> The vote will be open for at least 72 hours.
>
> Best,
> Piotrek
>
> [1] https://cwiki.apache.org/confluence/x/rAn9EQ
> [2] https://lists.apache.org/thread/lkwmyjt2bnmvgx4qpp82rldwmtd4516c
>


[jira] [Created] (FLINK-35667) Implement Reducing Async State API for ForStStateBackend

2024-06-21 Thread Zakelly Lan (Jira)
Zakelly Lan created FLINK-35667:
---

 Summary: Implement Reducing Async State API for ForStStateBackend
 Key: FLINK-35667
 URL: https://issues.apache.org/jira/browse/FLINK-35667
 Project: Flink
  Issue Type: Sub-task
Reporter: Zakelly Lan








[jira] [Created] (FLINK-35666) Implement Aggregating Async State API for ForStStateBackend

2024-06-21 Thread Zakelly Lan (Jira)
Zakelly Lan created FLINK-35666:
---

 Summary: Implement Aggregating Async State API for 
ForStStateBackend
 Key: FLINK-35666
 URL: https://issues.apache.org/jira/browse/FLINK-35666
 Project: Flink
  Issue Type: Sub-task
Reporter: Zakelly Lan








Re: [VOTE] FLIP-461: Synchronize rescaling with checkpoint creation to minimize reprocessing for the AdaptiveScheduler

2024-06-17 Thread Zakelly Lan
+1 (binding)


Best,
Zakelly

On Mon, Jun 17, 2024 at 5:10 PM Rui Fan <1996fan...@gmail.com> wrote:

> +1 (binding)
>
> Best,
> Rui
>
> On Mon, Jun 17, 2024 at 4:58 PM David Morávek 
> wrote:
>
> > +1 (binding)
> >
> > On Mon, Jun 17, 2024 at 10:24 AM Matthias Pohl 
> wrote:
> >
> > > Hi everyone,
> > > the discussion in [1] about FLIP-461 [2] is kind of concluded. I am
> > > starting a vote on this one here.
> > >
> > > The vote will be open for at least 72 hours (i.e. until June 20, 2024;
> > > 8:30am UTC) unless there are any objections. The FLIP will be considered
> > > accepted if 3 binding votes (from active committers according to the Flink
> > > bylaws [3]) are gathered by the community.
> > >
> > > Best,
> > > Matthias
> > >
> > > [1] https://lists.apache.org/thread/nnkonmsv8xlk0go2sgtwnphkhrr5oc3y
> > > [2] https://cwiki.apache.org/confluence/display/FLINK/FLIP-461%3A+Synchronize+rescaling+with+checkpoint+creation+to+minimize+reprocessing+for+the+AdaptiveScheduler
> > > [3] https://cwiki.apache.org/confluence/display/FLINK/Flink+Bylaws#FlinkBylaws-Approvals
> > >
> >
>


Re: [ANNOUNCE] New Apache Flink Committer - Hang Ruan

2024-06-16 Thread Zakelly Lan
Congratulations, Hang!


Best,
Zakelly

On Mon, Jun 17, 2024 at 12:07 PM Yanquan Lv  wrote:

> Congratulations, Hang!
>
> On Mon, Jun 17, 2024 at 11:32, Samrat Deb wrote:
>
> > Congratulations Hang Ruan !
> >
> > Bests,
> > Samrat
> >
> > On Mon, Jun 17, 2024 at 8:47 AM Leonard Xu  wrote:
> >
> > > Hi everyone,
> > > On behalf of the PMC, I'm happy to let you know that Hang Ruan has become
> > > a new Flink Committer!
> > >
> > > Hang Ruan has been continuously contributing to the Flink project since
> > > August 2021, including Flink, Flink CDC, and various Flink connector
> > > repositories such as flink-connector-kafka, flink-connector-elasticsearch,
> > > flink-connector-aws, flink-connector-rabbitmq, flink-connector-pulsar, and
> > > flink-connector-mongodb. Hang Ruan focuses on the improvements related to
> > > connectors and catalogs and initiated FLIP-274. He is most recognized as a
> > > core contributor and maintainer for the Flink CDC project, contributing
> > > many features such as support for newly added tables in MySQL CDC and the
> > > Schema Evolution feature.
> > >
> > > Beyond his technical contributions, Hang Ruan is an active member of the
> > > Flink community. He regularly engages in discussions on the Flink dev
> > > mailing list and the user-zh and user mailing lists, participates in FLIP
> > > discussions, assists with user Q&A, and consistently volunteers for release
> > > verifications.
> > >
> > > Please join me in congratulating Hang Ruan for becoming an Apache Flink
> > > committer!
> > >
> > > Best,
> > > Leonard (on behalf of the Flink PMC)
> >
>


Re: [ANNOUNCE] New Apache Flink Committer - Zhongqiang Gong

2024-06-16 Thread Zakelly Lan
Congratulations, Zhongqiang!


Best,
Zakelly

On Mon, Jun 17, 2024 at 12:05 PM Shawn Huang  wrote:

> Congratulations !
>
> Best,
> Shawn Huang
>
>
> On Mon, Jun 17, 2024 at 12:03, Yuepeng Pan wrote:
>
> > Congratulations ! Best regards Yuepeng Pan
> >
> >
> >
> >
> >
> > At 2024-06-17 11:20:30, "Leonard Xu"  wrote:
> > >Hi everyone,
> > >On behalf of the PMC, I'm happy to announce that Zhongqiang Gong has
> > >become a new Flink Committer!
> > >
> > >Zhongqiang has been an active Flink community member since November 2021,
> > >contributing numerous PRs to both the Flink and Flink CDC repositories. As
> > >a core contributor to Flink CDC, he developed the Oracle and SQL Server CDC
> > >Connectors and managed essential website and CI migrations during the
> > >donation of Flink CDC to Apache Flink.
> > >
> > >Beyond his technical contributions, Zhongqiang actively participates in
> > >discussions on the Flink dev mailing list and responds to threads on the
> > >user and user-zh mailing lists. As an Apache StreamPark (incubating)
> > >Committer, he promotes Flink SQL and Flink CDC technologies at meetups and
> > >within the StreamPark community.
> > >
> > >Please join me in congratulating Zhongqiang Gong for becoming an Apache
> > >Flink committer!
> > >
> > >Best,
> > >Leonard (on behalf of the Flink PMC)
> >
>


[jira] [Created] (FLINK-35624) Release Testing: Verify FLIP-306 Unified File Merging Mechanism for Checkpoints

2024-06-16 Thread Zakelly Lan (Jira)
Zakelly Lan created FLINK-35624:
---

 Summary: Release Testing: Verify FLIP-306 Unified File Merging 
Mechanism for Checkpoints
 Key: FLINK-35624
 URL: https://issues.apache.org/jira/browse/FLINK-35624
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Checkpointing
Reporter: Zakelly Lan
 Fix For: 1.20.0


Follow-up test for https://issues.apache.org/jira/browse/FLINK-32070

1.20 is the MVP version for FLIP-306. It is a little bit complex and should be 
tested carefully. The main idea of FLIP-306 is to merge checkpoint files on the 
TM side and provide new {{StateHandle}}s to the JM. There will be a 
TM-managed directory under the 'shared' checkpoint directory for each subtask, 
and a TM-managed directory under the 'taskowned' checkpoint directory for each 
Task Manager. Under those newly introduced directories, the checkpoint files will 
be merged into a smaller set of files. The following scenarios need to be tested, 
including but not limited to:
 # With file merging enabled, periodic checkpoints perform properly, and 
failover, restore and rescale also work well.
 # When file merging is switched on and off across jobs, checkpoints and 
recovery also work properly.
 # There is no left-over TM-managed directory, especially when no 
checkpoint completes before the job cancellation.
 # File merging has no effect on (native) savepoints.

Besides the behaviors above, it is better to validate the function of space 
amplification control and metrics. All the config options can be found under 
'execution.checkpointing.file-merging'.





Re: [ANNOUNCE] Flink 1.20 feature freeze

2024-06-15 Thread Zakelly Lan
Hi Robert, Rui, Ufuk and Weijie,

Thanks for the update!

FYI: This PR [1] fixes and cleans up the left-over checkpoint directories for
file-merging on TM exit, and its second commit fixes the wrong state handle
usage. We encountered several unexpected CI failures, so we missed the feature
freeze time. It would be better to have this PR in 1.20, so I will merge it if
you agree. Thanks.


[1] https://github.com/apache/flink/pull/24933

Best,
Zakelly

On Sat, Jun 15, 2024 at 6:00 AM weijie guo 
wrote:

> Hi everyone,
>
> The feature freeze of 1.20 has started now. That means that no new features
> or improvements should now be merged into the master branch unless you ask
> the release managers first, which has already been done for PRs, or pending
> on CI to pass. Bug fixes and documentation PRs can still be merged.
>
> - *Cutting release branch*
>
> Currently we have no blocker issues (besides tickets that are used for
> release testing).
>
> We are planning to cut the release branch next Friday (June 21) if there
> are no new test instabilities, and we'll make another announcement on the
> dev mailing list then.
>
> - *Cross-team testing*
>
> The release testing will start right after we cut the release branch, which
> is expected to come in the next week. As a prerequisite of it, we have
> created the corresponding instruction ticket in FLINK-35602 [1]. Please check
> and complete the documentation and test instructions of your new feature and
> mark the related JIRA issue in the 1.20 release wiki page [2] before we start
> testing, which would be quite helpful for other developers to validate your
> features.
>
> Best regards,
>
> Robert, Rui, Ufuk and Weijie
>
> [1] https://issues.apache.org/jira/browse/FLINK-35602
> [2] https://cwiki.apache.org/confluence/display/FLINK/1.20+Release
>


[jira] [Created] (FLINK-35617) Support object reuse in async state execution

2024-06-14 Thread Zakelly Lan (Jira)
Zakelly Lan created FLINK-35617:
---

 Summary: Support object reuse in async state execution
 Key: FLINK-35617
 URL: https://issues.apache.org/jira/browse/FLINK-35617
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Task
Reporter: Zakelly Lan
Assignee: Zakelly Lan


The record processor of {{AEC}} in the async state execution model should take 
object reuse into account and copy records if needed.
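
A minimal sketch of why a copy can be needed, under stated assumptions: the {{Event}} type, the {{pending}} buffer and the {{run}} helper are hypothetical and do not reflect the actual record processor. The only point is that a record instance reused by the upstream must be copied before it is held back for deferred (asynchronous) processing.

```java
import java.util.ArrayList;
import java.util.List;

public class ObjectReuseSketch {

    /** Hypothetical mutable record type. */
    static final class Event {
        long value;

        Event(long value) {
            this.value = value;
        }

        Event copy() {
            return new Event(value);
        }
    }

    /**
     * Buffers three records before "processing" them, the way a deferred
     * pipeline may hold records until state access completes.
     */
    static List<Long> run(boolean copyBeforeBuffering) {
        List<Event> pending = new ArrayList<>();
        Event reused = new Event(0); // the upstream reuses a single instance
        for (long v = 1; v <= 3; v++) {
            reused.value = v;
            pending.add(copyBeforeBuffering ? reused.copy() : reused);
        }
        List<Long> seen = new ArrayList<>();
        for (Event e : pending) {
            seen.add(e.value);
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(run(false)); // [3, 3, 3]
        System.out.println(run(true));  // [1, 2, 3]
    }
}
```

Without the copy, every buffered entry points at the same instance and observes only the last value written to it.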





[jira] [Created] (FLINK-35590) Cleanup deprecated options usage in docs about state and checkpoint

2024-06-13 Thread Zakelly Lan (Jira)
Zakelly Lan created FLINK-35590:
---

 Summary: Cleanup deprecated options usage in docs about state and 
checkpoint 
 Key: FLINK-35590
 URL: https://issues.apache.org/jira/browse/FLINK-35590
 Project: Flink
  Issue Type: Improvement
Affects Versions: 1.20.0
Reporter: Zakelly Lan
Assignee: Zakelly Lan


Currently, the docs still use deprecated options, such as 
'state.backend', which should be replaced.





Re: [VOTE] FLIP-464: Merge "flink run" and "flink run-application"

2024-06-12 Thread Zakelly Lan
+1 (binding)

Thanks for driving this!


Best,
Zakelly

On Thu, Jun 13, 2024 at 10:05 AM Junrui Lee  wrote:

> +1 (non-binding)
>
> Best,
> Junrui
>
> On Thu, Jun 13, 2024 at 09:54, Biao Geng wrote:
>
> > Thanks for driving this.
> > +1 (non-binding)
> >
> > Best,
> > Biao Geng
> >
> >
> > On Thu, Jun 13, 2024 at 09:48, weijie guo wrote:
> >
> > > Thanks for driving this!
> > >
> > > +1(binding)
> > >
> > > Best regards,
> > >
> > > Weijie
> > >
> > >
> > > On Thu, Jun 13, 2024 at 09:04, Xintong Song wrote:
> > >
> > > > +1(binding)
> > > >
> > > > Best,
> > > >
> > > > Xintong
> > > >
> > > >
> > > >
> > > > On Thu, Jun 13, 2024 at 5:15 AM Márton Balassi <
> > balassi.mar...@gmail.com
> > > >
> > > > wrote:
> > > >
> > > > > +1 (binding)
> > > > >
> > > > > On Wed, Jun 12, 2024 at 7:25 PM Őrhidi Mátyás <
> > matyas.orh...@gmail.com
> > > >
> > > > > wrote:
> > > > >
> > > > > > Sounds reasonable,
> > > > > > +1
> > > > > >
> > > > > > Cheers,
> > > > > > Matyas
> > > > > >
> > > > > >
> > > > > > On Wed, Jun 12, 2024 at 8:54 AM Mate Czagany  >
> > > > wrote:
> > > > > >
> > > > > > > Hi,
> > > > > > >
> > > > > > > Thank you for driving this Ferenc,
> > > > > > > +1 (non-binding)
> > > > > > >
> > > > > > > Regards,
> > > > > > > Mate
> > > > > > >
> > > > > > > On Wed, Jun 12, 2024 at 17:23, Ferenc Csaky wrote:
> > > > > > >
> > > > > > > > Hello devs,
> > > > > > > >
> > > > > > > > I would like to start a vote about FLIP-464 [1]. The FLIP is about
> > > > > > > > merging the "flink run-application" functionality back into
> > > > > > > > "flink run", so the latter will be capable of deploying jobs in
> > > > > > > > all deployment modes. More details are in the FLIP. Discussion
> > > > > > > > thread: [2].
> > > > > > > >
> > > > > > > > The vote will be open for at least 72 hours (until 2024 March 23
> > > > > > > > 14:03 UTC) unless there are any objections or insufficient votes.
> > > > > > > >
> > > > > > > > Thanks, Ferenc
> > > > > > > >
> > > > > > > > [1] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=311626179
> > > > > > > > [2] https://lists.apache.org/thread/xh58xs0y58kqjmfvd4yor79rv6dlcg5g
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>


[jira] [Created] (FLINK-35570) Consider PlaceholderStreamStateHandle in checkpoint file merging

2024-06-11 Thread Zakelly Lan (Jira)
Zakelly Lan created FLINK-35570:
---

 Summary: Consider PlaceholderStreamStateHandle in checkpoint file 
merging
 Key: FLINK-35570
 URL: https://issues.apache.org/jira/browse/FLINK-35570
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Checkpointing
Reporter: Zakelly Lan
Assignee: Zakelly Lan


In checkpoint file merging, we should take {{PlaceholderStreamStateHandle}} 
into account throughout the lifecycle, since it can refer to a merged file.





Re: [DISCUSS] The performance of serializerHeavyString starts regress since April 3

2024-06-11 Thread Zakelly Lan
Thanks Sam for your investigation.

I revisited the logs and confirmed that the JDK has never changed.

'java -version' gives:

> openjdk version "11.0.19" 2023-04-18 LTS
> OpenJDK Runtime Environment (Red_Hat-11.0.19.0.7-2) (build 11.0.19+7-LTS)
> OpenJDK 64-Bit Server VM (Red_Hat-11.0.19.0.7-2) (build 11.0.19+7-LTS,
> mixed mode, sharing)


It is installed via yum; 'yum info' gives:

> Name         : java-11-openjdk
> Epoch        : 1
> Version      : 11.0.19.0.7
> Release      : 4.0.3.al8
> Architecture : x86_64
> Size         : 1.3 M
> Source       : java-11-openjdk-11.0.19.0.7-4.0.3.al8.src.rpm
> Repository   : @System
> From repo    : alinux3-updates
> Summary      : OpenJDK 11 Runtime Environment
> URL          : http://openjdk.java.net/
> License      : ASL 1.1 and ASL 2.0 and BSD and BSD with advertising and GPL+ and
>              : GPLv2 and GPLv2 with exceptions and IJG and LGPLv2+ and MIT and
>              : MPLv2.0 and Public Domain and W3C and zlib and ISC and FTL and
>              : RSA
> Description  : The OpenJDK 11 runtime environment.


The benchmark environment is hosted on Aliyun, and the OS and JVM are also
released by Aliyun.


And thanks for your PR, I will try to put it in our daily run soon.


Best,
Zakelly

On Mon, Jun 10, 2024 at 9:24 AM Sam Barker  wrote:

> After completing the side quest [1] of enabling async profiler when running
> the JMH benchmarks I've been unable to reproduce the performance change
> between the last known good run and the first run highlighted as a
> regression.
> Results from my fedora f40 workstation using
>
> # JMH version: 1.37
> # VM version: JDK 11.0.23, OpenJDK 64-Bit Server VM, 11.0.23+9
> # VM invoker: /home/sam/.sdkman/candidates/java/11.0.23-tem/bin/java
> # VM options: -Djava.rmi.server.hostname=127.0.0.1
> -Dcom.sun.management.jmxremote.authenticate=false
> -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.ssl
> # Blackhole mode: full + dont-inline hint (auto-detected, use
> -Djmh.blackhole.autoDetect=false to disable)
>
> File: /tmp/profile-results/163b9cca6d2/jmh-result.csv
>
> "Benchmark","Mode","Threads","Samples","Score","Score Error (99.9%)","Unit"
> "org.apache.flink.benchmark.SerializationFrameworkMiniBenchmarks.serializerHeavyString","thrpt",1,30,179.453066,5.725733,"ops/ms"
> "org.apache.flink.benchmark.SerializationFrameworkMiniBenchmarks.serializerHeavyString:async","thrpt",1,1,NaN,NaN,"---"
>
> File: /tmp/profile-results/f38d8ca43f6/jmh-result.csv
>
> "Benchmark","Mode","Threads","Samples","Score","Score Error (99.9%)","Unit"
> "org.apache.flink.benchmark.SerializationFrameworkMiniBenchmarks.serializerHeavyString","thrpt",1,30,178.861842,6.711582,"ops/ms"
> "org.apache.flink.benchmark.SerializationFrameworkMiniBenchmarks.serializerHeavyString:async","thrpt",1,1,NaN,NaN,"---"
>
> Where f38d8ca43f6 is the last known good run and 163b9cca6d2 is the first
> regression.
>
> One question I have from comparing my local results to those on flink-speed
> [2]: is it possible the JDK version changed between the runs (I don't see the
> actual JDK build listed anywhere so I can't check versions or
> distributions)?
>
> I've also tried comparing building flink with the java11-target profile vs
> the default JDK 8 build and that does not change the performance.
>
> Sam
>
> [1] https://github.com/apache/flink-benchmarks/pull/90
> [2]
>
> https://flink-speed.xyz/timeline/#/?exe=6=serializerHeavyString=on=on=off=3=200
>
> On Wed, 29 May 2024 at 16:53, Sam Barker  wrote:
>
> > > I guess that improvement is a fluctuation. You can double check the
> > performance results[1] of the last few days. The performance isn't
> > recovered.
> >
> > Hmm yeah the improvement was a fluctuation and smaller than I remembered
> > seeing (maybe I had zoomed into the timeline 

Re: [DISCUSS] FLIP-461: FLIP-461: Synchronize rescaling with checkpoint creation to minimize reprocessing

2024-06-07 Thread Zakelly Lan
Hi Matthias,

Thanks for your reply!

> That's something that could be considered as another optimization. I would
> consider this as a possible follow-up. My concern here is that we'd make
> the rescaling configuration even more complicated by introducing yet
> another parameter.

I'd be fine with considering this as a follow-up.

> It might be worth renaming the internal interface into something that
> indicates its internal usage to avoid confusion.

Agree with this.

And another question:
I noticed the existing options under 'jobmanager.adaptive-scheduler' are
using the word 'scaling', e.g.
'jobmanager.adaptive-scheduler.scaling-interval.min'. While in this FLIP
you choose 'rescale'. Would you mind unifying them?


Best,
Zakelly


On Thu, Jun 6, 2024 at 10:57 PM David Morávek 
wrote:

> Thanks for the FLIP Matthias, I think it looks pretty solid!
>
> I also don't see a relation to unaligned checkpoints. From the AS
> perspective, the checkpoint time doesn't matter.
>
> > Is it possible a change event observed right after a complete checkpoint
> > (or within a specific short time after a checkpoint) that triggers a
> > rescale immediately? Sometimes the checkpoint interval is huge and it is
> > better to rescale immediately.
> >
>
> I had considered this initially too, but it feels like a possible follow-up
> optimization.
>
> The primary objective of the proposed solution is to enhance overall
> predictability. With a longer checkpointing interval, the current situation
> worsens as we might have to reprocess a substantial backlog.
>
> I think in the future we might actually want to enhance this by triggering
> some kind of specialized "rescaling" checkpoint that prepares the cluster
> for rescaling (eg. by replicating state to new slots / pre-splitting the
> db, ...), to make things faster.
>
> Best,
> D.
>
> On Wed, Jun 5, 2024 at 4:34 PM Matthias Pohl  wrote:
>
> > Hi Zakelly,
> > thanks for your reply. See my inlined responses below:
> >
> > On Wed, Jun 5, 2024 at 10:26 AM Zakelly Lan 
> wrote:
> >
> > > Hi Matthias,
> > >
> > > Thanks for your proposal! I have a few questions:
> > >
> > > 1. Is it possible a change event observed right after a complete checkpoint
> > > (or within a specific short time after a checkpoint) that triggers a
> > > rescale immediately? Sometimes the checkpoint interval is huge and it is
> > > better to rescale immediately.
> > >
> >
> > That's something that could be considered as another optimization. I would
> > consider this as a possible follow-up. My concern here is that we'd make
> > the rescaling configuration even more complicated by introducing yet
> > another parameter.
> >
> >
> > > 2. Should we introduce `CheckpointLifecycleListener` instead of reusing
> > > `CheckpointListener`? Is `CheckpointListener` enough for this scenario?
> > >
> >
> > Good point, they are serving similar purposes. But I'm hesitant to use
> > CheckpointListener (which is a public interface) for this internal quite
> > narrowly scoped runtime-specific use case of FLIP-461.
> >
> > It might be worth renaming the internal interface into something that
> > indicates its internal usage to avoid confusion.
> >
> >
> > > Best,
> > > Zakelly
> > >
> > > On Wed, Jun 5, 2024 at 3:02 PM Matthias Pohl 
> wrote:
> > >
> > > > Hi ConradJam,
> > > > thanks for your response.
> > > >
> > > > The CheckpointStatsTracker gets notified about the checkpoint completion
> > > > after the checkpoint is finalized, i.e. all its data is persisted and the
> > > > metadata is written to the CompletedCheckpointStore. At this moment, the
> > > > checkpoint is considered for restoring a job and, therefore, becomes
> > > > available for restarts. This workflow also applies to unaligned
> > > > checkpoints. But I see how this context might be helpful for understanding
> > > > the change. I will add it to the FLIP. So far, I don't see a reason
> > > > to disable the feature for unaligned checkpoints. Do you see other issues
> > > > that might make it necessary to disable this feature for this type of
> > > > checkpoints?
> > > >
> > > > Can you elaborate a bit more what you mean by "checkpoints that do not
> > > > check it"? I do not fully understand what you are referring to with "it"
> > > > here.
> &

[jira] [Created] (FLINK-35537) Error parsing list of enum in legacy yaml configuration

2024-06-06 Thread Zakelly Lan (Jira)
Zakelly Lan created FLINK-35537:
---

 Summary: Error parsing list of enum in legacy yaml configuration 
 Key: FLINK-35537
 URL: https://issues.apache.org/jira/browse/FLINK-35537
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Configuration
Affects Versions: 1.19.0
Reporter: Zakelly Lan


In Flink 1.19.0, when I submit a job to a standalone cluster, the TM throws
{code:java}
Caused by: java.lang.IllegalArgumentException: Could not parse value 
'[NO_COMPRESSION]' for key 'state.backend.rocksdb.compression.per.level'.
at 
org.apache.flink.configuration.Configuration.getOptional(Configuration.java:827)
at 
org.apache.flink.contrib.streaming.state.RocksDBResourceContainer.internalGetOption(RocksDBResourceContainer.java:312)
at 
org.apache.flink.contrib.streaming.state.RocksDBResourceContainer.setColumnFamilyOptionsFromConfigurableOptions(RocksDBResourceContainer.java:361)
at 
org.apache.flink.contrib.streaming.state.RocksDBResourceContainer.getColumnOptions(RocksDBResourceContainer.java:181)
at 
org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.lambda$createKeyedStateBackend$0(EmbeddedRocksDBStateBackend.java:449)
at 
org.apache.flink.contrib.streaming.state.RocksDBOperationUtils.createColumnFamilyOptions(RocksDBOperationUtils.java:219)
at 
org.apache.flink.contrib.streaming.state.restore.RocksDBHandle.loadDb(RocksDBHandle.java:138)
at 
org.apache.flink.contrib.streaming.state.restore.RocksDBHandle.openDB(RocksDBHandle.java:113)
at 
org.apache.flink.contrib.streaming.state.restore.RocksDBNoneRestoreOperation.restore(RocksDBNoneRestoreOperation.java:62)
at 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:333)
... 19 more
Caused by: java.lang.IllegalArgumentException: Could not parse value for enum 
class org.rocksdb.CompressionType. Expected one of: [[NO_COMPRESSION, 
SNAPPY_COMPRESSION, ZLIB_COMPRESSION, BZLIB2_COMPRESSION, LZ4_COMPRESSION, 
LZ4HC_COMPRESSION, XPRESS_COMPRESSION, ZSTD_COMPRESSION, 
DISABLE_COMPRESSION_OPTION]]
at 
org.apache.flink.configuration.ConfigurationUtils.lambda$convertToEnum$12(ConfigurationUtils.java:502)
at java.util.Optional.orElseThrow(Optional.java:290)
at 
org.apache.flink.configuration.ConfigurationUtils.convertToEnum(ConfigurationUtils.java:499)
at 
org.apache.flink.configuration.ConfigurationUtils.convertValue(ConfigurationUtils.java:392)
at 
org.apache.flink.configuration.ConfigurationUtils.lambda$convertToListWithLegacyProperties$4(ConfigurationUtils.java:440)
at 
java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
at 
java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
at 
java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
at 
java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at 
java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:566)
at 
org.apache.flink.configuration.ConfigurationUtils.convertToListWithLegacyProperties(ConfigurationUtils.java:441)
at 
org.apache.flink.configuration.ConfigurationUtils.convertToList(ConfigurationUtils.java:432)
at 
org.apache.flink.configuration.Configuration.lambda$getOptional$3(Configuration.java:819)
at java.util.Optional.map(Optional.java:215)
at 
org.apache.flink.configuration.Configuration.getOptional(Configuration.java:819)
... 28 more
{code}
I configured 'state.backend.rocksdb.compression.per.level: NO_COMPRESSION' in 
flink-conf.yaml. I also tried Flink 1.18.1, and the same job runs well there.
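
To illustrate the failure mode in the trace, here is a minimal, self-contained sketch. It is not Flink's code: the enum is a hypothetical stand-in for org.rocksdb.CompressionType, and the ';'-separated list syntax is my assumption about the legacy format. It only shows why a bracketed item such as '[NO_COMPRESSION]' cannot be converted to an enum constant, while the bare name can.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for org.rocksdb.CompressionType (only two constants listed).
enum Compression { NO_COMPRESSION, SNAPPY_COMPRESSION }

class LegacyEnumListSketch {

    // Assumed legacy-style list syntax: items separated by ';'.
    static List<Compression> parse(String raw) {
        List<Compression> result = new ArrayList<>();
        for (String item : raw.split(";")) {
            // Enum.valueOf throws IllegalArgumentException for unknown names,
            // so an item that still carries brackets is rejected.
            result.add(Compression.valueOf(item.trim()));
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(parse("NO_COMPRESSION"));
        try {
            parse("[NO_COMPRESSION]");
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

If the value reaches the converter with its brackets intact, as the message in the trace suggests, a parser of this shape fails in the same way.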



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35535) Enable benchmark profiling in daily run

2024-06-05 Thread Zakelly Lan (Jira)
Zakelly Lan created FLINK-35535:
---

 Summary: Enable benchmark profiling in daily run
 Key: FLINK-35535
 URL: https://issues.apache.org/jira/browse/FLINK-35535
 Project: Flink
  Issue Type: Improvement
  Components: Benchmarks
Reporter: Zakelly Lan
Assignee: Zakelly Lan
 Fix For: 1.20.0


After FLINK-35534, flink-benchmark supports profiling. We could consider 
enabling this in the daily run.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35534) Support profiler for benchmarks

2024-06-05 Thread Zakelly Lan (Jira)
Zakelly Lan created FLINK-35534:
---

 Summary: Support profiler for benchmarks
 Key: FLINK-35534
 URL: https://issues.apache.org/jira/browse/FLINK-35534
 Project: Flink
  Issue Type: Improvement
  Components: Benchmarks
Reporter: Zakelly Lan
 Fix For: 1.20.0


As JMH supports profiling during benchmarks, flink-benchmark could leverage 
this.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [ANNOUNCE] New Apache Flink PMC Member - Fan Rui

2024-06-05 Thread Zakelly Lan
Congratulations, Rui!

Best,
Zakelly

On Wed, Jun 5, 2024 at 6:02 PM Piotr Nowojski  wrote:

> Hi everyone,
>
> On behalf of the PMC, I'm very happy to announce another new Apache Flink
> PMC Member - Fan Rui.
>
> Rui has been active in the community since August 2019. During this time he
> has contributed a lot of new features. Among others:
>   - Decoupling Autoscaler from Kubernetes Operator, and supporting
> Standalone Autoscaler
>   - Improvements to checkpointing, flamegraphs, restart strategies,
> watermark alignment, network shuffles
>   - Optimizing the memory and CPU usage of large operators, greatly
> reducing the risk and probability of TaskManager OOM
>
> He reviewed a significant amount of PRs and has been active both on the
> mailing lists and in Jira helping to both maintain and grow Apache Flink's
> community. He is also our current Flink 1.20 release manager.
>
> In the last 12 months, Rui has been the most active contributor in the
> Flink Kubernetes Operator project, while being the 2nd most active Flink
> contributor at the same time.
>
> Please join me in welcoming and congratulating Fan Rui!
>
> Best,
> Piotrek (on behalf of the Flink PMC)
>


Re: [DISCUSS] FLIP-461: FLIP-461: Synchronize rescaling with checkpoint creation to minimize reprocessing

2024-06-05 Thread Zakelly Lan
Hi Matthias,

Thanks for your proposal! I have a few questions:

1. Could a change event that is observed right after a completed checkpoint
(or within a short time after a checkpoint) trigger a rescale immediately?
Sometimes the checkpoint interval is huge and it is better to rescale
immediately.
2. Should we introduce `CheckpointLifecycleListener` instead of reusing
`CheckpointListener`? Is `CheckpointListener` enough for this scenario?


Best,
Zakelly

On Wed, Jun 5, 2024 at 3:02 PM Matthias Pohl  wrote:

> Hi ConradJam,
> thanks for your response.
>
> The CheckpointStatsTracker gets notified about the checkpoint completion
> after the checkpoint is finalized, i.e. all its data is persisted and the
> metadata is written to the CompletedCheckpointStore. At this moment, the
> checkpoint is considered for restoring a job and, therefore, becomes
> available for restarts. This workflow also applies to unaligned
> checkpoints. But I see how this context might be helpful for understanding
> the change. I will add it to the FLIP. So far, I don't see a reason
> to disable the feature for unaligned checkpoints. Do you see other issues
> that might make it necessary to disable this feature for this type of
> checkpoints?
>
> Can you elaborate a bit more what you mean by "checkpoints that do not
> check it"? I do not fully understand what you are referring to with "it"
> here.
>
> Best,
> Matthias
>
> On Wed, Jun 5, 2024 at 4:46 AM ConradJam  wrote:
>
> > I have a few questions:
> > Unaligned checkpoints Do we need to enable this feature? Whether this
> > feature should be disabled for checkpoints that do not check it
> >
> > On Tue, Jun 4, 2024 at 18:03, Matthias Pohl wrote:
> >
> > > Hi everyone,
> > > I'd like to discuss FLIP-461 [1]. The FLIP proposes the synchronization
> > of
> > > rescaling and the completion of checkpoints. The idea is to reduce the
> > > amount of data that needs to be processed after rescaling happened. A
> > more
> > > detailed motivation can be found in FLIP-461.
> > >
> > > I'm looking forward to feedback and suggestions.
> > >
> > > Best,
> > > Matthias
> > >
> > > [1]
> > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-461%3A+Synchronize+rescaling+with+checkpoint+creation+to+minimize+reprocessing
> > >
> >
> >
> > --
> > Best
> >
> > ConradJam
> >
>


Re: [ANNOUNCE] New Apache Flink PMC Member - Weijie Guo

2024-06-04 Thread Zakelly Lan
Congratulations, Weijie!

Best,
Zakelly

On Tue, Jun 4, 2024 at 7:49 PM Sergey Nuyanzin  wrote:

> Congratulations Weijie Guo!
>
> On Tue, Jun 4, 2024, 13:45 Jark Wu  wrote:
>
> > Congratulations, Weijie!
> >
> > Best,
> > Jark
> >
> > On Tue, 4 Jun 2024 at 19:10, spoon_lz  wrote:
> >
> > > Congratulations, Weijie!
> > >
> > >
> > >
> > > Regards,
> > > Zhuo.
> > >
> > >
> > >
> > >
> > >
> > >  Replied Message 
> > > | From | Aleksandr Pilipenko |
> > > | Date | 06/4/2024 18:59 |
> > > | To |  |
> > > | Subject | Re: [ANNOUNCE] New Apache Flink PMC Member - Weijie Guo |
> > > Congratulations, Weijie!
> > >
> > > Best,
> > > Aleksandr
> > >
> > > On Tue, 4 Jun 2024 at 11:42, Abdulquddus Babatunde Ekemode <
> > > abdulqud...@aligence.io> wrote:
> > >
> > > Congratulations! I wish you all the best.
> > >
> > > Best Regards,
> > > Abdulquddus
> > >
> > > On Tue, 4 Jun 2024 at 13:14, Ahmed Hamdy  wrote:
> > >
> > > Congratulations Weijie
> > > Best Regards
> > > Ahmed Hamdy
> > >
> > >
> > > On Tue, 4 Jun 2024 at 10:51, Matthias Pohl  wrote:
> > >
> > > Congratulations, Weijie!
> > >
> > > Matthias
> > >
> > > On Tue, Jun 4, 2024 at 11:12 AM Guowei Ma 
> > > wrote:
> > >
> > > Congratulations!
> > >
> > > Best,
> > > Guowei
> > >
> > >
> > > On Tue, Jun 4, 2024 at 4:55 PM gongzhongqiang <
> > > gongzhongqi...@apache.org
> > >
> > > wrote:
> > >
> > > Congratulations Weijie! Best,
> > > Zhongqiang Gong
> > >
> > > On Tue, Jun 4, 2024 at 14:46, Xintong Song wrote:
> > >
> > > Hi everyone,
> > >
> > > On behalf of the PMC, I'm very happy to announce that Weijie Guo
> > > has
> > > joined
> > > the Flink PMC!
> > >
> > > Weijie has been an active member of the Apache Flink community
> > > for
> > > many
> > > years. He has made significant contributions in many components,
> > > including
> > > runtime, shuffle, sdk, connectors, etc. He has driven /
> > > participated
> > > in
> > > many FLIPs, authored and reviewed hundreds of PRs, been
> > > consistently
> > > active
> > > on mailing lists, and also helped with release management of 1.20
> > > and
> > > several other bugfix releases.
> > >
> > > Congratulations and welcome Weijie!
> > >
> > > Best,
> > >
> > > Xintong (on behalf of the Flink PMC)
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> >
>


Re: [VOTE] FLIP-443: Interruptible watermark processing

2024-05-23 Thread Zakelly Lan
+1 (binding)

Best,
Zakelly

On Thu, May 23, 2024 at 8:21 PM Piotr Nowojski  wrote:

> Hi all,
>
> After reaching what looks like a consensus in the discussion thread [1], I
> would like to put FLIP-443 [2] to the vote.
>
> The vote will be open for at least 72 hours unless there is an objection or
> insufficient votes.
>
> [1] https://lists.apache.org/thread/flxm7rphvfgqdn2gq2z0bb7kl007olpz
> [2] https://cwiki.apache.org/confluence/x/qgn9EQ
>
> Best,
> Piotrek
>


Re: [DISCUSS] FLIP-443: Interruptible watermark processing

2024-05-23 Thread Zakelly Lan
Hi Piotrek,

It looks good to me. Thanks for the update!


Best,
Zakelly

On Thu, May 23, 2024 at 7:04 PM Piotr Nowojski 
wrote:

> Hi Zakelly,
>
> I've thought about it a bit more, and I think only `#execute()` methods
> make the most sense to be used when implementing operators (and
> interruptible mails), so I will just add `MailOptions` parameters only to
> them. If necessary, we can add more in the future.
>
> I have updated the FLIP. If it looks good to you, I would start a voting
> thread today/tomorrow.
>
> Best,
> Piotrek
>
> On Thu, May 23, 2024 at 09:00, Zakelly Lan wrote:
>
> > Hi Piotrek,
> >
> > Well, compared to this plan, I prefer your previous one, which is more in
> > line with the intuition for executors' API, by calling `execute`
> directly.
> > Before the variants get too much, I'd suggest we only do minimum change
> for
> > only "interruptible".
> >
> > My original thinking is, doubling each method could result in a scenario
> > where new methods lack callers. But like you said, for the sake of
> > completeness, I could accept the doubling method plan.
> >
> >
> > Thanks & Best,
> > Zakelly
> >
> > On Wed, May 22, 2024 at 5:05 PM Piotr Nowojski 
> > wrote:
> >
> > > Hi Zakelly,
> > >
> > > > I suggest not doubling the existing methods. Only providing the
> > following
> > > one is enough
> > >
> > > In that case I would prefer to have a complete set of the methods for
> the
> > > sake of completeness. If the number of variants is/would be getting too
> > > much, we could convert the class into a builder?
> > >
> > >
> > >
> > > > mailboxExecutor.execute(myThrowingRunnable).setInterruptible().description("bla
> > > > %d").arg(42).submit();
> > >
> > > It could be done in both in the future, if we would ever need to add
> even
> > > more methods, or I could do it now. WDYT?
> > >
> > > Best,
> > > Piotrek
> > >
> > > On Wed, May 22, 2024 at 08:48, Zakelly Lan wrote:
> > >
> > > > Hi Piotrek,
> > > >
> > > > `MailOptions` looks good to me. I suggest not doubling the existing
> > > > methods. Only providing the following one is enough:
> > > >
> > > > void execute(
> > > > > MailOptions mailOptions,
> > > > > > ThrowingRunnable<? extends Exception> command,
> > > > > String descriptionFormat,
> > > > > Object... descriptionArgs);
> > > >
> > > >
> > > > WDYT?
> > > >
> > > >
> > > > Best,
> > > > Zakelly
> > > >
> > > >
> > > > On Wed, May 22, 2024 at 12:53 AM Piotr Nowojski <
> pnowoj...@apache.org>
> > > > wrote:
> > > >
> > > > > Hi Zakelly and others,
> > > > >
> > > > > > 1. I'd suggest also providing `isInterruptable()` in `Mail`, and
> > the
> > > > > > continuation mail will return true. The FLIP-425 will leverage
> this
> > > > queue
> > > > > > to execute some state requests, and when the cp arrives, the
> > operator
> > > > may
> > > > > > call `yield()` to drain. It may happen that the continuation mail
> > is
> > > > > called
> > > > > > again in `yield()`. By checking `isInterruptable()`, we can skip
> > this
> > > > > mail
> > > > > > and re-enqueue.
> > > > >
> > > > > Do you have some suggestions on how `isInterruptible` should be
> > > defined?
> > > > > Do we have to double the amount of methods in the
> `MailboxExecutor`,
> > to
> > > > > provide versions of the existing methods, that would enqueue
> > > > > "interruptible"
> > > > > versions of mails? Something like:
> > > > >
> > > > > > default void execute(ThrowingRunnable<? extends Exception> command,
> > > > > > String description) {
> > > > > >     execute(DEFAULT_OPTIONS, command, description);
> > > > > > }
> > > > > >
> > > > > > default void execute(MailOptions options, ThrowingRunnable<? extends
> > > > > > Exception> command, String description) {
> > > > > >     execute(options, command, description, EMPTY_ARGS);
> > > > > > }
> > > > >
> 

Re: [DISCUSS] FLIP-443: Interruptible watermark processing

2024-05-23 Thread Zakelly Lan
Hi Piotrek,

Well, compared to this plan, I prefer your previous one, which is more in
line with the intuition for executors' API, by calling `execute` directly.
Before the variants get too much, I'd suggest we only do minimum change for
only "interruptible".

My original thinking is, doubling each method could result in a scenario
where new methods lack callers. But like you said, for the sake of
completeness, I could accept the doubling method plan.


Thanks & Best,
Zakelly

On Wed, May 22, 2024 at 5:05 PM Piotr Nowojski  wrote:

> Hi Zakelly,
>
> > I suggest not doubling the existing methods. Only providing the following
> one is enough
>
> In that case I would prefer to have a complete set of the methods for the
> sake of completeness. If the number of variants is/would be getting too
> much, we could convert the class into a builder?
>
>
> mailboxExecutor.execute(myThrowingRunnable).setInterruptible().description("bla
> %d").arg(42).submit();
>
> It could be done in both in the future, if we would ever need to add even
> more methods, or I could do it now. WDYT?
>
> Best,
> Piotrek
>
> On Wed, May 22, 2024 at 08:48, Zakelly Lan wrote:
>
> > Hi Piotrek,
> >
> > `MailOptions` looks good to me. I suggest not doubling the existing
> > methods. Only providing the following one is enough:
> >
> > void execute(
> > > MailOptions mailOptions,
> > > ThrowingRunnable<? extends Exception> command,
> > > String descriptionFormat,
> > > Object... descriptionArgs);
> >
> >
> > WDYT?
> >
> >
> > Best,
> > Zakelly
> >
> >
> > On Wed, May 22, 2024 at 12:53 AM Piotr Nowojski 
> > wrote:
> >
> > > Hi Zakelly and others,
> > >
> > > > 1. I'd suggest also providing `isInterruptable()` in `Mail`, and the
> > > > continuation mail will return true. The FLIP-425 will leverage this
> > queue
> > > > to execute some state requests, and when the cp arrives, the operator
> > may
> > > > call `yield()` to drain. It may happen that the continuation mail is
> > > called
> > > > again in `yield()`. By checking `isInterruptable()`, we can skip this
> > > mail
> > > > and re-enqueue.
> > >
> > > Do you have some suggestions on how `isInterruptible` should be
> defined?
> > > Do we have to double the amount of methods in the `MailboxExecutor`, to
> > > provide versions of the existing methods, that would enqueue
> > > "interruptible"
> > > versions of mails? Something like:
> > >
> > > default void execute(ThrowingRunnable<? extends Exception> command,
> > > String description) {
> > >     execute(DEFAULT_OPTIONS, command, description);
> > > }
> > >
> > > default void execute(MailOptions options, ThrowingRunnable<? extends
> > > Exception> command, String description) {
> > >     execute(options, command, description, EMPTY_ARGS);
> > > }
> > >
> > > default void execute(
> > >         ThrowingRunnable<? extends Exception> command,
> > >         String descriptionFormat,
> > >         Object... descriptionArgs) {
> > >     execute(DEFAULT_OPTIONS, command, descriptionFormat, descriptionArgs);
> > > }
> > >
> > > void execute(
> > >         MailOptions options,
> > >         ThrowingRunnable<? extends Exception> command,
> > >         String descriptionFormat,
> > >         Object... descriptionArgs);
> > >
> > > public static class MailOptions {
> > >     (...)
> > >     public MailOptions() {
> > >     }
> > >
> > >     MailOptions setIsInterruptible() {
> > >         this.isInterruptible = true;
> > >         return this;
> > >     }
> > > }
> > >
> > > And usage would be like this:
> > >
> > > mailboxExecutor.execute(new MailOptions().setIsInterruptible(), () -> {
> > > foo(); }, "foo");
> > >
> > > ?
> > >
> > > Best,
> > > Piotrek
> > >
> > > On Thu, May 16, 2024 at 11:26, Rui Fan <1996fan...@gmail.com> wrote:
> > >
> > > > Hi Piotr,
> > > >
> > > > > we checked in the firing timers benchmark [1] and we didn't observe
> > any
> > > > > performance regression.
> > > >
> > > > Thanks for the feedback, it's good news to hear that. I didn't no

Re: [DISCUSS] FLIP-443: Interruptible watermark processing

2024-05-22 Thread Zakelly Lan
Hi Piotrek,

`MailOptions` looks good to me. I suggest not doubling the existing
methods. Only providing the following one is enough:

void execute(
> MailOptions mailOptions,
> ThrowingRunnable<? extends Exception> command,
> String descriptionFormat,
> Object... descriptionArgs);


WDYT?
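
To make the single-method idea concrete, here is a rough, self-contained sketch. It does not use the real Flink classes: `ThrowingRunnable`, `MailOptions` and `SketchMailboxExecutor` below are simplified stand-ins written only for illustration, and the executor runs the command inline instead of enqueuing a mail.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-in; not the Flink interface.
interface ThrowingRunnable<E extends Throwable> {
    void run() throws E;
}

// Hypothetical options holder, following the shape discussed in this thread.
class MailOptions {
    private boolean interruptible;

    MailOptions setInterruptible() {
        this.interruptible = true;
        return this;
    }

    boolean isInterruptible() {
        return interruptible;
    }
}

class SketchMailboxExecutor {
    // Records the description of every executed mail, for inspection.
    final List<String> log = new ArrayList<>();

    // The single entry point: options, command, description format and arguments.
    void execute(
            MailOptions options,
            ThrowingRunnable<? extends Exception> command,
            String descriptionFormat,
            Object... descriptionArgs) {
        String description = String.format(descriptionFormat, descriptionArgs);
        log.add(description + (options.isInterruptible() ? " [interruptible]" : ""));
        try {
            command.run(); // run inline; a real mailbox would enqueue the mail instead
        } catch (Exception e) {
            throw new RuntimeException(description, e);
        }
    }
}
```

A caller would then write something like `executor.execute(new MailOptions().setInterruptible(), () -> foo(), "foo %d", 42)`, and callers that do not care about options can pass a default instance.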


Best,
Zakelly


On Wed, May 22, 2024 at 12:53 AM Piotr Nowojski 
wrote:

> Hi Zakelly and others,
>
> > 1. I'd suggest also providing `isInterruptable()` in `Mail`, and the
> > continuation mail will return true. The FLIP-425 will leverage this queue
> > to execute some state requests, and when the cp arrives, the operator may
> > call `yield()` to drain. It may happen that the continuation mail is
> called
> > again in `yield()`. By checking `isInterruptable()`, we can skip this
> mail
> > and re-enqueue.
>
> Do you have some suggestions on how `isInterruptible` should be defined?
> Do we have to double the amount of methods in the `MailboxExecutor`, to
> provide versions of the existing methods, that would enqueue
> "interruptible"
> versions of mails? Something like:
>
> default void execute(ThrowingRunnable<? extends Exception> command,
> String description) {
>     execute(DEFAULT_OPTIONS, command, description);
> }
>
> default void execute(MailOptions options, ThrowingRunnable<? extends
> Exception> command, String description) {
>     execute(options, command, description, EMPTY_ARGS);
> }
>
> default void execute(
>         ThrowingRunnable<? extends Exception> command,
>         String descriptionFormat,
>         Object... descriptionArgs) {
>     execute(DEFAULT_OPTIONS, command, descriptionFormat, descriptionArgs);
> }
>
> void execute(
>         MailOptions options,
>         ThrowingRunnable<? extends Exception> command,
>         String descriptionFormat,
>         Object... descriptionArgs);
>
> public static class MailOptions {
>     (...)
>     public MailOptions() {
>     }
>
>     MailOptions setIsInterruptible() {
>         this.isInterruptible = true;
>         return this;
>     }
> }
>
> And usage would be like this:
>
> mailboxExecutor.execute(new MailOptions().setIsInterruptible(), () -> {
> foo(); }, "foo");
>
> ?
>
> Best,
> Piotrek
>
> On Thu, May 16, 2024 at 11:26, Rui Fan <1996fan...@gmail.com> wrote:
>
> > Hi Piotr,
> >
> > > we checked in the firing timers benchmark [1] and we didn't observe any
> > > performance regression.
> >
> > Thanks for the feedback, it's good news to hear that. I didn't notice
> > we already have fireProcessingTimers benchmark.
> >
> > If so, we can follow it after this FLIP is merged.
> >
> > +1 for this FLIP.
> >
> > Best,
> > Rui
> >
> > On Thu, May 16, 2024 at 5:13 PM Piotr Nowojski 
> > wrote:
> >
> > > Hi Zakelly,
> > >
> > > > I'm suggesting skipping the continuation mail during draining of
> async
> > > state access.
> > >
> > > I see. That makes sense to me now. I will later update the FLIP.
> > >
> > > > the code path will become more complex after this FLIP
> > > due to the addition of shouldInterrupt() checks, right?
> > >
> > > Yes, that's correct.
> > >
> > > > If so, it's better to add a benchmark to check whether the job
> > > > performance regresses when one job has a lot of timers.
> > > > If the performance regresses too much, we need to re-consider it.
> > > > Of course, I hope the performance is fine.
> > >
> > > I had the same concerns when initially David Moravek proposed this
> > > solution,
> > > but we checked in the firing timers benchmark [1] and we didn't observe
> > any
> > > performance regression.
> > >
> > > Best,
> > > Piotrek
> > >
> > > [1] http://flink-speed.xyz/timeline/?ben=fireProcessingTimers=3
> > >
> > >
> > >
> > > On Tue, May 7, 2024 at 09:47, Rui Fan <1996fan...@gmail.com> wrote:
> > >
> > > > Hi Piotr,
> > > >
> > > > Overall this FLIP is fine for me. I have a minor concern:
> > > > IIUC, the code path will become more complex after this FLIP
> > > > due to the addition of shouldInterrupt() checks, right?
> > > >
> > > > If so, it's better to add a benchmark to check whether the job
> > > > performance regresses when one job has a lot of timers.
> > > > If the performance regresses too much, we need to re-consider it

Re: [DISCUSS] The performance of serializerHeavyString starts regress since April 3

2024-05-21 Thread Zakelly Lan
Hi Rui and RMs of Flink 1.20,

Thanks for driving this!

Available information indicates this issue is environment- and
JDK-specific, and I also failed to reproduce it on my Mac. Thus I guess it
is caused by JIT behavior, which is unpredictable and sensitive to
changes in the codebase. Considering the historical context of this
test provided by Piotr, I vote for "Won't fix" on this problem.

I can also offer some help if anyone wants to investigate the benchmark
environment; please reach out to me. JDK version info:

> openjdk version "11.0.19" 2023-04-18 LTS
> OpenJDK Runtime Environment (Red_Hat-11.0.19.0.7-2) (build 11.0.19+7-LTS)
> OpenJDK 64-Bit Server VM (Red_Hat-11.0.19.0.7-2) (build 11.0.19+7-LTS,
> mixed mode, sharing)

The OS version is Alibaba Cloud Linux 3.2104 LTS 64-bit[1]. The Linux
kernel version is 5.10.134-15.al8.x86_64.


Best,
Zakelly

[1]
https://www.alibabacloud.com/help/en/alinux/product-overview/release-notes-for-alibaba-cloud-linux
(See: Alibaba Cloud Linux 3.2104 U8, image id:
aliyun_3_x64_20G_alibase_20230727.vhd)

On Tue, May 21, 2024 at 8:15 PM Piotr Nowojski  wrote:

> Hi,
>
> Given what you wrote, that you have investigated the issue and couldn't
> find any easy explanation, I would suggest closing this ticket as "Won't
> do" or "Can not reproduce" and ignoring the problem.
>
> In the past there have been quite a few cases where some benchmark
> detected a performance regression. Sometimes those can not be reproduced,
> other times (as it's the case here), some seemingly unrelated change is
> causing the regression. The same thing happened in this benchmark many
> times in the past [1], [2], [3], [4]. Generally speaking this benchmark has
> been in the spotlight a couple of times [5].
>
> Note that there have been cases where this benchmark did detect a
> performance regression :)
>
> My personal suspicion is that after that commons-io version bump,
> something poked JVM/JIT to compile the code a bit differently for string
> serialization causing this regression. We have a couple of benchmarks that
> seem to be prone to such semi-intermittent issues. For example the same
> benchmark was subject to this annoying pattern, which I've spotted in
> quite a few benchmarks over the years [6]:
>
> [image: image.png]
> (https://imgur.com/a/AoygmWS)
>
> Where benchmark results are very stable within a single JVM fork. But
> between two forks, they can reach two different "stable" levels. Here it
> looks like a 50% chance of getting a stable "200 records/ms" and a 50%
> chance of "250 records/ms".
>
> A small interlude. Each of our benchmarks runs in 3 different JVM forks, with
> 10 warm-up iterations and 10 measurement iterations. Each iteration
> invokes the benchmarking method for at least one second. So by "very
> stable" results, I mean that for example after the 2nd or 3rd warm-up
> iteration, the results stabilize to within +/-1%, and stay on that level for
> the whole duration of the fork.
>
> Given that we are repeating the same benchmark in 3 different forks, we
> can have by pure chance:
> - 3 slow forks - total average 200 records/ms
> - 2 slow forks, 1 fast fork - average 216 r/ms
> - 1 slow fork, 2 fast forks - average 233 r/ms
> - 3 fast forks - average 250 r/ms
>
> So this benchmark is susceptible to entering different semi-stable
> states. As I wrote above, I guess something with the commons-io version
> bump just swayed it to a different semi-stable state :( I have never gotten
> desperate enough to actually dig further into what exactly is causing this
> kind of issue.
>
> Best,
> Piotrek
>
> [1] https://issues.apache.org/jira/browse/FLINK-18684
> [2] https://issues.apache.org/jira/browse/FLINK-27133
> [3] https://issues.apache.org/jira/browse/FLINK-27165
> [4] https://issues.apache.org/jira/browse/FLINK-31745
> [5]
> https://issues.apache.org/jira/browse/FLINK-35040?jql=project%20%3D%20FLINK%20AND%20status%20in%20(Open%2C%20%22In%20Progress%22%2C%20Reopened%2C%20Resolved%2C%20Closed)%20AND%20text%20~%20%22serializerHeavyString%22
> [6]
> http://flink-speed.xyz/timeline/#/?exe=1=serializerHeavyString=on=on=off=2=1000
>
> On Tue, May 21, 2024 at 12:50, Rui Fan <1996fan...@gmail.com> wrote:
>
>> Hi devs:
>>
>> We (the release managers of Flink 1.20) would like to report a performance
>> regression to the Flink dev mailing list.
>>
>> # Background:
>>
>> The performance of serializerHeavyString has regressed since April 3,
>> and we created FLINK-35040[1] to follow it.
>>
>> In brief:
>> - The performance only regresses on JDK 11; Java 8 and Java 17 are fine.
>> - The regression is caused by upgrading commons-io from 2.11.0 to 2.15.1
>>   - This upgrade was done in FLINK-34955[2].
>>   - The performance recovers after reverting the commons-io version
>>     to 2.11.0
>> You can get more details from FLINK-35040[1].
>>
>> # Problem
>>
>> We try to generate the flame graph (wall mode) to analyze why upgrading
>> the commons-io version affects the performance. 

[jira] [Created] (FLINK-35412) Batch execution of async state request callback

2024-05-21 Thread Zakelly Lan (Jira)
Zakelly Lan created FLINK-35412:
---

 Summary: Batch execution of async state request callback
 Key: FLINK-35412
 URL: https://issues.apache.org/jira/browse/FLINK-35412
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / State Backends, Runtime / Task
Reporter: Zakelly Lan


There is one mail for each callback when an async state result returns. One 
possible optimization is to encapsulate multiple callbacks into one mail.
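
As a rough illustration of the idea (not the actual implementation), the sketch below collects callbacks and wraps them into a single runnable that could be submitted as one mail. The class name `CallbackBatcher` and its methods are hypothetical, and thread-safety is deliberately left out.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper; names are for illustration only.
class CallbackBatcher {
    private final List<Runnable> pending = new ArrayList<>();

    // Collect a callback instead of enqueuing one mail per callback.
    void add(Runnable callback) {
        pending.add(callback);
    }

    // Wrap everything collected so far into a single runnable "mail".
    Runnable drainToSingleMail() {
        List<Runnable> batch = new ArrayList<>(pending);
        pending.clear();
        // Callbacks run in the order in which they were added.
        return () -> batch.forEach(Runnable::run);
    }
}
```

How many callbacks to collect before draining is a separate tuning question that this sketch does not address.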



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35411) Optimize wait logic in draining of async state requests

2024-05-21 Thread Zakelly Lan (Jira)
Zakelly Lan created FLINK-35411:
---

 Summary: Optimize wait logic in draining of async state requests
 Key: FLINK-35411
 URL: https://issues.apache.org/jira/browse/FLINK-35411
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / State Backends, Runtime / Task
Reporter: Zakelly Lan


Currently during draining of async state requests, the task thread performs 
{{Thread.sleep}} to avoid CPU overhead when polling mails. This can be 
optimized with wait & notify.
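
A minimal sketch of the wait & notify pattern, assuming a simple queue guarded by its own monitor. `NotifyingMailQueue` is a hypothetical class for illustration and not the Flink mailbox.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical queue; illustrates blocking on a monitor instead of sleep-polling.
class NotifyingMailQueue {
    private final Queue<Runnable> mails = new ArrayDeque<>();

    // Called by the thread that completes a state request.
    synchronized void put(Runnable mail) {
        mails.add(mail);
        notifyAll(); // wake the waiting task thread
    }

    // Called by the task thread while draining; blocks until a mail is available.
    synchronized Runnable take() throws InterruptedException {
        while (mails.isEmpty()) { // loop guards against spurious wakeups
            wait();
        }
        return mails.poll();
    }
}
```

Compared with a fixed sleep, the task thread wakes up as soon as a mail arrives and uses no CPU while waiting.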



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35410) Avoid sync waiting in coordinator thread of ForSt executor

2024-05-21 Thread Zakelly Lan (Jira)
Zakelly Lan created FLINK-35410:
---

 Summary: Avoid sync waiting in coordinator thread of ForSt executor
 Key: FLINK-35410
 URL: https://issues.apache.org/jira/browse/FLINK-35410
 Project: Flink
  Issue Type: Sub-task
Reporter: Zakelly Lan
Assignee: Zakelly Lan
 Fix For: 2.0.0


Currently, the coordinator thread of the ForSt executor synchronously waits for 
the state access result, which can be optimized.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35400) Rebuild FileMergingSnapshotManager in failover

2024-05-20 Thread Zakelly Lan (Jira)
Zakelly Lan created FLINK-35400:
---

 Summary: Rebuild FileMergingSnapshotManager in failover
 Key: FLINK-35400
 URL: https://issues.apache.org/jira/browse/FLINK-35400
 Project: Flink
  Issue Type: Sub-task
Reporter: Zakelly Lan


Currently, the {{FileMergingSnapshotManager}} is released within 
{{releaseJobResources}}, which will not be invoked during failover and 
restore. The manager should be created again to clear all internal states in a 
new job attempt.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35379) File merging manager is not properly notified about checkpoint

2024-05-16 Thread Zakelly Lan (Jira)
Zakelly Lan created FLINK-35379:
---

 Summary: File merging manager is not properly notified about 
checkpoint
 Key: FLINK-35379
 URL: https://issues.apache.org/jira/browse/FLINK-35379
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Checkpointing
Reporter: Zakelly Lan
Assignee: Zakelly Lan


Currently, the {{FileMergingSnapshotManager}} of the checkpoint file-merging 
mechanism is not properly notified about checkpoints, and it does not handle the 
notifications properly either.





[jira] [Created] (FLINK-35356) Async reducing state

2024-05-14 Thread Zakelly Lan (Jira)
Zakelly Lan created FLINK-35356:
---

 Summary: Async reducing state
 Key: FLINK-35356
 URL: https://issues.apache.org/jira/browse/FLINK-35356
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / State Backends
Reporter: Zakelly Lan








[jira] [Created] (FLINK-35355) Async aggregating state

2024-05-14 Thread Zakelly Lan (Jira)
Zakelly Lan created FLINK-35355:
---

 Summary: Async aggregating state
 Key: FLINK-35355
 URL: https://issues.apache.org/jira/browse/FLINK-35355
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / State Backends
Reporter: Zakelly Lan








[jira] [Created] (FLINK-35307) Add Compile CI check on jdk17

2024-05-07 Thread Zakelly Lan (Jira)
Zakelly Lan created FLINK-35307:
---

 Summary: Add Compile CI check on jdk17
 Key: FLINK-35307
 URL: https://issues.apache.org/jira/browse/FLINK-35307
 Project: Flink
  Issue Type: Improvement
  Components: Build System / CI
Reporter: Zakelly Lan








Re: [DISCUSS] FLIP-444: Native file copy support

2024-05-06 Thread Zakelly Lan
to a remote location
> - `canFastCopy(remote, local)` returns true - FS can natively download
> local file from a remote location
>
> Maybe indeed that's a better solution vs having two separate interfaces for
> copying and duplicating?
>
> > 3. Will the interface extraction introduce a breaking change?
>
> No. The signature of the existing abstract `FileSystem` class would remain
> the same. Only most/all of the abstract methods would be
> pulled out to the interface and abstract `FileSystem` would implement that
> new interface.
>
> Best,
> Piotrek
>
> On Mon, May 6, 2024 at 4:55 AM Zakelly Lan wrote:
>
> > Hi Piotr,
> >
> > Thanks for the proposal. It's meaningful to speed up the state download. I
> > have a few questions:
> >
> > 1. What are the semantics of `canCopyPath`? Should it be associated with a
> > specific destination path? e.g. It can be copied to local, but not to the
> > remote FS.
> > 2. Is the existing interface `DuplicatingFileSystem` feasible/enough for
> > this case?
> > 3. Will the interface extraction introduce a breaking change?
> >
> >
> > Best,
> > Zakelly
> >
> >
> > On Thu, May 2, 2024 at 6:50 PM Aleksandr Pilipenko 
> > wrote:
> >
> > > Hi Piotr,
> > >
> > > Thanks for the proposal.
> > > How will adding s5cmd affect the memory footprint? Since this is a native
> > > binary, memory consumption will not be controlled by JVM or Flink.
> > >
> > > Thanks,
> > > Aleksandr
> > >
> > > On Thu, 2 May 2024 at 11:12, Hong Liang  wrote:
> > >
> > > > Hi Piotr,
> > > >
> > > > > Thanks for the FLIP! Nice to see work to improve the filesystem
> > > > > performance. +1 to future work to improve the upload speed as well. This
> > > > > would be useful for jobs with large state and high Async checkpointing
> > > > > times.
> > > > >
> > > > > Some thoughts on the configuration, it might be good for us to introduce
> > > > > 2x points of configurability for future proofing:
> > > > > 1/ Configure the implementation of PathsCopyingFileSystem used, maybe by
> > > > > config, or by ServiceResources (this would allow us to use this for
> > > > > alternative clouds/Implement S3 SDKv2 support if we want this in the
> > > > > future). Also this could be used as a feature flag to determine if we
> > > > > should be using this new native file copy support.
> > > > > 2/ Configure the location of the s5cmd binary (version control etc.), as
> > > > > you have mentioned in the FLIP.
> > > >
> > > > Regards,
> > > > Hong
> > > >
> > > >
> > > > On Thu, May 2, 2024 at 9:40 AM Muhammet Orazov
> > > >  wrote:
> > > >
> > > > > Hey Piotr,
> > > > >
> > > > > Thanks for the proposal! It would be great improvement!
> > > > >
> > > > > Some questions from my side:
> > > > >
> > > > > > In order to configure s5cmd Flink’s user would need
> > > > > > to specify path to the s5cmd binary.
> > > > >
> > > > > Could you please also add the configuration property
> > > > > for this? An example showing how users would set this
> > > > > parameter would be helpful.
> > > > >
> > > > > Would this affect any filesystem connectors that use
> > > > > FileSystem[1][2] dependencies?
> > > > >
> > > > > Best,
> > > > > Muhammet
> > > > >
> > > > > [1]:
> > > > >
> > > > >
> > > >
> > >
> >
> https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/filesystems/s3/
> > > > > [2]:
> > > > >
> > > > >
> > > >
> > >
> >
> https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/filesystem/
> > > > >
> > > > > On 2024-04-30 13:15, Piotr Nowojski wrote:
> > > > > > Hi all!
> > > > > >
> > > > > > I would like to put under discussion:
> > > > > >
> > > > > > FLIP-444: Native file copy support
> > > > > > https://cwiki.apache.org/confluence/x/rAn9EQ
> > > > > >
> > > > > > > This proposal aims to speed up Flink recovery times, by speeding up
> > > > > > > state download times. However in the future, the same mechanism could
> > > > > > > also be used to speed up state uploading (checkpointing/savepointing).
> > > > > >
> > > > > > I'm curious to hear your thoughts.
> > > > > >
> > > > > > Best,
> > > > > > Piotrek
> > > > >
> > > >
> > >
> >
>


Re: [DISCUSS] FLIP-443: Interruptible watermark processing

2024-05-06 Thread Zakelly Lan
Hi Piotr,

I mean the scenario where things happen in the following order:
1. The watermark advances and timers are processed.
2. The cp arrives and interrupts the timer processing; after this, the
continuation mail is in the mailbox queue.
3. `snapshotState` is called, where the async state access responses will
be drained by calling `tryYield()` [1]. What if the continuation mail is
triggered by `tryYield()`?

I'm suggesting skipping the continuation mail while draining async
state access responses.


[1]
https://github.com/apache/flink/blob/1904b215e36e4fd48e48ece7ffdf2f1470653130/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java#L305

Best,
Zakelly


On Mon, May 6, 2024 at 6:00 PM Piotr Nowojski  wrote:

> Hi Zakelly,
>
> Can you elaborate a bit more on what you have in mind? How marking mails as
> interruptible helps with something? If an incoming async state access
> response comes, it could just request to interrupt any currently ongoing
> computations, regardless the currently executed mail is or is not
> interruptible.
>
> Best,
> Piotrek
>
> On Mon, May 6, 2024 at 6:33 AM Zakelly Lan wrote:
>
> > Hi Piotr,
> >
> > Thanks for the improvement, overall +1 for this. I'd leave a minor comment:
> >
> > 1. I'd suggest also providing `isInterruptable()` in `Mail`, and the
> > continuation mail will return true. The FLIP-425 will leverage this queue
> > to execute some state requests, and when the cp arrives, the operator may
> > call `yield()` to drain. It may happen that the continuation mail is called
> > again in `yield()`. By checking `isInterruptable()`, we can skip this mail
> > and re-enqueue.
> >
> >
> > Best,
> > Zakelly
> >
> > On Wed, May 1, 2024 at 4:35 PM Yanfei Lei  wrote:
> >
> > > Thanks for your answers, Piotrek. I got it now.  +1 for this improvement.
> > >
> > > Best,
> > > Yanfei
> > >
> > > On Tue, Apr 30, 2024 at 9:30 PM Stefan Richter wrote:
> > > >
> > > >
> > > > Thanks for the improvement proposal, I’m +1 for the change!
> > > >
> > > > Best,
> > > > Stefan
> > > >
> > > >
> > > >
> > > > > On 30. Apr 2024, at 15:23, Roman Khachatryan wrote:
> > > > >
> > > > > Thanks for the proposal, I definitely see the need for this improvement, +1.
> > > > >
> > > > > Regards,
> > > > > Roman
> > > > >
> > > > >
> > > > > On Tue, Apr 30, 2024 at 3:11 PM Piotr Nowojski <pnowoj...@apache.org> wrote:
> > > > >
> > > > >> Hi Yanfei,
> > > > >>
> > > > >> Thanks for the feedback!
> > > > >>
> > > > > >>> 1. Currently when AbstractStreamOperator or AbstractStreamOperatorV2
> > > > > >>> processes a watermark, the watermark will be sent to downstream, if
> > > > > >>> the `InternalTimerServiceImpl#advanceWatermark` is interrupted, when
> > > > > >>> is the watermark sent downstream?
> > > > > >>
> > > > > >> The watermark would be output by an operator only once all relevant
> > > > > >> timers are fired.
> > > > > >> In other words, if firing of timers is interrupted, a continuation mail to
> > > > > >> continue firing those interrupted timers is created. The watermark will be
> > > > > >> emitted downstream at the end of that continuation mail.
> > > > > >>
> > > > > >>> 2. IIUC, processing-timer's firing is also encapsulated into mail and
> > > > > >>> executed in mailbox. Is processing-timer allowed to be interrupted?
> > > > > >>
> > > > > >> Yes, firing of both processing time and event time timers shares the same
> > > > > >> code and both will support interruptions in the same way. Actually I've
> > > > > >> renamed the FLIP from
> > > > >>
> > > > >>> Interruptible watermarks processing
> > > > >>
> > > > >> to:
> > > > >>
> > > > >>> Interruptible timers firing
> > > > >>
> > > > >> to make this more clear.
> > 

Re: [DISCUSS] FLIP-443: Interruptible watermark processing

2024-05-05 Thread Zakelly Lan
Hi Piotr,

Thanks for the improvement, overall +1 for this. I'd leave a minor comment:

1. I'd suggest also providing `isInterruptable()` in `Mail`, and the
continuation mail will return true. The FLIP-425 will leverage this queue
to execute some state requests, and when the cp arrives, the operator may
call `yield()` to drain. It may happen that the continuation mail is called
again in `yield()`. By checking `isInterruptable()`, we can skip this mail
and re-enqueue.
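
A minimal sketch of the skip-and-re-enqueue idea, assuming a simplified
single-threaded mailbox. The `Mail` class and `drainSkippingInterruptable()`
method here are hypothetical stand-ins written for illustration, not Flink's
actual mailbox classes:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical, simplified mailbox; not Flink's TaskMailbox implementation.
final class MailboxDrainSketch {

    // Stand-in for a mail carrying the proposed isInterruptable() flag.
    static final class Mail {
        final String name;
        final boolean interruptable;
        final Runnable action;

        Mail(String name, boolean interruptable, Runnable action) {
            this.name = name;
            this.interruptable = interruptable;
            this.action = action;
        }

        boolean isInterruptable() {
            return interruptable;
        }
    }

    private final Deque<Mail> queue = new ArrayDeque<>();

    void put(Mail mail) {
        queue.addLast(mail);
    }

    // Runs every queued mail except those flagged interruptable; the skipped
    // mails are put back in their original order for a later, normal run.
    List<String> drainSkippingInterruptable() {
        List<String> executed = new ArrayList<>();
        List<Mail> skipped = new ArrayList<>();
        Mail mail;
        while ((mail = queue.pollFirst()) != null) {
            if (mail.isInterruptable()) {
                skipped.add(mail);
            } else {
                mail.action.run();
                executed.add(mail.name);
            }
        }
        queue.addAll(skipped);
        return executed;
    }

    int pending() {
        return queue.size();
    }
}
```

With a continuation mail and a state response both queued, draining this way
runs only the state response and leaves the continuation mail pending.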


Best,
Zakelly

On Wed, May 1, 2024 at 4:35 PM Yanfei Lei  wrote:

> Thanks for your answers, Piotrek. I got it now.  +1 for this improvement.
>
> Best,
> Yanfei
>
> On Tue, Apr 30, 2024 at 9:30 PM Stefan Richter wrote:
> >
> >
> > Thanks for the improvement proposal, I’m +1 for the change!
> >
> > Best,
> > Stefan
> >
> >
> >
> > > On 30. Apr 2024, at 15:23, Roman Khachatryan  wrote:
> > >
> > > Thanks for the proposal, I definitely see the need for this improvement, +1.
> > >
> > > Regards,
> > > Roman
> > >
> > >
> > > On Tue, Apr 30, 2024 at 3:11 PM Piotr Nowojski wrote:
> > >
> > >> Hi Yanfei,
> > >>
> > >> Thanks for the feedback!
> > >>
> > >>> 1. Currently when AbstractStreamOperator or AbstractStreamOperatorV2
> > >>> processes a watermark, the watermark will be sent to downstream, if
> > >>> the `InternalTimerServiceImpl#advanceWatermark` is interrupted, when
> > >>> is the watermark sent downstream?
> > >>
> > >> The watermark would be output by an operator only once all relevant
> > >> timers are fired.
> > >> In other words, if firing of timers is interrupted, a continuation mail to
> > >> continue firing those interrupted timers is created. The watermark will be
> > >> emitted downstream at the end of that continuation mail.
> > >>
> > >>> 2. IIUC, processing-timer's firing is also encapsulated into mail and
> > >>> executed in mailbox. Is processing-timer allowed to be interrupted?
> > >>
> > >> Yes, firing of both processing time and event time timers shares the same
> > >> code and both will support interruptions in the same way. Actually I've
> > >> renamed the FLIP from
> > >>
> > >>> Interruptible watermarks processing
> > >>
> > >> to:
> > >>
> > >>> Interruptible timers firing
> > >>
> > >> to make this more clear.
> > >>
> > >> Best,
> > >> Piotrek
> > >>
> > >> On Tue, Apr 30, 2024 at 6:08 AM Yanfei Lei wrote:
> > >>
> > >>> Hi Piotrek,
> > >>>
> > >>> Thanks for this proposal. It looks like it will shorten the checkpoint
> > >>> duration, especially in the case of back pressure. +1 for it!  I'd
> > >>> like to ask some questions to understand your thoughts more precisely.
> > >>>
> > >>> 1. Currently when AbstractStreamOperator or AbstractStreamOperatorV2
> > >>> processes a watermark, the watermark will be sent to downstream, if
> > >>> the `InternalTimerServiceImpl#advanceWatermark` is interrupted, when
> > >>> is the watermark sent downstream?
> > >>> 2. IIUC, processing-timer's firing is also encapsulated into mail and
> > >>> executed in mailbox. Is processing-timer allowed to be interrupted?
> > >>>
> > >>> Best regards,
> > >>> Yanfei
> > >>>
> > >>> On Mon, Apr 29, 2024 at 9:57 PM Piotr Nowojski wrote:
> > >>>
> > >>>>
> > >>>> Hi all,
> > >>>>
> > >>>> I would like to start a discussion on FLIP-443: Interruptible watermark
> > >>>> processing.
> > >>>>
> > >>>> https://cwiki.apache.org/confluence/x/qgn9EQ
> > >>>>
> > >>>> This proposal tries to make Flink's subtask thread more responsive when
> > >>>> processing watermarks/firing timers, and make those operations
> > >>>> interruptible/break them apart into smaller steps. At the same time, the
> > >>>> proposed solution could be potentially adopted in other places in the code
> > >>>> base as well, to solve similar problems with other flatMap-like operators
> > >>>> (non windowed joins, aggregations, CepOperator, ...).
> > >>>>
> > >>>> I'm looking forward to your thoughts.
> > >>>>
> > >>>> Best,
> > >>>> Piotrek
> >
>


Re: [VOTE] FLIP-447: Upgrade FRocksDB from 6.20.3 to 8.10.0

2024-05-05 Thread Zakelly Lan
+1 (binding)

Thanks for driving this!


Best,
Zakelly

On Mon, May 6, 2024 at 10:54 AM yue ma  wrote:

> Hi everyone,
>
> Thanks for all the feedback, I'd like to start a vote on the FLIP-447:
> Upgrade FRocksDB from 6.20.3 to 8.10.0 [1]. The discussion thread is here
> [2].
>
> The vote will be open for at least 72 hours unless there is an objection or
> insufficient votes.
>
> [1]
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-447%3A+Upgrade+FRocksDB+from+6.20.3++to+8.10.0
> [2] https://lists.apache.org/thread/lrxjfpjjwlq4sjzm1oolx58n1n8r48hw
>
> --
> Best,
> Yue
>


Re: [DISCUSS] FLIP-444: Native file copy support

2024-05-05 Thread Zakelly Lan
Hi Piotr,

Thanks for the proposal. It's meaningful to speed up the state download. I
have a few questions:

1. What are the semantics of `canCopyPath`? Should it be associated with a
specific destination path? e.g. It can be copied to local, but not to the
remote FS.
2. Is the existing interface `DuplicatingFileSystem` feasible/enough for
this case?
3. Will the interface extraction introduce a breaking change?
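
To make question 1 concrete, here is a rough sketch of a capability check that
takes both the source and the destination. The interface and class names are
hypothetical, made up for illustration, and are not the API proposed in the
FLIP:

```java
import java.net.URI;

// Hypothetical sketch of a destination-aware capability check.
final class CopyCapabilitySketch {

    // Assumed shape: the answer depends on both ends of the copy.
    interface PathCopyCapability {
        boolean canCopy(URI source, URI destination);
    }

    // Example implementation that only supports downloading from a remote
    // "s3" location to the local "file" system, not the reverse direction.
    static final class DownloadOnly implements PathCopyCapability {
        @Override
        public boolean canCopy(URI source, URI destination) {
            return "s3".equals(source.getScheme())
                    && "file".equals(destination.getScheme());
        }
    }
}
```

With a single-argument check, the "remote to local is fine, local to remote is
not" case in this example could not be expressed.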


Best,
Zakelly


On Thu, May 2, 2024 at 6:50 PM Aleksandr Pilipenko  wrote:

> Hi Piotr,
>
> Thanks for the proposal.
> How will adding s5cmd affect the memory footprint? Since this is a native
> binary, memory consumption will not be controlled by JVM or Flink.
>
> Thanks,
> Aleksandr
>
> On Thu, 2 May 2024 at 11:12, Hong Liang  wrote:
>
> > Hi Piotr,
> >
> > Thanks for the FLIP! Nice to see work to improve the filesystem
> > performance. +1 to future work to improve the upload speed as well. This
> > would be useful for jobs with large state and high Async checkpointing
> > times.
> >
> > Some thoughts on the configuration, it might be good for us to introduce
> > 2x points of configurability for future proofing:
> > 1/ Configure the implementation of PathsCopyingFileSystem used, maybe by
> > config, or by ServiceResources (this would allow us to use this for
> > alternative clouds/Implement S3 SDKv2 support if we want this in the
> > future). Also this could be used as a feature flag to determine if we
> > should be using this new native file copy support.
> > 2/ Configure the location of the s5cmd binary (version control etc.), as
> > you have mentioned in the FLIP.
> >
> > Regards,
> > Hong
> >
> >
> > On Thu, May 2, 2024 at 9:40 AM Muhammet Orazov
> >  wrote:
> >
> > > Hey Piotr,
> > >
> > > Thanks for the proposal! It would be great improvement!
> > >
> > > Some questions from my side:
> > >
> > > > In order to configure s5cmd Flink’s user would need
> > > > to specify path to the s5cmd binary.
> > >
> > > Could you please also add the configuration property
> > > for this? An example showing how users would set this
> > > parameter would be helpful.
> > >
> > > Would this affect any filesystem connectors that use
> > > FileSystem[1][2] dependencies?
> > >
> > > Best,
> > > Muhammet
> > >
> > > [1]:
> > > https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/filesystems/s3/
> > > [2]:
> > > https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/filesystem/
> > >
> > > On 2024-04-30 13:15, Piotr Nowojski wrote:
> > > > Hi all!
> > > >
> > > > I would like to put under discussion:
> > > >
> > > > FLIP-444: Native file copy support
> > > > https://cwiki.apache.org/confluence/x/rAn9EQ
> > > >
> > > > This proposal aims to speed up Flink recovery times, by speeding up
> > > > state download times. However in the future, the same mechanism could
> > > > also be used to speed up state uploading (checkpointing/savepointing).
> > > >
> > > > I'm curious to hear your thoughts.
> > > >
> > > > Best,
> > > > Piotrek
> > >
> >
>


Re: [DISCUSS] FLIP-447: Upgrade FRocksDB from 6.20.3 to 8.10.0

2024-04-23 Thread Zakelly Lan
Hi Yue,

Thanks for this proposal!

Given the great improvement we could have, the slight regression in write
performance is a worthwhile trade-off, particularly as the mem-table
operations contribute only a minor part to the overall overhead. So +1 for
this.


Best,
Zakelly

On Tue, Apr 23, 2024 at 12:53 PM Yun Tang  wrote:

> Hi Yue,
>
> Thanks for driving this work.
>
> It has been three years since the last major upgrade of FRocksDB, and this
> upgrade would be a great improvement to Flink's state backend. +1 for
> this work.
>
>
> Best
> Yun Tang
> 
> From: Yanfei Lei 
> Sent: Tuesday, April 23, 2024 12:50
> To: dev@flink.apache.org 
> Subject: Re: [DISCUSS] FLIP-447: Upgrade FRocksDB from 6.20.3 to 8.10.0
>
> Hi Yue & Roman,
>
> Thanks for initiating this FLIP and all the efforts for the upgrade.
>
> 8.10.0 introduces some new features, making it possible for Flink to
> implement some new exciting features, and the upgrade also makes
> FRocksDB easier to maintain, +1 for upgrading.
>
> I read the FLIP and have a minor comment, it would be better to add
> some description about the environment/configuration of the nexmark's
> result.
>
> On Tue, Apr 23, 2024 at 12:07 PM Roman Khachatryan wrote:
>
> >
> > Hi,
> >
> > Thanks for writing the proposal and preparing the upgrade.
> >
> > FRocksDB definitely needs to be kept in sync with the upstream and the new
> > APIs are necessary for faster rescaling.
> > We're already using a similar version internally.
> >
> > I reviewed the FLIP and it looks good to me (disclaimer: I took part in
> > some steps of this effort).
> >
> >
> > Regards,
> > Roman
> >
> > On Mon, Apr 22, 2024, 08:11 yue ma  wrote:
> >
> > > Hi Flink devs,
> > >
> > > I would like to start a discussion on FLIP-447: Upgrade FRocksDB from
> > > 6.20.3 to 8.10.0
> > >
> > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-447%3A+Upgrade+FRocksDB+from+6.20.3++to+8.10.0
> > >
> > > This FLIP proposes upgrading the version of FRocksDB in the Flink project
> > > from 6.20.3 to 8.10.0.
> > > The FLIP mainly introduces the main benefits of upgrading FRocksDB,
> > > including the use of IngestDB, which can improve rescaling performance by
> > > more than 10 times in certain scenarios, as well as other potential
> > > optimization points such as async_io, blob db, and tiered storage. The
> > > FLIP also presents test results based on RocksDB 8.10, including
> > > StateBenchmark and Nexmark tests.
> > > Overall, upgrading FRocksDB may result in a small regression of write
> > > performance (which is a very small part of the overall overhead), but it
> > > can bring many important performance benefits.
> > > So we hope to upgrade the version of FRocksDB through this FLIP.
> > >
> > > Looking forward to everyone's feedback and suggestions. Thank you!
> > > --
> > > Best regards,
> > > Yue
> > >
>
>
>
> --
> Best,
> Yanfei
>


[jira] [Created] (FLINK-35186) Create State V2 from new StateDescriptor

2024-04-22 Thread Zakelly Lan (Jira)
Zakelly Lan created FLINK-35186:
---

 Summary: Create State V2 from new StateDescriptor
 Key: FLINK-35186
 URL: https://issues.apache.org/jira/browse/FLINK-35186
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / State Backends, Runtime / Task
Reporter: Zakelly Lan
Assignee: Zakelly Lan








[jira] [Created] (FLINK-35168) Basic State Iterator for async processing

2024-04-18 Thread Zakelly Lan (Jira)
Zakelly Lan created FLINK-35168:
---

 Summary: Basic State Iterator for async processing
 Key: FLINK-35168
 URL: https://issues.apache.org/jira/browse/FLINK-35168
 Project: Flink
  Issue Type: Sub-task
Reporter: Zakelly Lan
Assignee: Zakelly Lan








[jira] [Created] (FLINK-35156) Wire new operators for async state with DataStream V2

2024-04-18 Thread Zakelly Lan (Jira)
Zakelly Lan created FLINK-35156:
---

 Summary: Wire new operators for async state with DataStream V2
 Key: FLINK-35156
 URL: https://issues.apache.org/jira/browse/FLINK-35156
 Project: Flink
  Issue Type: Sub-task
Reporter: Zakelly Lan








[jira] [Created] (FLINK-35153) Internal Async State Implementation and StateDescriptor for Map/List State

2024-04-18 Thread Zakelly Lan (Jira)
Zakelly Lan created FLINK-35153:
---

 Summary: Internal Async State Implementation and StateDescriptor 
for Map/List State
 Key: FLINK-35153
 URL: https://issues.apache.org/jira/browse/FLINK-35153
 Project: Flink
  Issue Type: Bug
Reporter: Zakelly Lan








Re: [VOTE] FLIP-442: General Improvement to Configuration for Flink 2.0

2024-04-17 Thread Zakelly Lan
+1 binding


Best,
Zakelly

On Wed, Apr 17, 2024 at 2:05 PM Rui Fan <1996fan...@gmail.com> wrote:

> +1(binding)
>
> Best,
> Rui
>
> On Wed, Apr 17, 2024 at 1:02 PM Xuannan Su  wrote:
>
> > Hi everyone,
> >
> > Thanks for all the feedback about the FLIP-442: General Improvement to
> > Configuration for Flink 2.0 [1] [2].
> >
> > I'd like to start a vote for it. The vote will be open for at least 72
> > hours (excluding weekends, until APR 22, 12:00AM GMT) unless there is an
> > objection or an insufficient number of votes.
> >
> > [1]
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-442%3A+General+Improvement+to+Configuration+for+Flink+2.0
> > [2] https://lists.apache.org/thread/15k0stwyoknhxvd643ctwjw3fd17pqwk
> >
> >
> > Best regards,
> > Xuannan
> >
>


Re: [ANNOUNCE] New Apache Flink Committer - Zakelly Lan

2024-04-16 Thread Zakelly Lan
Thanks everyone! I'm glad to be part of such an active and collaborative
community!


Best,
Zakelly

On Wed, Apr 17, 2024 at 10:14 AM Zhanghao Chen 
wrote:

> Congrats, Zakelly!
>
> Best,
> Zhanghao Chen
> 
> From: Shawn Huang 
> Sent: Wednesday, April 17, 2024 9:59
> To: dev@flink.apache.org 
> Subject: Re: [ANNOUNCE] New Apache Flink Committer - Zakelly Lan
>
> Congratulations, Zakelly!
>
> Best,
> Shawn Huang
>
>
> On Wed, Apr 17, 2024 at 12:16 AM Feng Jin wrote:
>
> > Congratulations!
> >
> > Best,
> > Feng
> >
> > On Tue, Apr 16, 2024 at 10:43 PM Ferenc Csaky wrote:
> >
> > > Congratulations!
> > >
> > > Best,
> > > Ferenc
> > >
> > >
> > >
> > >
> > > On Tuesday, April 16th, 2024 at 16:28, Jeyhun Karimov <
> > > je.kari...@gmail.com> wrote:
> > >
> > > >
> > > >
> > > > Congratulations Zakelly!
> > > >
> > > > Regards,
> > > > Jeyhun
> > > >
> > > > On Tue, Apr 16, 2024 at 6:35 AM Feifan Wang zoltar9...@163.com wrote:
> > > >
> > > > > Congratulations, Zakelly!
> > > > >
> > > > > Best regards,
> > > > >
> > > > > Feifan Wang
> > > > >
> > > > > At 2024-04-15 10:50:06, "Yuan Mei" yuanmei.w...@gmail.com wrote:
> > > > >
> > > > > > Hi everyone,
> > > > > >
> > > > > > > On behalf of the PMC, I'm happy to let you know that Zakelly Lan has
> > > > > > > become a new Flink Committer!
> > > > > > >
> > > > > > > Zakelly has been continuously contributing to the Flink project since
> > > > > > > 2020, with a focus area on Checkpointing, State as well as frocksdb (the
> > > > > > > default on-disk state db).
> > > > > > >
> > > > > > > He leads several FLIPs to improve checkpoints and state APIs, including
> > > > > > > File Merging for Checkpoints and configuration/API reorganizations. He is
> > > > > > > also one of the main contributors to the recent efforts of "disaggregated
> > > > > > > state management for Flink 2.0" and drives the entire discussion in the
> > > > > > > mailing thread, demonstrating outstanding technical depth and breadth of
> > > > > > > knowledge.
> > > > > > >
> > > > > > > Beyond his technical contributions, Zakelly is passionate about helping
> > > > > > > the community in numerous ways. He spent quite some time setting up the
> > > > > > > Flink Speed Center and rebuilding the benchmark pipeline after the
> > > > > > > original one was out of lease. He helps build frocksdb and tests for the
> > > > > > > upcoming frocksdb release (bump rocksdb from 6.20.3->8.10).
> > > > > > >
> > > > > > > Please join me in congratulating Zakelly for becoming an Apache Flink
> > > > > > > committer!
> > > > > >
> > > > > > Best,
> > > > > > Yuan (on behalf of the Flink PMC)
> > >
> >
>


Re: [ANNOUNCE] New Apache Flink PMC Member - Jing Ge

2024-04-14 Thread Zakelly Lan
Congratulations, Jing!


Best,
Zakelly

On Sat, Apr 13, 2024 at 12:47 AM Ferenc Csaky 
wrote:

> Congratulations, Jing!
>
> Best,
> Ferenc
>
>
>
> On Friday, April 12th, 2024 at 13:54, Ron liu  wrote:
>
> >
> >
> > Congratulations, Jing!
> >
> > Best,
> > Ron
> >
> > On Fri, Apr 12, 2024 at 6:54 PM Junrui Lee jrlee@gmail.com wrote:
> >
> > > Congratulations, Jing!
> > >
> > > Best,
> > > Junrui
> > >
> > > On Fri, Apr 12, 2024 at 6:28 PM Aleksandr Pilipenko z3d...@gmail.com wrote:
> > >
> > > > Congratulations, Jing!
> > > >
> > > > Best Regards,
> > > > Aleksandr
>


Re: [ANNOUNCE] New Apache Flink PMC Member - Lincoln Lee

2024-04-14 Thread Zakelly Lan
Congratulations, Lincoln!


Best,
Zakelly

On Sat, Apr 13, 2024 at 12:48 AM Ferenc Csaky 
wrote:

> Congratulations, Lincoln!
>
> Best,
> Ferenc
>
>
>
>
> On Friday, April 12th, 2024 at 15:54, lorenzo.affe...@ververica.com.INVALID
>  wrote:
>
> >
> >
> > Huge congrats! Well done!
> > On Apr 12, 2024 at 13:56 +0200, Ron liu ron9@gmail.com, wrote:
> >
> > > Congratulations, Lincoln!
> > >
> > > Best,
> > > Ron
> > >
> > > On Fri, Apr 12, 2024 at 6:54 PM Junrui Lee jrlee@gmail.com wrote:
> > >
> > > > Congratulations, Lincoln!
> > > >
> > > > Best,
> > > > Junrui
> > > >
> > > > On Fri, Apr 12, 2024 at 6:29 PM Aleksandr Pilipenko z3d...@gmail.com wrote:
> > > >
> > > > > > Congratulations, Lincoln!
> > > > > >
> > > > > > Best Regards
> > > > > > Aleksandr
>


Re: [VOTE] FLIP-441: Show the JobType and remove Execution Mode on Flink WebUI

2024-04-11 Thread Zakelly Lan
+1 non-binding


Best,
Zakelly

On Fri, Apr 12, 2024 at 11:05 AM Yuepeng Pan  wrote:

> Hi Rui,
>
> Thanks for driving it!
> +1  (non-binding)
> Best,
> Yuepeng Pan
>
> At 2024-04-12 10:31:19, "Yanfei Lei"  wrote:
> >Hi Rui,
> >
> >Thanks for driving it!
> >
> >+1  (binding)
> >
> >On Fri, Apr 12, 2024 at 10:26 AM Hangxiang Yu wrote:
> >>
> >> +1  (binding)
> >>
> >> On Fri, Apr 12, 2024 at 10:22 AM Jinzhong Li 
> >> wrote:
> >>
> >> > +1  (non binding)
> >> >
> >> > Bests,
> >> > Jinzhong
> >> >
> >> > On Thu, Apr 11, 2024 at 7:26 AM Muhammet Orazov
> >> >  wrote:
> >> >
> >> > > Hey Rui,
> >> > >
> >> > > +1 (non-binding).
> >> > >
> >> > > Thanks for driving it!
> >> > >
> >> > > Best,
> >> > > Muhammet
> >> > >
> >> > > On 2024-04-10 04:36, Rui Fan wrote:
> >> > > > Hi devs,
> >> > > >
> >> > > > Thank you to everyone for the feedback on FLIP-441: Show
> >> > > > the JobType and remove Execution Mode on Flink WebUI[1]
> >> > > > which has been discussed in this thread [2].
> >> > > >
> >> > > > I would like to start a vote for it. The vote will be open for at least
> >> > > > 72 hours unless there is an objection or not enough votes.
> >> > > >
> >> > > > [1] https://cwiki.apache.org/confluence/x/agrPEQ
> >> > > > [2] https://lists.apache.org/thread/0s52w17w24x7m2zo6ogl18t1fy412vcd
> >> > > >
> >> > > > Best,
> >> > > > Rui
> >> > >
> >> >
> >>
> >>
> >> --
> >> Best,
> >> Hangxiang.
> >
> >
> >
> >--
> >Best,
> >Yanfei
>


Re: [DISCUSSION] FLIP-442: FLIP-442: General Improvement to Configuration for Flink 2.0

2024-04-09 Thread Zakelly Lan
Thanks Xuannan for driving this! +1 for cleaning these up.

And minor comments: It seems the StateBackendOptions is already annotated
with @PublicEvolving.


Best,
Zakelly


On Tue, Apr 9, 2024 at 4:21 PM Xuannan Su  wrote:

> Hi all,
>
> I'd like to start a discussion on FLIP-442: General Improvement to
> Configuration for Flink 2.0 [1]. As Flink moves toward 2.0, we aim to
> provide users with a better experience with the existing
> configuration. This FLIP proposes several general improvements to the
> current configuration.
>
> Looking forward to everyone's feedback and suggestions. Thank you!
>
> Best regards,
> Xuannan
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-442%3A+General+Improvement+to+Configuration+for+Flink+2.0
>


Re: [DISCUSS] FLIP-441: Show the JobType and remove Execution Mode on Flink WebUI

2024-04-09 Thread Zakelly Lan
Thanks Rui for driving this! +1 for this idea.


Best,
Zakelly

On Mon, Apr 8, 2024 at 7:17 PM Ahmed Hamdy  wrote:

> Acknowledged, +1 to start a vote.
> Best Regards
> Ahmed Hamdy
>
>
> On Mon, 8 Apr 2024 at 12:04, Rui Fan <1996fan...@gmail.com> wrote:
>
> > Sorry, it's a typo. It should be FLINK-32558[1].
> >
> > [1] https://issues.apache.org/jira/browse/FLINK-32558
> >
> > Best,
> > Rui
> >
> > On Mon, Apr 8, 2024 at 6:44 PM Ahmed Hamdy  wrote:
> >
> > > Hi Rui,
> > > Thanks for the proposal.
> > > Is the deprecation Jira mentioned (FLINK-32258) correct?
> > > Best Regards
> > > Ahmed Hamdy
> > >
> > >
> > > On Sun, 7 Apr 2024 at 03:37, Rui Fan <1996fan...@gmail.com> wrote:
> > >
> > > > > If there are no extra comments, I will start voting in three days, thank
> > > > > you~
> > > >
> > > > Best,
> > > > Rui
> > > >
> > > > On Thu, Mar 28, 2024 at 4:46 PM Muhammet Orazov
> > > >  wrote:
> > > >
> > > > > Hey Rui,
> > > > >
> > > > > Thanks for the detailed explanation and updating the FLIP!
> > > > >
> > > > > It is much clearer definitely, thanks for the proposal.
> > > > >
> > > > > Best,
> > > > > Muhammet
> > > > >
> > > > > On 2024-03-28 07:37, Rui Fan wrote:
> > > > > > Hi Muhammet,
> > > > > >
> > > > > > Thanks for your reply!
> > > > > >
> > > > > >> The execution mode is also used for the DataStream API [1],
> > > > > >> would that also affect/hide the DataStream execution mode
> > > > > >> if we remove it from the WebUI?
> > > > > >
> > > > > > > Sorry, I didn't describe it clearly in FLIP-441[2], I have updated it.
> > > > > > Let me clarify the Execution Mode here:
> > > > > >
> > > > > > 1. Flink 1.19 website[3] also mentions the Execution mode, but it
> > > > > > actually matches the JobType[4] in the Flink code. Both of them
> > > > > > have 2 types: STREAMING and BATCH.
> > > > > > 2. execution.runtime-mode can be set to 3 types: STREAMING,
> > > > > > BATCH and AUTOMATIC. But the jobType will be inferred as
> > > > > > STREAMING or BATCH when execution.runtime-mode is set
> > > > > > to AUTOMATIC.
> > > > > > 3. The ExecutionMode I describe is: code link[5] , as we can
> > > > > > see, ExecutionMode has 4 enums: PIPELINED,
> > > > > > PIPELINED_FORCED, BATCH and BATCH_FORCED.
> > > > > > > And for a Flink streaming job shown in the Flink WebUI,
> > > > > > > the Execution mode is PIPELINED instead of STREAMING.
> > > > > > I attached a screenshot to the FLIP doc[2], you can see it there.
> > > > > > 4. What this proposal wants to do is to remove the ExecutionMode
> > > > > > with four enumerations on Flink WebUI and introduce the
> > > > > > JobType with two enumerations (STREAMING or BATCH).
> > > > > > STREAMING or BATCH is clearer and more accurate for users.
> > > > > >
> > > > > > > Please let me know if it's not clear or anything is wrong,
> > > > > > > thanks a lot!
> > > > > >
> > > > > > [1]
> > > > > >
> > > > >
> > > >
> > >
> >
> https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/datastream/execution_mode/
> > > > > > [2] https://cwiki.apache.org/confluence/x/agrPEQ
> > > > > > [3]
> > > > > >
> > > > >
> > > >
> > >
> >
> https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/datastream/execution_mode/
> > > > > > [4]
> > > > > >
> > > > >
> > > >
> > >
> >
> https://github.com/apache/flink/blob/f31c128bfc457b64dd7734f71123b74faa2958ba/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobType.java#L22
> > > > > > [5]
> > > > > >
> > > > >
> > > >
> > >
> >
> https://github.com/apache/flink/blob/f31c128bfc457b64dd7734f71123b74faa2958ba/flink-core/src/main/java/org/apache/flink/api/common/ExecutionMode.java#L54
> > > > > >
> > > > > > Best,
> > > > > > Rui
> > > > > >
> > > > > > On Thu, Mar 28, 2024 at 1:33 AM Venkatakrishnan Sowrirajan
> > > > > > 
> > > > > > wrote:
> > > > > >
> > > > > >> Rui,
> > > > > >>
> > > > > > >> I assume the current proposal would also handle the case of mixed mode
> > > > > > >> (BATCH + STREAMING within the same app) in the future, right?
> > > > > >>
> > > > > >> Regards
> > > > > >> Venkat
> > > > > >>
> > > > > >> On Wed, Mar 27, 2024 at 10:15 AM Venkatakrishnan Sowrirajan <
> > > > > >> vsowr...@asu.edu> wrote:
> > > > > >>
> > > > > >>> This will be a very useful addition to Flink UI. Thanks Rui for
> > > > > >>> starting
> > > > > >>> a FLIP for this improvement.
> > > > > >>>
> > > > > >>> Regards
> > > > > >>> Venkata krishnan
> > > > > >>>
> > > > > >>>
> > > > > >>> On Wed, Mar 27, 2024 at 4:49 AM Muhammet Orazov
> > > > > >>>  wrote:
> > > > > >>>
> > > > >  Hello Rui,
> > > > > 
> > > > >  Thanks for the proposal! It looks good!
> > > > > 
> > > > >  I have minor clarification from my side:
> > > > > 
> > > > >  The execution mode is also used for the DataStream API [1],
> > > > >  would that also affect/hide the DataStream execution mode
> > > > >  if we remove it from the WebUI?
> > > > > 
> > > > >  Best,
> > > > >  Muhammet
> > > > > 

[jira] [Created] (FLINK-35060) Provide compatibility of old CheckpointMode for connector testing framework

2024-04-09 Thread Zakelly Lan (Jira)
Zakelly Lan created FLINK-35060:
---

 Summary: Provide compatibility of old CheckpointMode for connector 
testing framework
 Key: FLINK-35060
 URL: https://issues.apache.org/jira/browse/FLINK-35060
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Checkpointing, Tests
Reporter: Zakelly Lan
Assignee: Zakelly Lan


After FLINK-34516, the {{org.apache.flink.streaming.api.CheckpointingMode}} has 
been moved to {{org.apache.flink.core.execution.CheckpointingMode}}. It 
introduced a breaking change to connector testing framework as well as to 
externalized connector repos by mistake. This should be fixed.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34986) Basic framework for async execution of state

2024-04-01 Thread Zakelly Lan (Jira)
Zakelly Lan created FLINK-34986:
---

 Summary: Basic framework for async execution of state
 Key: FLINK-34986
 URL: https://issues.apache.org/jira/browse/FLINK-34986
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / State Backends, Runtime / Task
Reporter: Zakelly Lan
Assignee: Zakelly Lan








[jira] [Created] (FLINK-34978) Introduce Asynchronous State APIs

2024-03-31 Thread Zakelly Lan (Jira)
Zakelly Lan created FLINK-34978:
---

 Summary: Introduce Asynchronous State APIs
 Key: FLINK-34978
 URL: https://issues.apache.org/jira/browse/FLINK-34978
 Project: Flink
  Issue Type: Sub-task
Reporter: Zakelly Lan
Assignee: Zakelly Lan
 Fix For: 1.20.0








[jira] [Created] (FLINK-34979) Implement State Future and related utilities

2024-03-31 Thread Zakelly Lan (Jira)
Zakelly Lan created FLINK-34979:
---

 Summary: Implement State Future and related utilities
 Key: FLINK-34979
 URL: https://issues.apache.org/jira/browse/FLINK-34979
 Project: Flink
  Issue Type: Sub-task
Reporter: Zakelly Lan
Assignee: Zakelly Lan








[jira] [Created] (FLINK-34974) FLIP-424: Asynchronous State APIs

2024-03-31 Thread Zakelly Lan (Jira)
Zakelly Lan created FLINK-34974:
---

 Summary: FLIP-424: Asynchronous State APIs
 Key: FLINK-34974
 URL: https://issues.apache.org/jira/browse/FLINK-34974
 Project: Flink
  Issue Type: New Feature
  Components: Runtime / State Backends
Reporter: Zakelly Lan
Assignee: Zakelly Lan
 Fix For: 2.0.0


This is a sub-FLIP for the disaggregated state management and its related work, 
please read the [FLIP-423|https://cwiki.apache.org/confluence/x/R4p3EQ] first 
to know the whole story.

To maximize I/O capacity utilization and enhance the use of pre-allocated 
computational resources, this FLIP proposes the introduction of asynchronous 
state APIs. These APIs permit state access to be executed in threads separate 
from the task thread, returning the result when available. Consequently, the 
task thread can process another element while awaiting multiple pending state 
results. This enables concurrent processing of multiple records, ensuring that 
the latency of individual I/O operations no longer has a direct impact on job 
performance. This approach is particularly advantageous in scenarios where I/O 
bandwidth is underutilized and I/O latency is the limiting factor. The 
Disaggregated Storage Architecture, as discussed in FLIP-423, is a prime 
example of a scenario characterized by abundant and easily scalable I/O 
bandwidth coupled with higher I/O latency. The asynchronous state APIs hold 
great promise for significantly enhancing Flink's performance when dealing with 
disaggregated state.
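
As a rough illustration of the idea described above, the following sketch uses the JDK's `CompletableFuture` in place of the state futures proposed by the FLIP. It is not the Flink API: `AsyncStateSketch`, `asyncGet`, `processElement`, and `run` are hypothetical names, and the 50 ms sleep merely simulates I/O latency. The calling thread issues a read per element, registers a callback, and moves on to the next element while the reads are still pending.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Illustration only: JDK futures stand in for the proposed state futures.
class AsyncStateSketch {
    // Hypothetical stand-in for remote state: key -> count.
    private final Map<String, Integer> remoteState = new ConcurrentHashMap<>();
    // I/O threads, separate from the thread that calls processElement.
    private final ExecutorService ioPool = Executors.newFixedThreadPool(4);

    // Simulates a state read with noticeable I/O latency.
    CompletableFuture<Integer> asyncGet(String key) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(50);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return remoteState.getOrDefault(key, 0);
        }, ioPool);
    }

    // Issues the read, registers a callback, and returns without blocking.
    CompletableFuture<Void> processElement(String key) {
        return asyncGet(key).thenAccept(count -> remoteState.put(key, count + 1));
    }

    // Processes one element per key. Keys must be distinct here, because this
    // sketch does nothing to order concurrent updates to the same key.
    static Map<String, Integer> run(List<String> keys) {
        AsyncStateSketch op = new AsyncStateSketch();
        List<CompletableFuture<Void>> pending = new ArrayList<>();
        for (String key : keys) {
            pending.add(op.processElement(key)); // next element starts right away
        }
        CompletableFuture.allOf(pending.toArray(new CompletableFuture[0])).join();
        op.ioPool.shutdown();
        return op.remoteState;
    }
}
```

With several distinct keys the simulated reads overlap on the I/O pool, so the latency of a single read no longer bounds per-element throughput. Ordering of updates to the same key is deliberately left out of this sketch.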






[jira] [Created] (FLINK-34973) FLIP-423: Disaggregated State Storage and Management

2024-03-31 Thread Zakelly Lan (Jira)
Zakelly Lan created FLINK-34973:
---

 Summary: FLIP-423: Disaggregated State Storage and Management
 Key: FLINK-34973
 URL: https://issues.apache.org/jira/browse/FLINK-34973
 Project: Flink
  Issue Type: New Feature
Reporter: Zakelly Lan
 Fix For: 2.0.0


The past decade has witnessed a dramatic shift in Flink's deployment mode, 
workload patterns, and hardware improvements. We've moved from the map-reduce 
era, where workers were nodes with tightly coupled computation and storage, to a 
cloud-native world where containerized deployments on Kubernetes have become 
standard. To enable Flink's Cloud-Native future, we introduce Disaggregated 
State Storage and Management that uses DFS as primary storage in Flink 2.0, as 
promised in the Flink 2.0 Roadmap.

Detailed design and story: 
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=293046855

Also sub-FLIPs:
- Asynchronous State APIs 
([FLIP-424|https://cwiki.apache.org/confluence/x/SYp3EQ]): Introduce new APIs 
for asynchronous state access. 
- Asynchronous Execution Model 
([FLIP-425|https://cwiki.apache.org/confluence/x/S4p3EQ]): Implement a 
non-blocking execution model leveraging the asynchronous APIs introduced in 
FLIP-424. 
- Grouping Remote State Access 
([FLIP-426|https://cwiki.apache.org/confluence/x/TYp3EQ]): Enable retrieval of 
remote state data in batches to avoid unnecessary round-trip costs for remote 
access. 
- Disaggregated State Store 
([FLIP-427|https://cwiki.apache.org/confluence/x/T4p3EQ]): Introduce the 
initial version of the ForSt disaggregated state store.
- Fault Tolerance/Rescale Integration 
([FLIP-428|https://cwiki.apache.org/confluence/x/UYp3EQ]): Integrate 
checkpointing mechanisms with the disaggregated state store for fault tolerance 
and fast rescaling.





[RESULT][VOTE] FLIP-424: Asynchronous State APIs

2024-03-31 Thread Zakelly Lan
Hi devs,

I'm happy to announce that FLIP-424: Asynchronous State APIs [1] has been
accepted with 11 approving votes, 7 of which are binding [2]:

- Xuannan Su
- Yuan Mei (binding)
- Piotr Nowojski (binding)
- Jing Ge (binding)
- Feifan Wang
- Rui Fan (binding)
- Yunfeng Zhou
- Yuepeng Pan
- Xintong Song (binding)
- Roman Khachatryan (binding)
- Yun Tang (binding)

[1] https://cwiki.apache.org/confluence/x/SYp3EQ
[2] https://lists.apache.org/thread/tplyf17n3409l605bo9promf4o8tvl2j


Best,
Zakelly


Re: [VOTE] FLIP-424: Asynchronous State APIs

2024-03-31 Thread Zakelly Lan
Thank you all for the votes! I'm closing this thread and the result will be
posted in a separate mail.


Best,
Zakelly

On Sat, Mar 30, 2024 at 4:43 PM Roman Khachatryan  wrote:

> +1 (binding)
>
> Regards,
> Roman
>
>
> On Fri, Mar 29, 2024 at 7:01 AM Xintong Song 
> wrote:
>
> > +1 (binding)
> >
> > Best,
> >
> > Xintong
> >
> >
> >
> > On Fri, Mar 29, 2024 at 12:51 PM Yuepeng Pan 
> > wrote:
> >
> > > +1(non-binding)
> > >
> > > Best,
> > > Yuepeng Pan
> > >
> > >
> > > On 2024/03/29 03:03:53 Yunfeng Zhou wrote:
> > > > +1 (non-binding)
> > > >
> > > > Best,
> > > > Yunfeng
> > > >
> > > > On Wed, Mar 27, 2024 at 6:23 PM Zakelly Lan 
> > > wrote:
> > > > >
> > > > > Hi devs,
> > > > >
> > > > > > I'd like to start a vote on the FLIP-424: Asynchronous State APIs [1].
> > > > > > The discussion thread is here [2].
> > > > > >
> > > > > > The vote will be open for at least 72 hours unless there is an objection or
> > > > > > insufficient votes.
> > > > > >
> > > > > > [1] https://cwiki.apache.org/confluence/x/SYp3EQ
> > > > > > [2] https://lists.apache.org/thread/nmd9qd0k8l94ygcfgllxms49wmtz1864
> > > > >
> > > > >
> > > > > Best,
> > > > > Zakelly
> > > >
> > >
> >
>


Re: [DISCUSS] FLIP-424: Asynchronous State APIs

2024-03-29 Thread Zakelly Lan
Hi Yun,

Whether the iterator is empty is known as a by-product of creating it, so
isEmpty can be synchronous. Docs added.


Thanks & Best,
Zakelly

On Sat, Mar 30, 2024 at 11:01 AM Yun Tang  wrote:

> Hi Zakelly,
>
> I just have one minor question: why is the interface of StateIterator#isEmpty
> not asynchronous? Moreover, it lacks docs.
>
>
> Best
> Yun Tang
> 
> From: Jane Chan 
> Sent: Tuesday, March 19, 2024 21:54
> To: dev@flink.apache.org 
> Subject: Re: [DISCUSS] FLIP-424: Asynchronous State APIs
>
> Hi Zakelly,
>
> Thanks for your clarification. I'm +1 for using `onNext`.
>
> Best,
> Jane
>
> On Tue, Mar 19, 2024 at 6:38 PM Zakelly Lan  wrote:
>
> > Hi Jane,
> >
> > Thanks for your comments!
> >
> > I guess there is no problem with the word 'on' in this scenario, since it
> > is an event-driven-like execution model. I think the word 'hasNext' may be
> > misleading, since a typical iterator has a 'hasNext' that returns a
> > boolean for the existence of a next element. I think GPT may also have been
> > misled by this word, misunderstood the meaning of this interface, and
> > therefore gave the wrong suggestions :) . Actually this interface is
> > introduced to iterate over elements (like next()) instead of checking their
> > existence. I think the name `onNext()` is more suitable, WDYT? Or, to be
> > more explicit, we can add 'Compose' or 'Apply' to interfaces
> > (onNextCompose, onNextAccept) matching the functions of corresponding APIs
> > from 'StateFuture'. WDYT? But I'd prefer the former since it is
> > simpler.
> >
> >
> > Best,
> > Zakelly
> >
> > On Tue, Mar 19, 2024 at 6:09 PM Jane Chan  wrote:
> >
> > > Hi Zakelly,
> > >
> > > Thanks for bringing this discussion.
> > >
> > > I'm +1 for the overall API design, except for one minor comment about the
> > > name of StateIterator#onHasNext since I feel it is a little bit
> > > unintuitive. Meanwhile, I asked the opinion from GPT, here's what it said
> > >
> > > > The prefix "on" is commonly used in event-driven programming to denote an
> > > > event handler, not to check a condition. For instance, in JavaScript, you
> > > > might have onClick to handle click events. Therefore, using "on" may be
> > > > misleading if the method is being used to check for the existence of a
> > > > next element.
> > > >
> > > > For an async iterator, you'd want a name that clearly conveys that the
> > > > method will check for the next item asynchronously and return a promise or
> > > > some form of future result. In JavaScript, which supports async iteration,
> > > > the standard method for this is next(), which when used with async
> > > > iterators, returns a promise that resolves to an object with properties
> > > > value and done.
> > > >
> > > > Here are a couple of better alternatives:
> > > >
> > > > hasNextAsync: This name clearly states that the function is an asynchronous
> > > > version of the typical hasNext method found in synchronous iterators.
> > > > nextExists: This name suggests the method checks for the existence of a
> > > > next item, without the potential confusion of event handler naming
> > > > conventions.
> > > >
> > >
> > > WDYT?
> > >
> > > Best,
> > > Jane
> > >
> > > On Tue, Mar 19, 2024 at 5:47 PM Zakelly Lan wrote:
> > >
> > > > Hi everyone,
> > > >
> > > > Thanks for your valuable feedback!
> > > >
> > > > > The discussions were vibrant and have led to significant enhancements to
> > > > > this FLIP. With this progress, I'm looking to initiate the voting in 72
> > > > > hours.
> > > >
> > > > Please let me know if you have any concerns, thanks!
> > > >
> > > >
> > > > Best,
> > > > Zakelly
> > > >
> > > > > On Tue, Mar 19, 2024 at 5:35 PM Zakelly Lan wrote:
> > > >
> > > > > Hi Yue,
> > > > >
> > > > > Thanks for your comments!
> > > > >
> > > > > >> 1. Is it possible for all `FutureUtils` in Flink to reuse the same util
> > > > > >> class?
> > > > 

Re: Re: [ANNOUNCE] Apache Paimon is graduated to Top Level Project

2024-03-28 Thread Zakelly Lan
Congratulations!


Best,
Zakelly

On Thu, Mar 28, 2024 at 10:13 PM Jing Ge  wrote:

> Congrats!
>
> Best regards,
> Jing
>
> On Thu, Mar 28, 2024 at 1:27 PM Feifan Wang  wrote:
>
> > Congratulations!——
> >
> > Best regards,
> >
> > Feifan Wang
> >
> >
> >
> >
> > At 2024-03-28 20:02:43, "Yanfei Lei"  wrote:
> > >Congratulations!
> > >
> > >Best,
> > >Yanfei
> > >
> > >Zhanghao Chen wrote on Thu, Mar 28, 2024 at 19:59:
> > >>
> > >> Congratulations!
> > >>
> > >> Best,
> > >> Zhanghao Chen
> > >> 
> > >> From: Yu Li 
> > >> Sent: Thursday, March 28, 2024 15:55
> > >> To: d...@paimon.apache.org 
> > >> Cc: dev ; user 
> > >> Subject: Re: [ANNOUNCE] Apache Paimon is graduated to Top Level Project
> > >>
> > >> CC the Flink user and dev mailing list.
> > >>
> > >> Paimon originated within the Flink community, initially known as Flink
> > >> Table Store, and all our incubating mentors are members of the Flink
> > >> Project Management Committee. I am confident that the bonds of
> > >> enduring friendship and close collaboration will continue to unite the
> > >> two communities.
> > >>
> > >> And congratulations all!
> > >>
> > >> Best Regards,
> > >> Yu
> > >>
> > >> On Wed, 27 Mar 2024 at 20:35, Guojun Li wrote:
> > >> >
> > >> > Congratulations!
> > >> >
> > >> > Best,
> > >> > Guojun
> > >> >
> > >> > On Wed, Mar 27, 2024 at 5:24 PM wulin  wrote:
> > >> >
> > >> > > Congratulations~
> > >> > >
> > >> > > > On Mar 27, 2024, at 15:54, 王刚 wrote:
> > >> > > >
> > >> > > > Congratulations~
> > >> > > >
> > >> > > >> On Mar 26, 2024, at 10:25, Jingsong Li wrote:
> > >> > > >>
> > >> > > >> Hi Paimon community,
> > >> > > >>
> > >> > > >> I’m glad to announce that the ASF board has approved a resolution to
> > >> > > >> graduate Paimon into a full Top Level Project. Thanks to everyone for
> > >> > > >> your help to get to this point.
> > >> > > >>
> > >> > > >> I just created an issue to track the things we need to modify [2],
> > >> > > >> please comment on it if you feel that something is missing. You can
> > >> > > >> refer to apache documentation [1] too.
> > >> > > >>
> > >> > > >> And, we already completed the GitHub repo migration [3], please update
> > >> > > >> your local git repo to track the new repo [4].
> > >> > > >>
> > >> > > >> You can run the following command to complete the remote repo tracking
> > >> > > >> migration.
> > >> > > >>
> > >> > > >> git remote set-url origin https://github.com/apache/paimon.git
> > >> > > >>
> > >> > > >> If you have a different name, please change the 'origin' to your remote
> > >> > > >> name.
> > >> > > >>
> > >> > > >> Please join me in celebrating!
> > >> > > >>
> > >> > > >> [1] https://incubator.apache.org/guides/transferring.html#life_after_graduation
> > >> > > >> [2] https://github.com/apache/paimon/issues/3091
> > >> > > >> [3] https://issues.apache.org/jira/browse/INFRA-25630
> > >> > > >> [4] https://github.com/apache/paimon
> > >> > > >>
> > >> > > >> Best,
> > >> > > >> Jingsong Lee
> > >> > >
> > >> > >
> >
>


[VOTE] FLIP-424: Asynchronous State APIs

2024-03-27 Thread Zakelly Lan
Hi devs,

I'd like to start a vote on the FLIP-424: Asynchronous State APIs [1]. The
discussion thread is here [2].

The vote will be open for at least 72 hours unless there is an objection or
insufficient votes.

[1] https://cwiki.apache.org/confluence/x/SYp3EQ
[2] https://lists.apache.org/thread/nmd9qd0k8l94ygcfgllxms49wmtz1864


Best,
Zakelly


Re: [DISCUSS] FLIP-423 ~FLIP-428: Introduce Disaggregated State Storage and Management in Flink 2.0

2024-03-27 Thread Zakelly Lan
Thanks, Piotrek, for your valuable input!

I will prepare follow-up FLIPs about faster checkpointing in the
current async execution model and the new APIs. I have also added a brief
description of this part to FLIP-423/424/425.

Regarding your concern:

> My main concern here, is to prevent a situation where we
> end up with duplicate code base of the operators:
> - the current set of operators that are well behaving during checkpointing,
> but are synchronous
> - some set of async operators that will be misbehaving during checkpoints
>

Yes, that's definitely what we should avoid. Let's thoroughly refine the
checkpointing behavior before the SQL operator rework in milestone 2.


Best,
Zakelly

On Wed, Mar 27, 2024 at 4:30 PM Piotr Nowojski  wrote:

> Hi!
>
> Yes, after some long offline discussions we agreed to proceed as planned
> here, but we should treat the current API as experimental. The issue is
> that we cannot checkpoint lambdas as they are currently defined, so
> in-flight records have to be drained before a checkpoint, which leads to
> problems under backpressure.
>
> 
> 6000 records that's not far off from the amount of records buffered in
> network buffers for smaller parallelism back pressured jobs, and before
> unaligned checkpoints were implemented, even such jobs were often seen with
> checkpointing times exploding to tens of minutes/hours.
> And special handling of watermarks wouldn't solve the problem - problem
> might be caused by upstream window operator flushing records on watermark,
> while downstream operator that has those 6000 in-flight records is also
> backpressured. Then processing of the watermark in the upstream operator
> will be tied to the downstream operator draining all of those in-flight
> records.
> 
>
> On the other hand checkpointing in-flight requests with the current API
> would require serializing lambdas into the checkpointed state, which has
> significant limitations on its own:
> - unable to upgrade (including bug fix) JRE
> - problems when updating dependencies of the code inside the lambdas, if
> the updated dependencies are not binary compatible
> - no way to fix bugs inside the lambdas - users might get stuck in an
> unrecoverable state
>
> After brainstorming a couple of different options, we came up with a couple
> of solutions to those issues; the one we liked the most looks like:
>
> ```
> public void open() {
>     // name (bid), in type, consumer
>     consumer("GET", Void.class, (v) -> {
>         return getState("wordcount").get();
>     });
>     consumer("UPDATE", Integer.class, (v) -> {
>         return getState("wordcount").update(v == null ? 1 : v + 1);
>     });
>     consumer("OUT", Integer.class, (v) -> {
>         getContext().collect(v);
>     });
> }
> public void processElement() {
>     do("GET").then("UPDATE").then("OUT");
> }
> ```
>
> Where "GET" "UPDATE" "OUT" are some uid's (`block-id`/`bid`). This way
> users could declare upfront during the operator's startup what
> method/function should handle a given `bid` in the current execution
> attempt. When checkpointing in-flight async state requests, Flink would
> store only the registered `bid`, not the serialised code itself. This would
> avoid problems of serializing lambdas.
>
> However, to not postpone this effort we reached a consensus that we can
> proceed with the current proposal and treat the currently proposed Async
> API (without declaring code upfront) as experimental/PoC - a test bed for
> the whole disaggregated state backend. Not intended to be widely used in
> the code base. And in parallel in a follow up FLIP, we could discuss the
> exact shape of the declarative async API that would be actually
> checkpointable. My main concern here, is to prevent a situation where we
> end up with duplicate code base of the operators:
> - the current set of operators that are well behaving during checkpointing,
> but are synchronous
> - some set of async operators that will be misbehaving during checkpoints
>
> We should really try to avoid this scenario.
>
> Best,
> Piotrek
>
> On Wed, 27 Mar 2024 at 05:06, Zakelly Lan wrote:
>
> > Hi devs,
> >
> > It seems there have been no more concerns or suggestions for a while. We'd
> > like to start voting soon.
> >
> >
> > Best,
> > Zakelly
> >
> > On Wed, Mar 27, 2024 at 11:46 AM Zakelly Lan 
> > wrote:
> >
> > > Hi everyone,
> > >
> > > Piotr and I had a long discussion about the checkpoint duration issue.
> We
> > > think that the lambda serialization approach I proposed in las

Re: [DISCUSS] FLIP-423 ~FLIP-428: Introduce Disaggregated State Storage and Management in Flink 2.0

2024-03-26 Thread Zakelly Lan
Hi devs,

It seems there have been no more concerns or suggestions for a while. We'd
like to start voting soon.


Best,
Zakelly

On Wed, Mar 27, 2024 at 11:46 AM Zakelly Lan  wrote:

> Hi everyone,
>
> Piotr and I had a long discussion about the checkpoint duration issue. We
> think that the lambda serialization approach I proposed in the last mail may
> introduce more issues; the most serious one is that users may be unable to
> modify the code in a serialized lambda to fix a bug.
>
> But fortunately we found a possible solution. By introducing a set of
> declarative APIs and a `declareProcess()` function that users should
> implement in some newly introduced AbstractStreamOperator, we could get the
> declaration of record processing at runtime, broken down into requests and
> callbacks (lambdas) as introduced in FLIP-424. Thus we avoid the problem of
> lambda (de)serialization and instead retrieve the callbacks every time
> before a task runs. The next step is to provide an API allowing users to
> assign a unique id to each state request and callback, or to assign one
> automatically by declaration order. Thus we can find the corresponding
> callback at runtime for each restored state request based on the id, and then
> the whole pipeline can be resumed.
>
> Note that all of the above is internal and won't be exposed to average users.
> Exposing this on Stream APIs can be discussed later. I will prepare another
> FLIP(s) describing the whole optimized checkpointing process, and in the
> meantime, we can proceed with the current FLIPs. The new FLIP(s) will be built
> on top of the current ones and we can achieve this incrementally.
>
>
> Best,
> Zakelly
>
> On Thu, Mar 21, 2024 at 12:31 AM Zakelly Lan 
> wrote:
>
>> Hi Piotrek,
>>
>> Thanks for your comments!
>>
>> As we discussed off-line, you agreed that we can not checkpoint while some
>>> records are in the middle of being
>>> processed. That we would have to drain the in-progress records before
>>> doing
>>> the checkpoint. You also argued
>>> that this is not a problem, because the size of this buffer can be
>>> configured.
>>>
>>> I'm really afraid of such a solution. I've seen in the past plenty of
>>> times, that whenever Flink has to drain some
>>> buffered records, eventually that always breaks timely checkpointing (and
>>> hence ability for Flink to rescale in
>>> a timely manner). Even a single record with a `flatMap` like operator
>>> currently in Flink causes problems during
>>> back pressure. That's especially true for example for processing
>>> watermarks. At the same time, I don't see how
>>> this value could be configured by even Flink's power users, let alone an
>>> average user. The size of that in-flight
>>> buffer not only depends on a particular query/job, but also the "good"
>>> value changes dynamically over time,
>>> and can change very rapidly. Sudden spikes of records or backpressure,
>>> some
>>> hiccup during emitting watermarks,
>>> all of those could change in an instant the theoretically optimal buffer
>>> size of let's say "6000" records, down to "1".
>>> And when those changes happen, those are the exact times when timely
>>> checkpointing matters the most.
>>> If the load pattern suddenly changes, and checkpointing takes suddenly
>>> tens
>>> of minutes instead of a couple of
>>> seconds, it means you can not use rescaling and you are forced to
>>> overprovision the resources. And there are also
>>> other issues if checkpointing takes too long.
>>>
>>
>> Let me clarify some misunderstandings here. First of all, is the sync
>> phase of checkpointing for the current plan longer than in the synchronous
>> execution model? The answer is yes; it is a trade-off for the parallel
>> execution model. I think the cost is worth the improvement. Now the
>> question is, how much longer are we talking about? The PoC result I
>> provided is that it takes 3 seconds to drain 6000 records of a simple job,
>> and I said it is not a big deal. Still, since you point out that we may
>> encounter long watermark/timer processing that makes the cp wait, I provide
>> several ways to optimize this:
>>
>>1. Instead of only controlling the in-flight records, we could
>>control the in-flight watermark.
>>2. Since we have broken down the record processing into several state
>>requests with at most one subsequent callback for each request, the cp can
>>be processed after current RUNNING requests (NOT records) and their
>>

Re: [DISCUSS] FLIP-423 ~FLIP-428: Introduce Disaggregated State Storage and Management in Flink 2.0

2024-03-26 Thread Zakelly Lan
Hi everyone,

Piotr and I had a long discussion about the checkpoint duration issue. We
think that the lambda serialization approach I proposed in the last mail may
introduce more issues; the most serious one is that users may be unable to
modify the code in a serialized lambda to fix a bug.

But fortunately we found a possible solution. By introducing a set of
declarative APIs and a `declareProcess()` function that users should
implement in some newly introduced AbstractStreamOperator, we could get the
declaration of record processing at runtime, broken down into requests and
callbacks (lambdas) as introduced in FLIP-424. Thus we avoid the problem of
lambda (de)serialization and instead retrieve the callbacks every time
before a task runs. The next step is to provide an API allowing users to
assign a unique id to each state request and callback, or to assign one
automatically by declaration order. Thus we can find the corresponding
callback at runtime for each restored state request based on the id, and then
the whole pipeline can be resumed.
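
A minimal sketch of the id-based lookup described above, under the assumption that a checkpoint stores only a callback id and its pending input. None of these names come from Flink: `DeclaredCallbacksSketch`, `PendingRequest`, `declare`, and `resume` are hypothetical, and the real declarative API is left to a follow-up FLIP.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.IntUnaryOperator;

// Illustration only: callbacks are declared by id at startup and looked up
// by id when an in-flight request is restored.
class DeclaredCallbacksSketch {
    // What a checkpoint would keep for one in-flight request: an id and data,
    // never the callback code itself.
    static final class PendingRequest {
        final String callbackId;
        final int input;

        PendingRequest(String callbackId, int input) {
            this.callbackId = callbackId;
            this.input = input;
        }
    }

    // id -> callback; rebuilt from the job's code on every start.
    private final Map<String, IntUnaryOperator> callbacks = new HashMap<>();

    void declare(String id, IntUnaryOperator callback) {
        if (callbacks.putIfAbsent(id, callback) != null) {
            throw new IllegalArgumentException("Duplicate callback id: " + id);
        }
    }

    // Resumes a restored request with whatever code is declared for its id now.
    int resume(PendingRequest request) {
        IntUnaryOperator callback = callbacks.get(request.callbackId);
        if (callback == null) {
            throw new IllegalStateException(
                    "No callback declared for id: " + request.callbackId);
        }
        return callback.applyAsInt(request.input);
    }

    // Restores the same pending request into two "deployments" of the job.
    static int[] demo() {
        PendingRequest restored = new PendingRequest("UPDATE", 41);

        DeclaredCallbacksSketch original = new DeclaredCallbacksSketch();
        original.declare("UPDATE", v -> v + 1);

        DeclaredCallbacksSketch patched = new DeclaredCallbacksSketch();
        patched.declare("UPDATE", v -> v + 2); // stands in for a bug fix

        return new int[] {original.resume(restored), patched.resume(restored)};
    }
}
```

Because only the id and the input are persisted, the restored request runs whatever code is currently declared under that id, which is what makes a bug fix possible after a restore.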

Note that all of the above is internal and won't be exposed to average users.
Exposing this on Stream APIs can be discussed later. I will prepare another
FLIP(s) describing the whole optimized checkpointing process, and in the
meantime, we can proceed with the current FLIPs. The new FLIP(s) will be built
on top of the current ones and we can achieve this incrementally.


Best,
Zakelly

On Thu, Mar 21, 2024 at 12:31 AM Zakelly Lan  wrote:

> Hi Piotrek,
>
> Thanks for your comments!
>
> As we discussed off-line, you agreed that we can not checkpoint while some
>> records are in the middle of being
>> processed. That we would have to drain the in-progress records before
>> doing
>> the checkpoint. You also argued
>> that this is not a problem, because the size of this buffer can be
>> configured.
>>
>> I'm really afraid of such a solution. I've seen in the past plenty of
>> times, that whenever Flink has to drain some
>> buffered records, eventually that always breaks timely checkpointing (and
>> hence ability for Flink to rescale in
>> a timely manner). Even a single record with a `flatMap` like operator
>> currently in Flink causes problems during
>> back pressure. That's especially true for example for processing
>> watermarks. At the same time, I don't see how
>> this value could be configured by even Flink's power users, let alone an
>> average user. The size of that in-flight
>> buffer not only depends on a particular query/job, but also the "good"
>> value changes dynamically over time,
>> and can change very rapidly. Sudden spikes of records or backpressure,
>> some
>> hiccup during emitting watermarks,
>> all of those could change in an instant the theoretically optimal buffer
>> size of let's say "6000" records, down to "1".
>> And when those changes happen, those are the exact times when timely
>> checkpointing matters the most.
>> If the load pattern suddenly changes, and checkpointing takes suddenly
>> tens
>> of minutes instead of a couple of
>> seconds, it means you can not use rescaling and you are forced to
>> overprovision the resources. And there are also
>> other issues if checkpointing takes too long.
>>
>
> Let me clarify some misunderstandings here. First of all, is the sync
> phase of checkpointing for the current plan longer than in the synchronous
> execution model? The answer is yes; it is a trade-off for the parallel
> execution model. I think the cost is worth the improvement. Now the
> question is, how much longer are we talking about? The PoC result I
> provided is that it takes 3 seconds to drain 6000 records of a simple job,
> and I said it is not a big deal. Still, since you point out that we may
> encounter long watermark/timer processing that makes the cp wait, I provide
> several ways to optimize this:
>
>1. Instead of only controlling the in-flight records, we could control
>the in-flight watermark.
>2. Since we have broken down the record processing into several state
>requests with at most one subsequent callback for each request, the cp can
>be processed after current RUNNING requests (NOT records) and their
>callbacks finish. Which means, even though we have a lot of records
>in-flight (I mean in 'processElement' here), once only a small group of
>state requests finishes, the cp can proceed. They will form into 1 or 2
>multiGets to rocksdb, which takes less time. Moreover, the timer processing
>is also split into several requests, so cp won't wait for the whole timer
>to finish. The picture attached describes this idea.
>
> And the size of this buffer can be configured. I'm not counting on average
> 

Re: [DISCUSS] FLIP-425: Asynchronous Execution Model

2024-03-24 Thread Zakelly Lan
Hi Lorenzo,

Thanks for your comments!

> I think you got my question, and I did not realize that is not even allowed
> to modify some externally scoped variable in a lambda.
> I guess the point is that it is possible, but the user would really need
> to be willing to do it and "shoot him/herself in the foot".
>
Well, I think what Yanfei means is that you cannot capture local variables
that are not final or effectively final. This is not allowed in Java, and
both the compiler and the IDE will report it as an error.
Users can work around this with a final container: the lambda captures the
final container and modifies the value inside it, e.g.

```
final AtomicInteger x = new AtomicInteger(0);

void processElement(...) {
   state.value().then( val -> { x.addAndGet(val); } );
   ...
   out.collect(x.get());
}
```
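
For readers who want to try the pattern above, here is a minimal self-contained sketch. It assumes `java.util.concurrent.CompletableFuture` as a stand-in for the state future, which is only an illustration and not the API proposed in the FLIP; `CaptureExample` and `sumWithCallback` are hypothetical names.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; CompletableFuture stands in for the async state future.
class CaptureExample {

    // Adds `stateValue` to `initial` from inside a lambda and returns the result.
    static int sumWithCallback(int initial, int stateValue) {
        // A plain `int x` could not be modified inside the lambda:
        // captured locals must be final or effectively final.
        final AtomicInteger x = new AtomicInteger(initial);

        CompletableFuture<Integer> future = CompletableFuture.completedFuture(stateValue);
        // The lambda captures the final reference `x` and mutates its content.
        // join() waits for the callback so that the read below sees the update.
        future.thenAccept(val -> x.addAndGet(val)).join();

        return x.get();
    }

    public static void main(String[] args) {
        System.out.println(sumWithCallback(1, 41)); // prints 42
    }
}
```

Note that in an asynchronous model the callback may not have run by the time the next statement executes, so reading the container right after registering the callback is not guaranteed to observe the update. The sketch sidesteps this by waiting on the future.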

> For example, if the user now would not be sure that elements end up being
> in correct windows, I am afraid this would somewhat simply hinder the
> watermark concept as a whole. What do you think?
>
Well, I don't think such a problem exists. Typically, an element is
assigned to a window based on its event time, and it only manipulates
the state associated with that window. So as long as we ensure that a
window fires after its corresponding elements are processed, the
behavior is correct. Whether the elements of a later window are
processed before or after the former window fires does not matter. This
is true for most cases, except for scenarios where the state is not
divided into slices for different windows, and we haven't seen a concrete
use case for that. That's where the discussion between Xintong and me comes
from [1].
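
The argument can be illustrated with a small sketch. It assumes tumbling windows, non-negative event times, and one state slice per window; `WindowSliceExample` and its methods are hypothetical names, not Flink APIs.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: one state slice per tumbling window, keyed by window start.
class WindowSliceExample {
    private final long windowSize;
    private final Map<Long, Integer> sumPerWindow = new HashMap<>();

    WindowSliceExample(long windowSize) {
        this.windowSize = windowSize;
    }

    // Assigns the element to a window by its event time and updates only that slice.
    void processElement(long eventTime, int value) {
        long windowStart = eventTime - (eventTime % windowSize);
        sumPerWindow.merge(windowStart, value, Integer::sum);
    }

    // Fires the window starting at `windowStart`: emits and clears only its own slice.
    int fire(long windowStart) {
        Integer sum = sumPerWindow.remove(windowStart);
        return sum == null ? 0 : sum;
    }

    public static void main(String[] args) {
        // Order A: an element of the later window [10, 20) arrives before [0, 10) fires.
        WindowSliceExample a = new WindowSliceExample(10);
        a.processElement(3, 1);
        a.processElement(7, 2);
        a.processElement(12, 5);
        int firstA = a.fire(0);

        // Order B: the same element is processed after [0, 10) has fired.
        WindowSliceExample b = new WindowSliceExample(10);
        b.processElement(3, 1);
        b.processElement(7, 2);
        int firstB = b.fire(0);
        b.processElement(12, 5);

        // Both orders produce the same result for each window.
        System.out.println(firstA + " " + firstB + " " + a.fire(10) + " " + b.fire(10));
    }
}
```

Because each window only touches its own slice, both orders yield the same per-window results. If all windows shared a single piece of state, the two orders could diverge, which is the exceptional case mentioned above.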


Hope these answer your questions.

[1] https://lists.apache.org/thread/986zxq1k9rv3vkbk39yw16g24o6h83mz

Best,
Zakelly


On Fri, Mar 22, 2024 at 11:11 PM 
wrote:

> Thank you Yanfei for addressing all the questions!
>
> > I'm not sure if I understand your question. In Java, this
> case (modifying a local variable) is not allowed[1], but there
> are ways to get around the limitation of lambda.
> In this case, the modification to x may be concurrent, which needs to
> be handled carefully.
>
> I think you got my question, and I did not realize that is not even
> allowed to modify some externally scoped variable in a lambda.
> I guess the point is that it is possible, but the user would really need
> to be willing to do it and "shoot him/herself in the foot".
>
> > an implicit fact in sync
> API is that "event timer fire" would execute before "the subsequent
> records of watermark", but in out-of-order mode(async API), the
> execution order between them is not guaranteed
>
> Got it, what I don't get exactly is what type of inconsistency/issue the
> user could face.
> For example, If the user now would not be sure that elements end up being
> in correct windows, I am afraid this would somewhat simply hinder the
> watermark concept as a whole. What do you think?
>
> Thank you.
>
> On Mar 21, 2024 at 14:27 +0100, Yanfei Lei , wrote:
> > Thanks for your reading and valuable comments!
> >
> > > 1) About locking VS reference counting: I would like to clear out
> which mechanism prevents what:
> > The `KeyAccountingUnit` implements locking behavior on keys and
> > ensures 2 state requests on the same key happen in order.
> > Double-locking the same key does not result in deadlocks (thanks to
> > the `previous == record` condition in your pseudo-code), so, the same
> > callback chain can update/read multiple times the same piece of state.
> > On the other side we have the reference counting mechanism that is
> > used to understand whether a record has been fully processed, i.e.,
> > all state invocations have been carried out.
> > Here is the question: am I correct if we say that key accounting is
> > needed for out-of-order while reference counting is needed for
> > checkpointing and watermarking?
> >
> >
> > Regarding the "deadlock" of `KeyAccountingUnit`: good catch, we will
> > emphasize this in the FLIP: the KeyAccountingUnit is reentrant, so the
> > state requests of the same record can update/read multiple times
> > without deadlock.
> >
> > Regarding the question: records, checkpoint barriers and watermarks
> > can be regarded as inputs, this FLIP discusses the *order* between all
> > inputs, in simple terms, the inputs of the same key that arrive first
> > need to be executed first.
> >
> > The `KeyAccountingUnit` and reference counting work together to
> > preserve the order: when the reference counting mechanism recognizes that a
> > record has been fully processed, the record is removed from the
> > `KeyAccountingUnit`. The checkpoint or watermark starts only after
> > the reference counts of all arrived inputs reach zero.
> >
> >
> > > 2) Number of mails:
> > Do you end up having two mails?
> >
> >
> > Yes, there are two mails in this case.
> >
> > > 3) Would this change something on the consistency guarantees provided?
> > I guess not, 

Re: [DISCUSS] FLIP-428: Fault Tolerance/Rescale Integration for Disaggregated State

2024-03-22 Thread Zakelly Lan
Hi Yue,

Thanks for bringing this up!

The CURRENT file is a special one: it should be snapshotted during the
sync phase (temporarily loaded into memory). That way, the case you describe
can be avoided.
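
To make the idea concrete, here is a rough sketch under stated assumptions: the CURRENT file is small enough to copy into memory, and plain local directories stand in for the working directory and the checkpoint storage. The class and method names are hypothetical and are not ForSt or Flink APIs.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical sketch; names are illustrative, not ForSt or Flink APIs.
class CurrentFileSnapshotExample {

    // Sync phase: copy the small CURRENT file into memory so later changes cannot leak in.
    static byte[] snapshotCurrentInSyncPhase(Path dbDir) throws IOException {
        return Files.readAllBytes(dbDir.resolve("CURRENT"));
    }

    // Async phase: upload the in-memory copy rather than re-reading the live file.
    static void uploadInAsyncPhase(byte[] currentSnapshot, Path checkpointDir) throws IOException {
        Files.write(checkpointDir.resolve("CURRENT"), currentSnapshot);
    }

    // Replays the scenario from the thread and returns the CURRENT content in the checkpoint.
    static String runScenario() {
        try {
            Path dbDir = Files.createTempDirectory("db");
            Path checkpointDir = Files.createTempDirectory("chk");
            Path current = dbDir.resolve("CURRENT");

            Files.write(current, "MANIFEST-000008\n".getBytes(StandardCharsets.UTF_8));
            byte[] snapshot = snapshotCurrentInSyncPhase(dbDir);

            // Between the two phases the manifest rolls over and CURRENT is repointed.
            Files.write(current, "MANIFEST-000009\n".getBytes(StandardCharsets.UTF_8));

            uploadInAsyncPhase(snapshot, checkpointDir);
            byte[] uploaded = Files.readAllBytes(checkpointDir.resolve("CURRENT"));
            return new String(uploaded, StandardCharsets.UTF_8).trim();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(runScenario()); // prints MANIFEST-000008
    }
}
```

The uploaded CURRENT still points to manifest 8, which matches what was recorded in the sync phase, even though the live file has moved on to manifest 9.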


Best,
Zakelly

On Fri, Mar 22, 2024 at 4:55 PM yue ma  wrote:

> Hi jinzhong,
> Thanks for your reply. I still have some doubts about the first question. Is
> there a case like the following?
> When you make a snapshot during the synchronization phase, you record the
> CURRENT file and manifest 8, but before the asynchronous phase, the manifest
> reaches the size threshold and the CURRENT file points to the new manifest 9,
> so that an incorrect CURRENT file is uploaded?
>
> On Wed, Mar 20, 2024 at 8:13 PM Jinzhong Li wrote:
>
> > Hi Yue,
> >
> > Thanks for your feedback!
> >
> > > 1. If we choose Option-3 for ForSt , how would we handle Manifest File
> > > ? Should we take a snapshot of the Manifest during the synchronization
> > phase?
> >
> > IIUC, the GetLiveFiles() API in Option-3 can also capture the fileInfo of
> > Manifest files, and this API also returns the manifest file size, which
> > means this API can snapshot the Manifest FileInfo (filename +
> > fileSize) during the synchronization phase.
> > You could refer to the RocksDB source code[1] to verify this.
> >
> >
> >  > However, many distributed storage systems do not support the
> > > ability of Fast Duplicate (such as HDFS). But ForSt has the ability to
> > > directly read and write remote files. Can we not copy or Fast duplicate
> > > these files, but instead directly reuse and reference these remote
> > > files? I think this can reduce file download time and may be more
> useful
> > > for most users who use HDFS (do not support Fast Duplicate)?
> >
> > Firstly, as far as I know, most remote file systems support
> > FastDuplicate, e.g. S3 copyObject/Azure Blob Storage copyBlob/OSS
> > copyObject, while HDFS indeed does not support FastDuplicate.
> >
> > Actually, we have considered the design which reuses remote files. And
> that
> > is what we want to implement in the coming future, where both checkpoints
> > and restores can reuse existing files residing on the remote state
> storage.
> > However, this design conflicts with the current file management system in
> > Flink.  At present, remote state files are managed by the ForStDB
> > (TaskManager side), while checkpoint files are managed by the JobManager,
> > which is a major hindrance to file reuse. For example, issues could arise
> > if a TM reuses a checkpoint file that is subsequently deleted by the JM.
> > Therefore, as mentioned in FLIP-423[2], our roadmap is to first integrate
> > checkpoint/restore mechanisms with existing framework  at milestone-1.
> > Then, at milestone-2, we plan to introduce TM State Ownership and Faster
> > Checkpointing mechanisms, which will allow both checkpointing and
> restoring
> > to directly reuse remote files, thus achieving faster checkpointing and
> > restoring.
> >
> > [1]
> >
> >
> https://github.com/facebook/rocksdb/blob/6ddfa5f06140c8d0726b561e16dc6894138bcfa0/db/db_filesnapshot.cc#L77
> > [2]
> >
> >
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=293046855#FLIP423:DisaggregatedStateStorageandManagement(UmbrellaFLIP)-RoadMap+LaunchingPlan
> >
> > Best,
> > Jinzhong
> >
> >
> >
> >
> >
> >
> >
> > On Wed, Mar 20, 2024 at 4:01 PM yue ma  wrote:
> >
> > > Hi Jinzhong
> > >
> > > Thank you for initiating this FLIP.
> > >
> > > I have just some minor questions:
> > >
> > > 1. If we choose Option-3 for ForSt, how would we handle the Manifest File?
> > > Should we take a snapshot of the Manifest during the synchronization
> > phase?
> > > Otherwise, may the Manifest and MetaInfo information be inconsistent
> > during
> > > recovery?
> > > 2. For the Restore Operation , we need Fast Duplicate  Checkpoint Files
> > to
> > > Working Dir . However, many distributed storage systems do not support
> > the
> > > ability of Fast Duplicate (such as HDFS). But ForSt has the ability to
> > > directly read and write remote files. Can we not copy or Fast duplicate
> > > these files, but instead directly reuse and reference these remote
> > > files? I think this can reduce file download time and may be more
> useful
> > > for most users who use HDFS (do not support Fast Duplicate)?
> > >
> > > --
> > > Best,
> > > Yue
> > >
> >
>
>
> --
> Best,
> Yue
>


Re: [VOTE] FLIP-433: State Access on DataStream API V2

2024-03-21 Thread Zakelly Lan
+1 (non-binding)


Best,
Zakelly

On Thu, Mar 21, 2024 at 5:34 PM Gyula Fóra  wrote:

> +1 (binding)
>
> Gyula
>
> On Thu, Mar 21, 2024 at 3:33 AM Rui Fan <1996fan...@gmail.com> wrote:
>
> > +1(binding)
> >
> > Thanks to Weijie for driving this proposal, which solves the problem
> that I
> > raised in FLIP-359.
> >
> > Best,
> > Rui
> >
> > On Thu, Mar 21, 2024 at 10:10 AM Hangxiang Yu 
> wrote:
> >
> > > +1 (binding)
> > >
> > > On Thu, Mar 21, 2024 at 10:04 AM Xintong Song 
> > > wrote:
> > >
> > > > +1 (binding)
> > > >
> > > > Best,
> > > >
> > > > Xintong
> > > >
> > > >
> > > >
> > > > On Wed, Mar 20, 2024 at 8:30 PM weijie guo <
> guoweijieres...@gmail.com>
> > > > wrote:
> > > >
> > > > > Hi everyone,
> > > > >
> > > > >
> > > > > Thanks for all the feedback about the FLIP-433: State Access on
> > > > > DataStream API V2 [1]. The discussion thread is here [2].
> > > > >
> > > > >
> > > > > The vote will be open for at least 72 hours unless there is an
> > > > > objection or insufficient votes.
> > > > >
> > > > >
> > > > > Best regards,
> > > > >
> > > > > Weijie
> > > > >
> > > > >
> > > > > [1]
> > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-433%3A+State+Access+on+DataStream+API+V2
> > > > >
> > > > > [2]
> https://lists.apache.org/thread/8ghzqkvt99p4k7hjqxzwhqny7zb7xrwo
> > > > >
> > > >
> > >
> > >
> > > --
> > > Best,
> > > Hangxiang.
> > >
> >
>


Re: [ANNOUNCE] Donation Flink CDC into Apache Flink has Completed

2024-03-20 Thread Zakelly Lan
Congratulations!


Best,
Zakelly

On Thu, Mar 21, 2024 at 12:05 PM weijie guo 
wrote:

> Congratulations! Well done.
>
>
> Best regards,
>
> Weijie
>
>
> On Thu, Mar 21, 2024 at 11:40 AM Feng Jin wrote:
>
>> Congratulations!
>>
>>
>> Best,
>> Feng
>>
>>
>> On Thu, Mar 21, 2024 at 11:37 AM Ron liu  wrote:
>>
>> > Congratulations!
>> >
>> > Best,
>> > Ron
>> >
>> > On Thu, Mar 21, 2024 at 10:46 AM Jark Wu wrote:
>> >
>> > > Congratulations and welcome!
>> > >
>> > > Best,
>> > > Jark
>> > >
>> > > On Thu, 21 Mar 2024 at 10:35, Rui Fan <1996fan...@gmail.com> wrote:
>> > >
>> > > > Congratulations!
>> > > >
>> > > > Best,
>> > > > Rui
>> > > >
>> > > > On Thu, Mar 21, 2024 at 10:25 AM Hang Ruan 
>> > > wrote:
>> > > >
>> > > > > Congratulations!
>> > > > >
>> > > > > Best,
>> > > > > Hang
>> > > > >
>> > > > > On Thu, Mar 21, 2024 at 9:54 AM Lincoln Lee wrote:
>> > > > >
>> > > > >>
>> > > > >> Congrats, thanks for the great work!
>> > > > >>
>> > > > >>
>> > > > >> Best,
>> > > > >> Lincoln Lee
>> > > > >>
>> > > > >>
>> > > > >> On Wed, Mar 20, 2024 at 10:48 PM Peter Huang wrote:
>> > > > >>
>> > > > >>> Congratulations
>> > > > >>>
>> > > > >>>
>> > > > >>> Best Regards
>> > > > >>> Peter Huang
>> > > > >>>
>> > > > >>> On Wed, Mar 20, 2024 at 6:56 AM Huajie Wang
>> > > > wrote:
>> > > > >>>
>> > > > >>>>
>> > > > >>>> Congratulations
>> > > > >>>>
>> > > > >>>>
>> > > > >>>>
>> > > > >>>> Best,
>> > > > >>>> Huajie Wang
>> > > > >>>>
>> > > > >>>>
>> > > > >>>>
>> > > > >>>> On Wed, Mar 20, 2024 at 9:36 PM Leonard Xu wrote:
>> > > > >>>>
>> > > > > Hi devs and users,
>> > > > >
>> > > > > We are thrilled to announce that the donation of Flink CDC as
>> a
>> > > > > sub-project of Apache Flink has completed. We invite you to
>> > explore
>> > > > the new
>> > > > > resources available:
>> > > > >
>> > > > > - GitHub Repository: https://github.com/apache/flink-cdc
>> > > > > - Flink CDC Documentation:
>> > > > > https://nightlies.apache.org/flink/flink-cdc-docs-stable
>> > > > >
>> > > > > After Flink community accepted this donation[1], we have
>> > completed
>> > > > > software copyright signing, code repo migration, code cleanup,
>> > > > website
>> > > > > migration, CI migration and github issues migration etc.
>> > > > > Here I am particularly grateful to Hang Ruan, Zhongqaing Gong,
>> > > > > Qingsheng Ren, Jiabao Sun, LvYanquan, loserwang1024 and other
>> > > > contributors
>> > > > > for their contributions and help during this process!
>> > > > >
>> > > > >
>> > > > > For all previous contributors: The contribution process has
>> > > slightly
>> > > > > changed to align with the main Flink project. To report bugs
>> or
>> > > > suggest new
>> > > > > features, please open tickets in
>> > > > > Apache Jira (https://issues.apache.org/jira).  Note that we
>> will
>> > > no
>> > > > > longer accept GitHub issues for these purposes.
>> > > > >
>> > > > >
>> > > > > Welcome to explore the new repository and documentation. Your
>> > > > feedback
>> > > > > and contributions are invaluable as we continue to improve
>> Flink
>> > > CDC.
>> > > > >
>> > > > > Thanks everyone for your support and happy exploring Flink
>> CDC!
>> > > > >
>> > > > > Best,
>> > > > > Leonard
>> > > > > [1]
>> > > https://lists.apache.org/thread/cw29fhsp99243yfo95xrkw82s5s418ob
>> > > > >
>> > > > >
>> > > >
>> > >
>> >
>>
>


Re: [DISCUSS] FLIP-424: Asynchronous State APIs

2024-03-19 Thread Zakelly Lan
Hi Jane,

Thanks for your comments!

I guess there is no problem with the word 'on' in this scenario, since it
is an event-driven-like execution model. I think the word 'hasNext' may be
misleading, since the 'hasNext' of a typical iterator returns a boolean
indicating whether a next element exists. I think the GPT may also have been
misled by this word, misunderstood the meaning of this interface, and
therefore given the wrong suggestions :) . Actually, this interface is
introduced for iterating over elements (like next()) rather than checking
their existence. I think the name `onNext()` is more suitable, WDYT? Or, to be
more explicit, we can add 'Compose' or 'Apply' to the interface names
(onNextCompose, onNextAccept), matching the corresponding APIs
of 'StateFuture'. WDYT? But I'd prefer the former since it is simpler.
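
To show the distinction being drawn here, the following sketch contrasts a callback-style `onNext` with a boolean `hasNext`. The `AsyncIterator` interface and the list-backed implementation are assumptions made for illustration, with `CompletableFuture` standing in for the state future; this is not the actual `StateIterator` definition from the FLIP.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

// Hypothetical interface for illustration; not the StateIterator from the FLIP.
interface AsyncIterator<T> {
    // Invokes `action` on every element as it becomes available. Unlike hasNext(),
    // it does not answer "is there a next element?"; it drives the iteration itself.
    // The returned future completes after the last element has been handled.
    CompletableFuture<Void> onNext(Consumer<T> action);
}

// Simple in-memory implementation that hands out its elements on another thread.
class ListAsyncIterator<T> implements AsyncIterator<T> {
    private final List<T> elements;

    ListAsyncIterator(List<T> elements) {
        this.elements = elements;
    }

    @Override
    public CompletableFuture<Void> onNext(Consumer<T> action) {
        return CompletableFuture.runAsync(() -> elements.forEach(action));
    }
}

class OnNextExample {
    // Doubles every element through the callback and collects the results.
    static List<Integer> collectDoubled(List<Integer> input) {
        List<Integer> out = new ArrayList<>();
        // join() waits until the callback has been applied to all elements.
        new ListAsyncIterator<>(input).onNext(v -> out.add(v * 2)).join();
        return out;
    }

    public static void main(String[] args) {
        System.out.println(collectDoubled(Arrays.asList(1, 2, 3))); // prints [2, 4, 6]
    }
}
```

The caller never asks whether another element exists; it only supplies what to do with each one, which is why an event-handler-style name reads naturally.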


Best,
Zakelly

On Tue, Mar 19, 2024 at 6:09 PM Jane Chan  wrote:

> Hi Zakelly,
>
> Thanks for bringing this discussion.
>
> I'm +1 for the overall API design, except for one minor comment about the
> name of StateIterator#onHasNext since I feel it is a little bit
> unintuitive. Meanwhile, I asked the opinion from GPT, here's what it said
>
> The prefix "on" is commonly used in event-driven programming to denote an
> > event handler, not to check a condition. For instance, in JavaScript, you
> > might have onClick to handle click events. Therefore, using "on" may be
> > misleading if the method is being used to check for the existence of a
> next
> > element.
>
> For an async iterator, you'd want a name that clearly conveys that the
> > method will check for the next item asynchronously and return a promise
> or
> > some form of future result. In JavaScript, which supports async
> iteration,
> > the standard method for this is next(), which when used with async
> > iterators, returns a promise that resolves to an object with properties
> > value and done.
>
> Here are a couple of better alternatives:
>
> hasNextAsync: This name clearly states that the function is an asynchronous
> > version of the typical hasNext method found in synchronous iterators.
> > nextExists: This name suggests the method checks for the existence of a
> > next item, without the potential confusion of event handler naming
> > conventions.
> >
>
> WDYT?
>
> Best,
> Jane
>
> On Tue, Mar 19, 2024 at 5:47 PM Zakelly Lan  wrote:
>
> > Hi everyone,
> >
> > Thanks for your valuable feedback!
> >
> > The discussions were vibrant and have led to significant enhancements to
> > this FLIP. With this progress, I'm looking to initiate the voting in 72
> > hours.
> >
> > Please let me know if you have any concerns, thanks!
> >
> >
> > Best,
> > Zakelly
> >
> > On Tue, Mar 19, 2024 at 5:35 PM Zakelly Lan 
> wrote:
> >
> > > Hi Yue,
> > >
> > > Thanks for your comments!
> > >
> > > 1. Is it possible for all `FutureUtils` in Flink to reuse the same util
> > >> class?
> > >
> > > Actually, the `FutureUtils` here is a new util class that will share
> the
> > > same package path with the `StateFuture`. Or I'd be fine renaming it
> > > 'StateFutureUtils'.
> > >
> > > 2. It seems that there is no concept of retry, timeout, or delay in
> your
> > >> async state api design . Do we need to provide such capabilities like
> > >> `orTimeout` 、`completeDelayed`?
> > >>
> > > For ease of use, we do not provide such APIs allowing users to
> customize
> > > the behavior on timeout or retry. We may introduce a retry mechanism in
> > the
> > > framework enabled by configuration. And we will hide the 'complete' and
> > > related APIs of StateFuture from users, since the completion of these
> > > futures is totally managed by the execution framework.
> > >
> > >
> > > Best,
> > > Zakelly
> > >
> > >
> > >
> > > On Tue, Mar 19, 2024 at 5:20 PM yue ma  wrote:
> > >
> > >> Hi Zakelly,
> > >>
> > >> Thanks for your proposal. The FLIP looks good  to me +1! I'd like to
> ask
> > >> some minor questions
> > >> I found that there is also a definition of class `FutureUtils` under
> > `org.
> > >> apache. flink. util. concurrent` which seems to offer more interfaces.
> > My
> > >> question is:
> > >> 1. Is it possible for all `FutureUtils` in Flink to reuse the same
> util
> > >> class?
> > >> 2. It seems that there is no concept of retry

Re: [DISCUSS] FLIP-424: Asynchronous State APIs

2024-03-19 Thread Zakelly Lan
Hi everyone,

Thanks for your valuable feedback!

The discussions were vibrant and have led to significant enhancements to
this FLIP. With this progress, I'm looking to initiate the voting in 72
hours.

Please let me know if you have any concerns, thanks!


Best,
Zakelly

On Tue, Mar 19, 2024 at 5:35 PM Zakelly Lan  wrote:

> Hi Yue,
>
> Thanks for your comments!
>
> 1. Is it possible for all `FutureUtils` in Flink to reuse the same util
>> class?
>
> Actually, the `FutureUtils` here is a new util class that will share the
> same package path with the `StateFuture`. Or I'd be fine renaming it
> 'StateFutureUtils'.
>
> 2. It seems that there is no concept of retry, timeout, or delay in your
>> async state api design . Do we need to provide such capabilities like
>> `orTimeout` 、`completeDelayed`?
>>
> For ease of use, we do not provide such APIs allowing users to customize
> the behavior on timeout or retry. We may introduce a retry mechanism in the
> framework enabled by configuration. And we will hide the 'complete' and
> related APIs of StateFuture from users, since the completion of these
> futures is totally managed by the execution framework.
>
>
> Best,
> Zakelly
>
>
>
> On Tue, Mar 19, 2024 at 5:20 PM yue ma  wrote:
>
>> Hi Zakelly,
>>
>> Thanks for your proposal. The FLIP looks good to me, +1! I'd like to ask
>> some minor questions.
>> I found that there is also a class `FutureUtils` under
>> `org.apache.flink.util.concurrent`, which seems to offer more interfaces. My
>> question is:
>> 1. Is it possible for all `FutureUtils` in Flink to reuse the same util
>> class?
>> 2. It seems that there is no concept of retry, timeout, or delay in your
>> async state API design. Do we need to provide such capabilities like
>> `orTimeout`, `completeDelayed`?
>>
>> On Wed, Mar 13, 2024 at 8:00 PM Jing Ge wrote:
>>
>> > indeed! I missed that part. Thanks for the hint!
>> >
>> > Best regards,
>> > Jing
>> >
>> > On Wed, Mar 13, 2024 at 6:02 AM Zakelly Lan 
>> wrote:
>> >
>> > > Hi Jing,
>> > >
>> > > The deprecation and removal of original APIs is beyond the scope of
>> > > current FLIP, but I do add/highlight such information under
>> > "Compatibility,
>> > > Deprecation, and Migration Plan" section.
>> > >
>> > >
>> > > Best,
>> > > Zakelly
>> > >
>> > > On Wed, Mar 13, 2024 at 9:18 AM Yunfeng Zhou <
>> > flink.zhouyunf...@gmail.com>
>> > > wrote:
>> > >
>> > >> Hi Zakelly,
>> > >>
>> > >> Thanks for your responses. I agree with it that we can keep the
>> design
>> > >> as it is for now and see if others have any better ideas for these
>> > >> questions.
>> > >>
>> > >> Best,
>> > >> Yunfeng
>> > >>
>> > >> On Tue, Mar 12, 2024 at 5:23 PM Zakelly Lan 
>> > >> wrote:
>> > >> >
>> > >> > Hi Xuannan,
>> > >> >
>> > >> > Thanks for your comments, I modified the FLIP accordingly.
>> > >> >
>> > >> > Hi Yunfeng,
>> > >> >
>> > >> > Thanks for sharing your opinions!
>> > >> >
>> > >> >> Could you provide some hint on use cases where users need to mix
>> sync
>> > >> >> and async state operations in spite of the performance regression?
>> > >> >> This information might help address our concerns on design. If the
>> > >> >> mixed usage is simply something not recommended, I would prefer to
>> > >> >> prohibit such usage from API.
>> > >> >
>> > >> > In fact, there is no scenario where users MUST use the sync APIs,
>> but
>> > >> it is much easier to use for those who are not familiar with
>> > asynchronous
>> > >> programming. If they want to migrate their job from Flink 1.x to 2.0
>> > >> leveraging some benefits from asynchronous APIs, they may try the
>> mixed
>> > >> usage. It is not user-friendly to directly throw exceptions at
>> runtime,
>> > I
>> > >> think our better approach is to warn users and recommend avoiding
>> this.
>> > I
>> > >> added an example in this FLIP.
>> > >> >
>> > >> > Well, I do not insi

Re: [DISCUSS] FLIP-423 ~FLIP-428: Introduce Disaggregated State Storage and Management in Flink 2.0

2024-03-19 Thread Zakelly Lan
Hi everyone,

Thanks for your valuable feedback!

Our discussions have been going on for a while and are nearing a
consensus. So I would like to start a vote after 72 hours.

Please let me know if you have any concerns, thanks!


Best,
Zakelly

On Tue, Mar 19, 2024 at 3:37 PM Zakelly Lan  wrote:

> Hi Yunfeng,
>
> Thanks for the suggestion!
>
> I will reorganize the FLIP-425 accordingly.
>
>
> Best,
> Zakelly
>
> On Tue, Mar 19, 2024 at 3:20 PM Yunfeng Zhou 
> wrote:
>
>> Hi Xintong and Zakelly,
>>
>> > 2. Regarding Strictly-ordered and Out-of-order of Watermarks
>> I agree with it that watermarks can use only out-of-order mode for
>> now, because there is still not a concrete example showing the
>> correctness risk about it. However, the strictly-ordered mode should
>> still be supported as the default option for non-record event types
>> other than watermark, at least for checkpoint barriers.
>>
>> I noticed that this information has already been documented in "For
>> other non-record events, such as RecordAttributes ...", but it's at
>> the bottom of the "Watermark" section, which might not be very
>> obvious. Thus it might be better to reorganize the FLIP to better
>> claim that the two order modes are designed for all non-record events,
>> and which mode this FLIP would choose for each type of event.
>>
>> Best,
>> Yunfeng
>>
>> On Tue, Mar 19, 2024 at 1:09 PM Xintong Song 
>> wrote:
>> >
>> > Thanks for the quick response. Sounds good to me.
>> >
>> > Best,
>> >
>> > Xintong
>> >
>> >
>> >
>> > On Tue, Mar 19, 2024 at 1:03 PM Zakelly Lan 
>> wrote:
>> >
>> > > Hi Xintong,
>> > >
>> > > Thanks for sharing your thoughts!
>> > >
>> > > 1. Regarding Record-ordered and State-ordered of processElement.
>> > > >
>> > > > I understand that while State-ordered likely provides better
>> performance,
>> > > > Record-ordered is sometimes required for correctness. The question
>> is how
>> > > > should a user choose between these two modes? My concern is that
>> such a
>> > > > decision may require users to have in-depth knowledge about the
>> Flink
>> > > > internals, and may lead to correctness issues if State-ordered is
>> chosen
>> > > > improperly.
>> > > >
>> > > > I'd suggest not to expose such a knob, at least in the first
>> version.
>> > > That
>> > > > means always use Record-ordered for custom operators / UDFs, and
>> keep
>> > > > State-ordered for internal usages (built-in operators) only.
>> > > >
>> > >
>> > > Indeed, users may not be able to choose the mode properly. I agree to
>> keep
>> > > such options for internal use.
>> > >
>> > >
>> > > 2. Regarding Strictly-ordered and Out-of-order of Watermarks.
>> > > >
>> > > > I'm not entirely sure about Strictly-ordered being the default, or
>> even
>> > > > being supported. From my understanding, a Watermark(T) only
>> suggests that
>> > > > all records with event time before T has arrived, and it has
>> nothing to
>> > > do
>> > > > with whether records with event time after T has arrived or not.
>> From
>> > > that
>> > > > perspective, preventing certain records from arriving before a
>> Watermark
>> > > is
>> > > > never supported. I also cannot come up with any use case where
>> > > > Strictly-ordered is necessary. This implies the same issue as 1):
>> how
>> > > does
>> > > > the user choose between the two modes?
>> > > >
>> > > > I'd suggest not expose the knob to users and only support
>> Out-of-order,
>> > > > until we see a concrete use case that Strictly-ordered is needed.
>> > > >
>> > >
>> > > The semantics of watermarks do not define the sequence between a
>> watermark
>> > > and subsequent records. For the most part, this is inconsequential,
>> except
>> > > it may affect some current users who have previously relied on the
>> implicit
>> > > assumption of an ordered execution. I'd be fine with initially
>> supporting
>> > > only out-of-order processing. We may consider exposing the
>> > >

Re: [DISCUSS] FLIP-424: Asynchronous State APIs

2024-03-19 Thread Zakelly Lan
Hi Yue,

Thanks for your comments!

> 1. Is it possible for all `FutureUtils` in Flink to reuse the same util
> class?

Actually, the `FutureUtils` here is a new util class that will share the
same package path with the `StateFuture`. Alternatively, I'd be fine with
renaming it to 'StateFutureUtils'.

> 2. It seems that there is no concept of retry, timeout, or delay in your
> async state API design. Do we need to provide such capabilities like
> `orTimeout`, `completeDelayed`?
>
For ease of use, we do not provide such APIs allowing users to customize
the behavior on timeout or retry. We may introduce a retry mechanism in the
framework enabled by configuration. And we will hide the 'complete' and
related APIs of StateFuture from users, since the completion of these
futures is totally managed by the execution framework.
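
One possible way to hide completion from users is to expose only a narrow interface and keep the completing side internal. The sketch below is an assumption about how this could look, not the FLIP's implementation; `UserFuture`, `FrameworkFuture`, and `HiddenCompleteExample` are hypothetical names, and `CompletableFuture` is used only as the backing mechanism.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Hypothetical user-facing view: callbacks can be chained, but there is no complete().
interface UserFuture<T> {
    <R> UserFuture<R> thenApply(Function<T, R> fn);
}

// Hypothetical framework-side implementation that owns completion.
class FrameworkFuture<T> implements UserFuture<T> {
    private final CompletableFuture<T> delegate;

    FrameworkFuture() {
        this(new CompletableFuture<>());
    }

    private FrameworkFuture(CompletableFuture<T> delegate) {
        this.delegate = delegate;
    }

    @Override
    public <R> UserFuture<R> thenApply(Function<T, R> fn) {
        return new FrameworkFuture<>(delegate.thenApply(fn));
    }

    // Only the framework calls this, e.g. when the state request finishes.
    void complete(T value) {
        delegate.complete(value);
    }

    // For the demo only: returns the result if present, otherwise the fallback.
    T getNow(T fallback) {
        return delegate.getNow(fallback);
    }
}

class HiddenCompleteExample {
    static String run() {
        FrameworkFuture<Integer> internal = new FrameworkFuture<>();
        UserFuture<Integer> userView = internal; // what user code would receive
        UserFuture<String> chained = userView.thenApply(v -> "value=" + v);

        internal.complete(7); // performed by the framework, never by the user
        return ((FrameworkFuture<String>) chained).getNow("pending");
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints value=7
    }
}
```

User code holding a `UserFuture` can only register callbacks. Timeouts and retries, if added, would live behind `complete` on the framework side and could be driven by configuration.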


Best,
Zakelly



On Tue, Mar 19, 2024 at 5:20 PM yue ma  wrote:

> Hi Zakelly,
>
> Thanks for your proposal. The FLIP looks good to me, +1! I'd like to ask
> some minor questions.
> I found that there is also a class `FutureUtils` under
> `org.apache.flink.util.concurrent`, which seems to offer more interfaces. My
> question is:
> 1. Is it possible for all `FutureUtils` in Flink to reuse the same util
> class?
> 2. It seems that there is no concept of retry, timeout, or delay in your
> async state API design. Do we need to provide such capabilities like
> `orTimeout`, `completeDelayed`?
>
> On Wed, Mar 13, 2024 at 8:00 PM Jing Ge wrote:
>
> > indeed! I missed that part. Thanks for the hint!
> >
> > Best regards,
> > Jing
> >
> > On Wed, Mar 13, 2024 at 6:02 AM Zakelly Lan 
> wrote:
> >
> > > Hi Jing,
> > >
> > > The deprecation and removal of original APIs is beyond the scope of
> > > current FLIP, but I do add/highlight such information under
> > "Compatibility,
> > > Deprecation, and Migration Plan" section.
> > >
> > >
> > > Best,
> > > Zakelly
> > >
> > > On Wed, Mar 13, 2024 at 9:18 AM Yunfeng Zhou <
> > flink.zhouyunf...@gmail.com>
> > > wrote:
> > >
> > >> Hi Zakelly,
> > >>
> > >> Thanks for your responses. I agree with it that we can keep the design
> > >> as it is for now and see if others have any better ideas for these
> > >> questions.
> > >>
> > >> Best,
> > >> Yunfeng
> > >>
> > >> On Tue, Mar 12, 2024 at 5:23 PM Zakelly Lan 
> > >> wrote:
> > >> >
> > >> > Hi Xuannan,
> > >> >
> > >> > Thanks for your comments, I modified the FLIP accordingly.
> > >> >
> > >> > Hi Yunfeng,
> > >> >
> > >> > Thanks for sharing your opinions!
> > >> >
> > >> >> Could you provide some hint on use cases where users need to mix
> sync
> > >> >> and async state operations in spite of the performance regression?
> > >> >> This information might help address our concerns on design. If the
> > >> >> mixed usage is simply something not recommended, I would prefer to
> > >> >> prohibit such usage from API.
> > >> >
> > >> > In fact, there is no scenario where users MUST use the sync APIs,
> but
> > >> it is much easier to use for those who are not familiar with
> > asynchronous
> > >> programming. If they want to migrate their job from Flink 1.x to 2.0
> > >> leveraging some benefits from asynchronous APIs, they may try the
> mixed
> > >> usage. It is not user-friendly to directly throw exceptions at
> runtime,
> > I
> > >> think our better approach is to warn users and recommend avoiding
> this.
> > I
> > >> added an example in this FLIP.
> > >> >
> > >> > Well, I do not insist on allowing mixed usage of APIs if others
> reach
> > >> an agreement that we won't support that . I think the most important
> is
> > to
> > >> keep the API easy to use and understand, thus I propose a unified
> state
> > >> declaration and explicit meaning in method name. WDYT?
> > >> >
> > >> >> Sorry I missed the new sink API. I do still think that it would be
> > >> >> better to make the package name more informative, and ".v2." does
> not
> > >> >> contain information for new Flink users who did not know the v1 of
> > >> >> state API. Unlike internal implementation and performance
> > >> >> optimizat

Re: [DISCUSS] FLIP-423 ~FLIP-428: Introduce Disaggregated State Storage and Management in Flink 2.0

2024-03-19 Thread Zakelly Lan
Hi Yunfeng,

Thanks for the suggestion!

I will reorganize the FLIP-425 accordingly.


Best,
Zakelly

On Tue, Mar 19, 2024 at 3:20 PM Yunfeng Zhou 
wrote:

> Hi Xintong and Zakelly,
>
> > 2. Regarding Strictly-ordered and Out-of-order of Watermarks
> I agree with it that watermarks can use only out-of-order mode for
> now, because there is still not a concrete example showing the
> correctness risk about it. However, the strictly-ordered mode should
> still be supported as the default option for non-record event types
> other than watermark, at least for checkpoint barriers.
>
> I noticed that this information has already been documented in "For
> other non-record events, such as RecordAttributes ...", but it's at
> the bottom of the "Watermark" section, which might not be very
> obvious. Thus it might be better to reorganize the FLIP to better
> claim that the two order modes are designed for all non-record events,
> and which mode this FLIP would choose for each type of event.
>
> Best,
> Yunfeng
>
> On Tue, Mar 19, 2024 at 1:09 PM Xintong Song 
> wrote:
> >
> > Thanks for the quick response. Sounds good to me.
> >
> > Best,
> >
> > Xintong
> >
> >
> >
> > On Tue, Mar 19, 2024 at 1:03 PM Zakelly Lan 
> wrote:
> >
> > > Hi Xintong,
> > >
> > > Thanks for sharing your thoughts!
> > >
> > > 1. Regarding Record-ordered and State-ordered of processElement.
> > > >
> > > > I understand that while State-ordered likely provides better
> performance,
> > > > Record-ordered is sometimes required for correctness. The question
> is how
> > > > should a user choose between these two modes? My concern is that
> such a
> > > > decision may require users to have in-depth knowledge about the Flink
> > > > internals, and may lead to correctness issues if State-ordered is
> chosen
> > > > improperly.
> > > >
> > > > I'd suggest not to expose such a knob, at least in the first version.
> > > That
> > > > means always use Record-ordered for custom operators / UDFs, and keep
> > > > State-ordered for internal usages (built-in operators) only.
> > > >
> > >
> > > Indeed, users may not be able to choose the mode properly. I agree to
> keep
> > > such options for internal use.
> > >
> > >
> > > 2. Regarding Strictly-ordered and Out-of-order of Watermarks.
> > > >
> > > > I'm not entirely sure about Strictly-ordered being the default, or
> even
> > > > being supported. From my understanding, a Watermark(T) only suggests
> that
> > > > all records with event time before T has arrived, and it has nothing
> to
> > > do
> > > > with whether records with event time after T has arrived or not. From
> > > that
> > > > perspective, preventing certain records from arriving before a
> Watermark
> > > is
> > > > never supported. I also cannot come up with any use case where
> > > > Strictly-ordered is necessary. This implies the same issue as 1): how
> > > does
> > > > the user choose between the two modes?
> > > >
> > > > I'd suggest not expose the knob to users and only support
> Out-of-order,
> > > > until we see a concrete use case that Strictly-ordered is needed.
> > > >
> > >
> > > The semantics of watermarks do not define the sequence between a
> watermark
> > > and subsequent records. For the most part, this is inconsequential,
> except
> > > it may affect some current users who have previously relied on the
> implicit
> > > assumption of an ordered execution. I'd be fine with initially
> supporting
> > > only out-of-order processing. We may consider exposing the
> > > 'Strictly-ordered' mode once we encounter a concrete use case that
> > > necessitates it.
> > >
> > >
> > > My philosophies behind not exposing the two config options are:
> > > > > - There are already too many options in Flink that users barely know
> > > > > how to use. I think Flink should try as much as possible to decide its
> > > > > own behavior, rather than throwing all the decisions to the users.
> > > > - It's much harder to take back knobs than to introduce them.
> Therefore,
> > > > options should be introduced only if concrete use cases are
> identified.
> > > >
> > >
> > > I agree to keep minimal configurable items especially for the M

Re: [DISCUSS] FLIP-423 ~FLIP-428: Introduce Disaggregated State Storage and Management in Flink 2.0

2024-03-18 Thread Zakelly Lan
e use cases are identified.
>
> WDYT?
>
> Best,
>
> Xintong
>
>
>
> On Fri, Mar 8, 2024 at 2:45 AM Jing Ge  wrote:
>
> > +1 for Gyula's suggestion. I just finished FLIP-423 which introduced the
> > intention of the big change and high level architecture. Great content
> btw!
> > The only public interface change for this FLIP is one new config to use
> > ForSt. It makes sense to have one dedicated discussion thread for each
> > concrete system design.
> >
> > @Zakelly The links in your mail do not work except the last one, because
> > the FLIP-xxx has been included into the url like
> > https://lists.apache.org/thread/nmd9qd0k8l94ygcfgllxms49wmtz1864FLIP-425
> .
> >
> > NIT fix:
> >
> > FLIP-424:
> https://lists.apache.org/thread/nmd9qd0k8l94ygcfgllxms49wmtz1864
> >
> > FLIP-425:
> https://lists.apache.org/thread/wxn1j848fnfkqsnrs947wh1wmj8n8z0h
> >
> > FLIP-426:
> https://lists.apache.org/thread/bt931focfl9971cwq194trmf3pkdsxrf
> >
> > FLIP-427:
> https://lists.apache.org/thread/vktfzqvb7t4rltg7fdlsyd9sfdmrc4ft
> >
> > FLIP-428:
> https://lists.apache.org/thread/vr8f91p715ct4lop6b3nr0fh4z5p312b
> >
> > Best regards,
> > Jing
> >
> >
> >
> >
> > On Thu, Mar 7, 2024 at 10:14 AM Zakelly Lan 
> wrote:
> >
> > > Hi everyone,
> > >
> > > Thank you all for a lively discussion here, and it is a good time to
> move
> > > forward to more detailed discussions. Thus we open several threads for
> > > sub-FLIPs:
> > >
> > > FLIP-424:
> > https://lists.apache.org/thread/nmd9qd0k8l94ygcfgllxms49wmtz1864
> > > FLIP-425
> > > <
> > https://lists.apache.org/thread/nmd9qd0k8l94ygcfgllxms49wmtz1864FLIP-425
> >:
> > > https://lists.apache.org/thread/wxn1j848fnfkqsnrs947wh1wmj8n8z0h
> > > FLIP-426
> > > <
> > https://lists.apache.org/thread/wxn1j848fnfkqsnrs947wh1wmj8n8z0hFLIP-426
> >:
> > > https://lists.apache.org/thread/bt931focfl9971cwq194trmf3pkdsxrf
> > > FLIP-427
> > > <
> > https://lists.apache.org/thread/bt931focfl9971cwq194trmf3pkdsxrfFLIP-427
> >:
> > > https://lists.apache.org/thread/vktfzqvb7t4rltg7fdlsyd9sfdmrc4ft
> > > FLIP-428
> > > <
> > https://lists.apache.org/thread/vktfzqvb7t4rltg7fdlsyd9sfdmrc4ftFLIP-428
> >:
> > > https://lists.apache.org/thread/vr8f91p715ct4lop6b3nr0fh4z5p312b
> > >
> > > If you want to talk about the overall architecture, roadmap, milestones
> > or
> > > something related with multiple FLIPs, please post it here. Otherwise
> you
> > > can discuss some details in separate mails. Let's try to avoid repeated
> > > discussion in different threads. I will sync important messages here if
> > > there are any in the above threads.
> > >
> > > And reply to @Jeyhun: We will ensure the content between those FLIPs is
> > > consistent.
> > >
> > >
> > > Best,
> > > Zakelly
> > >
> > > On Thu, Mar 7, 2024 at 2:16 PM Yuan Mei 
> wrote:
> > >
> > > > I have been a bit busy these few weeks and sorry for responding late.
> > > >
> > > > The original thinking of keeping discussion within one thread is for
> > > easier
> > > > tracking and avoid for repeated discussion in different threads.
> > > >
> > > > For details, It might be good to start in different threads if
> needed.
> > > >
> > > > We will think of a way to better organize the discussion.
> > > >
> > > > Best
> > > > Yuan
> > > >
> > > >
> > > > On Thu, Mar 7, 2024 at 4:38 AM Jeyhun Karimov 
> > > > wrote:
> > > >
> > > > > Hi,
> > > > >
> > > > > + 1 for the suggestion.
> > > > > > Maybe we can start the discussion with the FLIPs with minimum
> > > > > > dependencies (from the other new/proposed FLIPs).
> > > > > Based on our discussion on a particular FLIP, the subsequent (or
> its
> > > > > dependent) FLIP(s) can be updated accordingly?
> > > > >
> > > > > Regards,
> > > > > Jeyhun
> > > > >
> > > > > On Wed, Mar 6, 2024 at 5:34 PM Gyula Fóra 
> > > wrote:
> > > > >
> > > > > > Hey all!
> > > > > >
> > > > > > This is a massive improvement / work. I just sta

Re: [ANNOUNCE] Apache Flink 1.19.0 released

2024-03-18 Thread Zakelly Lan
Congratulations!

Thanks Lincoln, Yun, Martijn and Jing for driving this release.
Thanks everyone involved.


Best,
Zakelly

On Mon, Mar 18, 2024 at 5:05 PM weijie guo 
wrote:

> Congratulations!
>
> Thanks release managers and all the contributors involved.
>
> Best regards,
>
> Weijie
>
>
> On Mon, Mar 18, 2024 at 4:45 PM Leonard Xu wrote:
>
>> Congratulations, thanks release managers and all involved for the great
>> work!
>>
>>
>> Best,
>> Leonard
>>
>> > On Mar 18, 2024, at 4:32 PM, Jingsong Li wrote:
>> >
>> > Congratulations!
>> >
>> > On Mon, Mar 18, 2024 at 4:30 PM Rui Fan <1996fan...@gmail.com> wrote:
>> >>
>> >> Congratulations, thanks for the great work!
>> >>
>> >> Best,
>> >> Rui
>> >>
>> >> On Mon, Mar 18, 2024 at 4:26 PM Lincoln Lee 
>> wrote:
>> >>>
>> >>> The Apache Flink community is very happy to announce the release of
>> Apache Flink 1.19.0, which is the first release for the Apache Flink 1.19
>> series.
>> >>>
>> >>> Apache Flink® is an open-source stream processing framework for
>> distributed, high-performing, always-available, and accurate data streaming
>> applications.
>> >>>
>> >>> The release is available for download at:
>> >>> https://flink.apache.org/downloads.html
>> >>>
>> >>> Please check out the release blog post for an overview of the
>> improvements for this bugfix release:
>> >>>
>> https://flink.apache.org/2024/03/18/announcing-the-release-of-apache-flink-1.19/
>> >>>
>> >>> The full release notes are available in Jira:
>> >>>
>> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12353282
>> >>>
>> >>> We would like to thank all contributors of the Apache Flink community
>> who made this release possible!
>> >>>
>> >>>
>> >>> Best,
>> >>> Yun, Jing, Martijn and Lincoln
>>
>>


Re: Unaligned checkpoint blocked by long Async operation

2024-03-17 Thread Zakelly Lan
I agree. I also created https://issues.apache.org/jira/browse/FLINK-34704 for
tracking and further discussion.


Best,
Zakelly

On Fri, Mar 15, 2024 at 2:59 PM Gyula Fóra  wrote:

> Posting this to dev as well...
>
> Thanks Zakelly,
> Sounds like a solution could be to add a new different version of yield
> that would actually yield to the checkpoint barrier too. That way operator
> implementations could decide whether any state modification may or may not
> have happened and can optionally allow checkpoint to be taken in the
> "middle of record  processing".
>
> Gyula
>
> On Fri, Mar 15, 2024 at 3:49 AM Zakelly Lan  wrote:
>
>> Hi Gyula,
>>
>> Processing checkpoint halfway through `processElement` is problematic.
>> The current element will not be included in the input in-flight data, and
>> we cannot assume it has taken effect on the state by user code. So the best
>> way is to treat `processElement` as an 'atomic' operation. I guess that's
>> why the priority of the cp barrier is set low.
>> However, the AsyncWaitOperator is a special case where we know the
>> element blocked at `addToWorkQueue` has not started triggering the
>> userFunction. Thus I'd suggest putting the element in the queue when the cp
>> barrier comes, and taking a snapshot of the whole queue afterwards. The
>> problem will be solved. But this approach also involves some code
>> modifications on the mailbox executor.
>>
>>
>> Best,
>> Zakelly
>>
>> On Thu, Mar 14, 2024 at 9:15 PM Gyula Fóra  wrote:
>>
>>> Thank you for the detailed analysis Zakelly.
>>>
>>> I think we should consider whether yield should process checkpoint
>>> barriers because this puts quite a serious limitation on the unaligned
>>> checkpoints in these cases.
>>> Do you know what is the reason behind the current priority setting? Is
>>> there a problem with processing the barrier here?
>>>
>>> Gyula
>>>
>>> On Thu, Mar 14, 2024 at 1:22 PM Zakelly Lan 
>>> wrote:
>>>
>>>> Hi Gyula,
>>>>
>>>> Well I tried your example in local mini-cluster, and it seems the
>>>> source can take checkpoints but it will block in the following
>>>> AsyncWaitOperator. IIUC, the unaligned checkpoint barrier should wait until
>>>> the current `processElement` finishes its execution. In your example, the
>>>> element queue of `AsyncWaitOperator` will end up full and `processElement`
>>>> will be blocked at `addToWorkQueue`. Even though it will call
>>>> `mailboxExecutor.yield();`, it still leaves the checkpoint barrier
>>>> unprocessed since the priority of the barrier is -1, lower than the one
>>>> `yield()` should handle. I verified this using single-step debugging.
>>>>
>>>> And if one element could finish its async io, the cp barrier can be
>>>> processed afterwards. For example:
>>>> ```
>>>> // Assumes `env` is a StreamExecutionEnvironment and LOG is an SLF4J logger.
>>>> env.getCheckpointConfig().enableUnalignedCheckpoints();
>>>> env.getCheckpointConfig().setCheckpointInterval(10000);  // 10s interval
>>>> env.getConfig().setParallelism(1);
>>>> AsyncDataStream.orderedWait(
>>>>         env.fromSequence(Long.MIN_VALUE, Long.MAX_VALUE).shuffle(),
>>>>         new AsyncFunction<Long, Long>() {
>>>>             boolean first = true;
>>>>
>>>>             @Override
>>>>             public void asyncInvoke(Long aLong, ResultFuture<Long> resultFuture) {
>>>>                 // Only the first element is ever completed; all later ones stay
>>>>                 // pending, so the element queue of the operator fills up.
>>>>                 if (first) {
>>>>                     Executors.newSingleThreadExecutor().execute(() -> {
>>>>                         try {
>>>>                             Thread.sleep(20000); // complete after 20s, only for the first one.
>>>>                         } catch (Throwable e) {}
>>>>                         LOG.info("Complete one");
>>>>                         resultFuture.complete(Collections.singleton(1L));
>>>>                     });
>>>>                     first = false;
>>>>                 }
>>>>             }
>>>>         },
>>>>         24,
>>>>         TimeUnit.HOURS,
>>>>         1)
>>>>         .print();
>>>> ```
>>>> The checkpoint 1 can be normally finished after the "Complete one" log
>>>> print.
>

[jira] [Created] (FLINK-34704) Process checkpoint barrier in AsyncWaitOperator when the element queue is full

2024-03-17 Thread Zakelly Lan (Jira)
Zakelly Lan created FLINK-34704:
---

 Summary: Process checkpoint barrier in AsyncWaitOperator when the 
element queue is full
 Key: FLINK-34704
 URL: https://issues.apache.org/jira/browse/FLINK-34704
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Task
Reporter: Zakelly Lan


As discussed in 
https://lists.apache.org/thread/4f7ywn29kdv4302j2rq3fkxc6pc8myr2, it may be 
better to provide a new `yield` variant that can also process low-priority mail 
(such as the checkpoint barrier) in the mailbox executor. More discussion is needed.
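
To make the idea concrete, here is a toy sketch of the behaviour described in the linked thread. It is not Flink's MailboxExecutor: the class `ToyMailbox`, its methods and the priority values are assumptions made up for illustration. It only shows why a yield that accepts mail at or above a minimum priority leaves a lower-priority barrier mail in the queue, and how a second variant without that floor would pick it up.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;

public class ToyMailbox {
    /** A mail is a named action with a priority; higher means more urgent. */
    static final class Mail {
        final String name;
        final int priority;
        Mail(String name, int priority) { this.name = name; this.priority = priority; }
    }

    private final Deque<Mail> queue = new ArrayDeque<>();
    final List<String> processed = new ArrayList<>();

    void put(String name, int priority) { queue.addLast(new Mail(name, priority)); }

    /** Runs the first queued mail whose priority is >= minPriority; returns false if none. */
    boolean tryYield(int minPriority) {
        for (Iterator<Mail> it = queue.iterator(); it.hasNext(); ) {
            Mail m = it.next();
            if (m.priority >= minPriority) {
                it.remove();
                processed.add(m.name);
                return true;
            }
        }
        return false;
    }

    /** Hypothetical new variant: also yields to low-priority mail such as barriers. */
    boolean tryYieldIncludingLowPriority() { return tryYield(Integer.MIN_VALUE); }

    public static void main(String[] args) {
        ToyMailbox mailbox = new ToyMailbox();
        mailbox.put("checkpoint-barrier", -1); // priority -1, as reported in the thread
        mailbox.put("async-result", 0);

        mailbox.tryYield(0);                    // picks "async-result", skips the barrier
        boolean again = mailbox.tryYield(0);    // nothing eligible: barrier stays queued
        mailbox.tryYieldIncludingLowPriority(); // now the barrier is processed

        System.out.println(again + " " + mailbox.processed);
    }
}
```

Whether such a variant is safe depends on the operator knowing that no state change is half-applied at the point of yielding, which is the open question in this ticket.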



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34668) Report State handle of file merging directory to JM

2024-03-14 Thread Zakelly Lan (Jira)
Zakelly Lan created FLINK-34668:
---

 Summary: Report State handle of file merging directory to JM
 Key: FLINK-34668
 URL: https://issues.apache.org/jira/browse/FLINK-34668
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Checkpointing
Reporter: Zakelly Lan
Assignee: Yanfei Lei
 Fix For: 1.20.0






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] FLIP-424: Asynchronous State APIs

2024-03-12 Thread Zakelly Lan
Hi Jing,

The deprecation and removal of the original APIs are beyond the scope of the
current FLIP, but I did add/highlight this information under the
"Compatibility, Deprecation, and Migration Plan" section.


Best,
Zakelly

On Wed, Mar 13, 2024 at 9:18 AM Yunfeng Zhou 
wrote:

> Hi Zakelly,
>
> Thanks for your responses. I agree with it that we can keep the design
> as it is for now and see if others have any better ideas for these
> questions.
>
> Best,
> Yunfeng
>
> On Tue, Mar 12, 2024 at 5:23 PM Zakelly Lan  wrote:
> >
> > Hi Xuannan,
> >
> > Thanks for your comments, I modified the FLIP accordingly.
> >
> > Hi Yunfeng,
> >
> > Thanks for sharing your opinions!
> >
> >> Could you provide some hint on use cases where users need to mix sync
> >> and async state operations in spite of the performance regression?
> >> This information might help address our concerns on design. If the
> >> mixed usage is simply something not recommended, I would prefer to
> >> prohibit such usage from API.
> >
> > In fact, there is no scenario where users MUST use the sync APIs, but it
> is much easier to use for those who are not familiar with asynchronous
> programming. If they want to migrate their job from Flink 1.x to 2.0
> leveraging some benefits from asynchronous APIs, they may try the mixed
> usage. It is not user-friendly to directly throw exceptions at runtime, I
> think our better approach is to warn users and recommend avoiding this. I
> added an example in this FLIP.
> >
> > Well, I do not insist on allowing mixed usage of APIs if others reach an
> agreement that we won't support that . I think the most important is to
> keep the API easy to use and understand, thus I propose a unified state
> declaration and explicit meaning in method name. WDYT?
> >
> >> Sorry I missed the new sink API. I do still think that it would be
> >> better to make the package name more informative, and ".v2." does not
> >> contain information for new Flink users who did not know the v1 of
> >> state API. Unlike internal implementation and performance
> >> optimization, API will hardly be compromised for now and updated in
> >> future, so I still suggest we improve the package name now if
> >> possible. But given the existing practice of sink v2 and
> >> AbstractStreamOperatorV2, the current package name would be acceptable
> >> to me if other reviewers of this FLIP agrees on it.
> >
> > Actually, I don't like 'v2' either. So if there is another good name,
> I'd be happy to apply. This is a compromise to the current situation. Maybe
> we could refine this after the retirement of original state APIs.
> >
> >
> > Thanks & Best,
> > Zakelly
> >
> >
> > On Tue, Mar 12, 2024 at 1:42 PM Yunfeng Zhou <
> flink.zhouyunf...@gmail.com> wrote:
> >>
> >> Hi Zakelly,
> >>
> >> Thanks for the quick response!
> >>
> >> > Actually splitting APIs into two sets ... warn them in runtime.
> >>
> >> Could you provide some hint on use cases where users need to mix sync
> >> and async state operations in spite of the performance regression?
> >> This information might help address our concerns on design. If the
> >> mixed usage is simply something not recommended, I would prefer to
> >> prohibit such usage from API.
> >>
> >> > In fact ... .sink2`.
> >>
> >> Sorry I missed the new sink API. I do still think that it would be
> >> better to make the package name more informative, and ".v2." does not
> >> contain information for new Flink users who did not know the v1 of
> >> state API. Unlike internal implementation and performance
> >> optimization, API will hardly be compromised for now and updated in
> >> future, so I still suggest we improve the package name now if
> >> possible. But given the existing practice of sink v2 and
> >> AbstractStreamOperatorV2, the current package name would be acceptable
> >> to me if other reviewers of this FLIP agrees on it.
> >>
> >> Best,
> >> Yunfeng
> >>
> >> On Mon, Mar 11, 2024 at 5:27 PM Zakelly Lan 
> wrote:
> >> >
> >> > Hi Yunfeng,
> >> >
> >> > Thanks for your comments!
> >> >
> >> > +1 for JingGe's suggestion to introduce an AsyncState API, instead of
> >> > > having both get() and asyncGet() in the same State class. As a
> >> > > supplement to its benefits, this design could help avoi

Re: [DISCUSS] FLIP-433: State Access on DataStream API V2

2024-03-12 Thread Zakelly Lan
Hi Weijie,

Thanks for your reply!

Overall I'd be fine with the builder pattern, but it is a little bit long
when carrying explicit 'build()' and declaring the builder. Keeping the
StateDeclaration immutable is OK, but it is a little bit inconvenient for
overriding the undefined options by job configuration at runtime. I'd
suggest providing some methods responsible for rebuilding a new
StateDeclaration with new configurable options, just like the
ConfigOptions#defaultValue does. Well, this is just a suggestion, I'm not
going to insist on it.
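
A minimal sketch of that suggestion, assuming an immutable declaration with a builder. Every name here (`ToyStateDeclaration`, `withTtlMillis`, `withTtlMillisIfAbsent`) is hypothetical and not part of the FLIP; the TTL option is only a placeholder for "some configurable option".

```java
import java.util.Optional;

public final class ToyStateDeclaration {
    private final String name;
    private final Long ttlMillis; // null means "not defined by the user"

    private ToyStateDeclaration(String name, Long ttlMillis) {
        this.name = name;
        this.ttlMillis = ttlMillis;
    }

    public String name() { return name; }
    public Optional<Long> ttlMillis() { return Optional.ofNullable(ttlMillis); }

    /** Returns a NEW declaration; the receiver is left untouched. */
    public ToyStateDeclaration withTtlMillis(long ttl) {
        return new ToyStateDeclaration(name, ttl);
    }

    /** Fills in the TTL from job configuration only if the user left it undefined. */
    public ToyStateDeclaration withTtlMillisIfAbsent(long ttlFromConfig) {
        return ttlMillis == null ? withTtlMillis(ttlFromConfig) : this;
    }

    public static Builder builder(String name) { return new Builder(name); }

    public static final class Builder {
        private final String name;
        private Long ttlMillis;
        private Builder(String name) { this.name = name; }
        public Builder ttlMillis(long ttl) { this.ttlMillis = ttl; return this; }
        public ToyStateDeclaration build() { return new ToyStateDeclaration(name, ttlMillis); }
    }

    public static void main(String[] args) {
        ToyStateDeclaration declared = ToyStateDeclaration.builder("count").build();
        ToyStateDeclaration effective = declared.withTtlMillisIfAbsent(60_000L);
        System.out.println(declared.ttlMillis().isPresent() + " " + effective.ttlMillis().get());
    }
}
```

The declaration the user wrote stays immutable, while the runtime can derive an effective declaration from it without going back through the builder.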


Best,
Zakelly

On Tue, Mar 12, 2024 at 2:07 PM weijie guo 
wrote:

> Hi Zakelly,
>
> > But still, from a user's point of view,  state can be characterized along
> two relatively independent dimensions, how states redistribute and the data
> structure. Thus I still suggest a chained-like configuration API that
> configures one aspect on each call.
>
>
> I think the chained-like style is a good suggestion. But I'm not going to
> introduce any mutable-like API to StateDeclaration (even though we can
> achieve immutability by returning a new object). For this reason, I decided
> to use the builder pattern, which also has the benefit of chaining calls
> and allows us to support further configurations such as setTTL in the
> future. For ease of use, we'll also provide some shortcuts to avoid having
> to go through a long build chain each time. I have also updated this part
> of the FLIP.
>
>
>
> Best regards,
>
> Weijie
>
>
> On Tue, Mar 12, 2024 at 2:00 PM weijie guo wrote:
>
> > Hi Hangxiang,
> >
> > > So these operators only define all states they may use which could be
> > explained by the caller, right ?
> >
> > Yes, you're right.
> >
> >
> > Best regards,
> >
> > Weijie
> >
> >
> > On Tue, Mar 12, 2024 at 1:59 PM weijie guo wrote:
> >
> >> Hi Max,
> >>
> >> > In this thread it looks like the plan is to remove the old state
> >> declaration API. I think we should consider keeping the old APIs to
> >> avoid breaking too many jobs.
> >>
> >> We do not plan to remove any old APIs, which means that changes made in
> >> V2 won't affect any V1 DataStream jobs. But V2 is limited to the new
> >> state declaration API, and users who want to migrate to DataStream V2
> >> will need to rewrite their jobs anyway.
> >>
> >> Best regards,
> >>
> >> Weijie
> >>
> >>
> >> On Tue, Mar 12, 2024 at 10:26 AM Hangxiang Yu wrote:
> >>
> >>> Hi, Weijie.
> >>> Thanks for your answer!
> >>>
> >>> > No, Introducing and declaring new state
> >>> > at runtime is something we want to explicitly disallow.
> >>>
> >>> I was just thinking about how some operators define their useState()
> >>> when the states they actually use may change at runtime, e.g. different
> >>> state types for different state sizes.
> >>> So these operators only define all states they may use which could be
> >>> explained by the caller, right?
> >>>
> >>> On Mon, Mar 11, 2024 at 10:57 PM Maximilian Michels 
> >>> wrote:
> >>>
> >>> > The FLIP mentions: "The contents described in this FLIP are all new
> >>> > APIs and do not involve compatibility issues."
> >>> >
> >>> > In this thread it looks like the plan is to remove the old state
> >>> > declaration API. I think we should consider keeping the old APIs to
> >>> > avoid breaking too many jobs. The new APIs will still be beneficial
> >>> > for new jobs, e.g. for SQL jobs.
> >>> >
> >>> > -Max
> >>> >
> >>> > On Fri, Mar 8, 2024 at 4:39 AM Zakelly Lan 
> >>> wrote:
> >>> > >
> >>> > > Hi Weijie,
> >>> > >
> >>> > > Thanks for your answer! Well I get your point. Since partitions are
> >>> > > first-class citizens, and redistribution means how states migrate
> >>> when
> >>> > > partitions change, I'd be fine with deemphasizing the concept of
> >>> > > keyed/operator state if we highlight the definition of partition in
> >>> the
> >>> > > document. Keeping `RedistributionMode` under `StateDeclaration` is
> >>> also
> >>> > > fine with me, as I guess it is only for internal usage.
> >>> > > But still, from a user's point of view,  sta

Re: [DISCUSS] FLIP-424: Asynchronous State APIs

2024-03-12 Thread Zakelly Lan
Hi Xuannan,

Thanks for your comments, I modified the FLIP accordingly.

Hi Yunfeng,

Thanks for sharing your opinions!

> Could you provide some hint on use cases where users need to mix sync
> and async state operations in spite of the performance regression?
> This information might help address our concerns on design. If the
> mixed usage is simply something not recommended, I would prefer to
> prohibit such usage from API.

In fact, there is no scenario where users MUST use the sync APIs, but they
are much easier to use for those who are not familiar with asynchronous
programming. If they want to migrate their job from Flink 1.x to 2.0 and
leverage some benefits of the asynchronous APIs, they may try the mixed
usage. Throwing exceptions directly at runtime is not user-friendly; I think
a better approach is to warn users and recommend avoiding this. I added an
example in this FLIP.

Well, I do not insist on allowing mixed usage of APIs if others reach an
agreement that we won't support that. I think the most important thing is to
keep the API easy to use and understand, thus I propose a unified state
declaration and explicit meaning in method names. WDYT?

> Sorry I missed the new sink API. I do still think that it would be
> better to make the package name more informative, and ".v2." does not
> contain information for new Flink users who did not know the v1 of
> state API. Unlike internal implementation and performance
> optimization, API will hardly be compromised for now and updated in
> future, so I still suggest we improve the package name now if
> possible. But given the existing practice of sink v2 and
> AbstractStreamOperatorV2, the current package name would be acceptable
> to me if other reviewers of this FLIP agrees on it.

Actually, I don't like 'v2' either. So if there is another good name, I'd
be happy to adopt it. The current name is a compromise given the current
situation. Maybe we could refine it after the original state APIs are retired.


Thanks & Best,
Zakelly


On Tue, Mar 12, 2024 at 1:42 PM Yunfeng Zhou 
wrote:

> Hi Zakelly,
>
> Thanks for the quick response!
>
> > Actually splitting APIs into two sets ... warn them in runtime.
>
> Could you provide some hint on use cases where users need to mix sync
> and async state operations in spite of the performance regression?
> This information might help address our concerns on design. If the
> mixed usage is simply something not recommended, I would prefer to
> prohibit such usage from API.
>
> > In fact ... .sink2`.
>
> Sorry I missed the new sink API. I do still think that it would be
> better to make the package name more informative, and ".v2." does not
> contain information for new Flink users who did not know the v1 of
> state API. Unlike internal implementation and performance
> optimization, API will hardly be compromised for now and updated in
> future, so I still suggest we improve the package name now if
> possible. But given the existing practice of sink v2 and
> AbstractStreamOperatorV2, the current package name would be acceptable
> to me if other reviewers of this FLIP agrees on it.
>
> Best,
> Yunfeng
>
> On Mon, Mar 11, 2024 at 5:27 PM Zakelly Lan  wrote:
> >
> > Hi Yunfeng,
> >
> > Thanks for your comments!
> >
> > +1 for JingGe's suggestion to introduce an AsyncState API, instead of
> > > having both get() and asyncGet() in the same State class. As a
> > > supplement to its benefits, this design could help avoid having users
> > > to use sync and async API in a mixed way (unless they create both a
> > > State and an AsyncState from the same state descriptor), which is
> > > supposed to bring suboptimal performance according to the FLIP's
> > > description.
> >
> >
> > Actually splitting APIs into two sets of classes also brings some
> > difficulties. In this case, users must explicitly define their usage
> before
> > actually doing state access. It is a little strange that the user can
> > define a sync and an async version of State with the same name, while
> they
> > cannot allocate two async States with the same name.
> > Another reason for distinguishing API by their method name instead of
> class
> > name is that users typically use the State instances to access state but
> > forget their type/class. For example:
> > ```
> > SyncState a = getState(xxx);
> > AsyncState b = getAsyncState(xxx);
> > //...
> > a.update(1);
> > b.update(1);
> > ```
> > Users are likely to think there is no difference between the
> `a.update(1)`
> > and `b.update(1)`, since they may forget the type for `a` and `b`. Thus I
> > proposed to distinguish the behavior in method names.

Re: [DISCUSS] FLIP-424: Asynchronous State APIs

2024-03-11 Thread Zakelly Lan
Hi Xuannan,

Thanks for your comments!

> 1. The name `emptyFuture` seems a little unintuitive, and it is hard
> to understand in what use case the `emptyFuture` should be used. If I
> understand correctly, it is similar to the
> FutureUtils#completedVoidFuture. How about naming it
> completedVoidStateFuture?

It will be used in callback chaining cases where some branch within one
callback does nothing. I'm in favor of short phrases to express the
functionality. Thus I suggest `completedVoidFuture` or `voidFuture`, WDYT?
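
As a rough illustration of that callback-chaining case, here is a sketch built on the JDK's `CompletableFuture` rather than the FLIP's own future type. The helper `completedVoidFuture()`, the in-memory `store` and the read/write methods are all assumptions made up for this example.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

public class VoidFutureSketch {
    static final AtomicInteger store = new AtomicInteger(); // stand-in for a state backend

    /** Stand-in for the helper under discussion: an already-completed future with no value. */
    static CompletableFuture<Void> completedVoidFuture() {
        return CompletableFuture.completedFuture(null);
    }

    static CompletableFuture<Integer> asyncRead() {
        return CompletableFuture.supplyAsync(store::get);
    }

    static CompletableFuture<Void> asyncWrite(int v) {
        return CompletableFuture.runAsync(() -> store.set(v));
    }

    /** Writes only when the current value is below the limit; otherwise does nothing. */
    static CompletableFuture<Void> incrementIfBelow(int limit) {
        return asyncRead().thenCompose(current ->
                current < limit
                        ? asyncWrite(current + 1)   // branch that does real async work
                        : completedVoidFuture());   // branch that does nothing
    }

    public static void main(String[] args) {
        incrementIfBelow(1).join(); // 0 -> 1
        incrementIfBelow(1).join(); // already at the limit: no-op branch
        System.out.println(store.get());
    }
}
```

Both branches of the callback must return a future of the same type, so the no-op branch needs some completed placeholder; that is the only role the helper plays.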

> 2. IIUC, the `FutureUtils` is intended to be used by the user. If
> that's the case, `FutureUtils` should be annotated as a public
> interface, such as `PublicEvolving`.
>
Yes I missed that, thanks for the reminder.

> 3. The state classes, such as `ValueState`, `ListState`, etc., are
> essential for users, and we should add JavaDocs to those classes and
> their methods.

Yes, this will be added in implementation, I just omitted them for easy
reading.


Thanks & Best,
Zakelly

On Mon, Mar 11, 2024 at 5:25 PM Zakelly Lan  wrote:

> Hi Yunfeng,
>
> Thanks for your comments!
>
> +1 for JingGe's suggestion to introduce an AsyncState API, instead of
>> having both get() and asyncGet() in the same State class. As a
>> supplement to its benefits, this design could help avoid having users
>> to use sync and async API in a mixed way (unless they create both a
>> State and an AsyncState from the same state descriptor), which is
>> supposed to bring suboptimal performance according to the FLIP's
>> description.
>
>
> Actually splitting APIs into two sets of classes also brings some
> difficulties. In this case, users must explicitly define their usage before
> actually doing state access. It is a little strange that the user can
> define a sync and an async version of State with the same name, while they
> cannot allocate two async States with the same name.
> Another reason for distinguishing API by their method name instead of
> class name is that users typically use the State instances to access state
> but forget their type/class. For example:
> ```
> SyncState a = getState(xxx);
> AsyncState b = getAsyncState(xxx);
> //...
> a.update(1);
> b.update(1);
> ```
> Users are likely to think there is no difference between the `a.update(1)`
> and `b.update(1)`, since they may forget the type for `a` and `b`. Thus I
> proposed to distinguish the behavior in method names.
> As for the suboptimal performance with mixed usage of sync and async, my
> proposal is to warn them in runtime.
>
> I noticed that the FLIP proposes to place the newly introduced API in
>> the package "org.apache.flink.api.common.state.v2", which seems a
>> little strange to me as there has not been such a naming pattern
>> ".v2." for packages in Flink.
>
>
> In fact, there are some similar existing patterns, like
> `org.apache.flink.streaming.api.functions.sink.v2` and
> `org.apache.flink.streaming.api.connector.sink2`.
>
>  I would suggest discussing this topic
>> with the main authors of Datastream V2, like Weijie Guo, so that the
>> newly introduced APIs from both sides comply with a unified naming
>> style.
>
> I'm afraid we are facing a different situation with the Datastream V2. For
> total reconstruction of Datastream API, it is big enough to build a
> seperate module and keep good package names. While for state APIs, we
> should stay in the flink-core(-api) module alongside with other
> apis, currently I tend to compromise at the expense of naming style.
>
>
> Looking forward to hearing from you again!
>
> Thanks & Best,
> Zakelly
>
> On Mon, Mar 11, 2024 at 4:20 PM Yunfeng Zhou 
> wrote:
>
>> Hi Zakelly,
>>
>> Thanks for the proposal! The structure of the Async API generally
>> looks good to me. Some comments on the details of the design are as
>> follows.
>>
>> +1 for JingGe's suggestion to introduce an AsyncState API, instead of
>> having both get() and asyncGet() in the same State class. As a
>> supplement to its benefits, this design could help avoid having users
>> to use sync and async API in a mixed way (unless they create both a
>> State and an AsyncState from the same state descriptor), which is
>> supposed to bring suboptimal performance according to the FLIP's
>> description.
>>
>> I noticed that the FLIP proposes to place the newly introduced API in
>> the package "org.apache.flink.api.common.state.v2", which seems a
>> little strange to me as there has not been such a naming pattern
>> ".v2." for packages in Flink. I would suggest discussing this topic
>> with the main authors of Datastream V2, like Wei

Re: [DISCUSS] FLIP-424: Asynchronous State APIs

2024-03-11 Thread Zakelly Lan
Hi Yunfeng,

Thanks for your comments!

> +1 for JingGe's suggestion to introduce an AsyncState API, instead of
> having both get() and asyncGet() in the same State class. As a
> supplement to its benefits, this design could help avoid having users
> to use sync and async API in a mixed way (unless they create both a
> State and an AsyncState from the same state descriptor), which is
> supposed to bring suboptimal performance according to the FLIP's
> description.


Actually splitting APIs into two sets of classes also brings some
difficulties. In this case, users must explicitly define their usage before
actually doing state access. It is a little strange that the user can
define a sync and an async version of State with the same name, while they
cannot allocate two async States with the same name.
Another reason for distinguishing API by their method name instead of class
name is that users typically use the State instances to access state but
forget their type/class. For example:
```
SyncState a = getState(xxx);
AsyncState b = getAsyncState(xxx);
//...
a.update(1);
b.update(1);
```
Users are likely to think there is no difference between the `a.update(1)`
and `b.update(1)`, since they may forget the type for `a` and `b`. Thus I
proposed to distinguish the behavior in method names.
As for the suboptimal performance with mixed usage of sync and async, my
proposal is to warn users at runtime.
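
To make the method-name approach concrete, here is a minimal sketch. It is
not the FLIP's actual API: `MixedValueState`, `asyncUpdate`, `asyncValue` and
`warningCount` are hypothetical names, `CompletableFuture` stands in for the
proposed StateFuture, and the "async" calls complete inline. It only shows
that the access style stays visible at the call site and that mixed usage can
be detected and warned about once at runtime.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch, not Flink's API: one state handle that offers both
// access styles, told apart by method name rather than by class.
final class MixedValueState<T> {
    private T value;
    private boolean usedSync;
    private boolean usedAsync;
    private int warnings;

    // Synchronous style: returns after the value is stored.
    void update(T newValue) {
        usedSync = true;
        warnIfMixed();
        value = newValue;
    }

    // Asynchronous style: the name itself tells the reader a future comes back.
    // The future is completed inline here; a real backend would complete it later.
    CompletableFuture<Void> asyncUpdate(T newValue) {
        usedAsync = true;
        warnIfMixed();
        value = newValue;
        return CompletableFuture.completedFuture(null);
    }

    T value() {
        usedSync = true;
        warnIfMixed();
        return value;
    }

    CompletableFuture<T> asyncValue() {
        usedAsync = true;
        warnIfMixed();
        return CompletableFuture.completedFuture(value);
    }

    // Number of mixed-usage warnings emitted so far (at most one).
    int warningCount() {
        return warnings;
    }

    // Warn once, the first time both styles have been used on this handle.
    private void warnIfMixed() {
        if (usedSync && usedAsync && warnings == 0) {
            warnings++;
            System.err.println(
                    "WARN: sync and async access mixed on the same state; "
                            + "this may hurt performance");
        }
    }
}
```

With this shape, `a.update(1)` and `b.asyncUpdate(1)` read differently even
when the declared type of `a` and `b` is out of sight.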

I noticed that the FLIP proposes to place the newly introduced API in
> the package "org.apache.flink.api.common.state.v2", which seems a
> little strange to me as there has not been such a naming pattern
> ".v2." for packages in Flink.


In fact, there are some similar existing patterns, like
`org.apache.flink.streaming.api.functions.sink.v2` and
`org.apache.flink.streaming.api.connector.sink2`.

 I would suggest discussing this topic
> with the main authors of Datastream V2, like Weijie Guo, so that the
> newly introduced APIs from both sides comply with a unified naming
> style.

I'm afraid we are facing a different situation from DataStream V2. A
total reconstruction of the DataStream API is big enough to justify a
separate module with clean package names. The state APIs, however,
should stay in the flink-core(-api) module alongside the other APIs, so
for now I tend to compromise at the expense of naming style.


Looking forward to hearing from you again!

Thanks & Best,
Zakelly

On Mon, Mar 11, 2024 at 4:20 PM Yunfeng Zhou 
wrote:

> Hi Zakelly,
>
> Thanks for the proposal! The structure of the Async API generally
> looks good to me. Some comments on the details of the design are as
> follows.
>
> +1 for JingGe's suggestion to introduce an AsyncState API, instead of
> having both get() and asyncGet() in the same State class. As a
> supplement to its benefits, this design could help avoid having users
> to use sync and async API in a mixed way (unless they create both a
> State and an AsyncState from the same state descriptor), which is
> supposed to bring suboptimal performance according to the FLIP's
> description.
>
> I noticed that the FLIP proposes to place the newly introduced API in
> the package "org.apache.flink.api.common.state.v2", which seems a
> little strange to me as there has not been such a naming pattern
> ".v2." for packages in Flink. I would suggest discussing this topic
> with the main authors of Datastream V2, like Weijie Guo, so that the
> newly introduced APIs from both sides comply with a unified naming
> style. If we reach an agreement on the first comment, my personal idea
> is that we can place the AsyncState interfaces to
> "org.apache.flink.api.common.state.async", and the existing state APIs
> to "org.apache.flink.api.common.state" or
> "org.apache.flink.api.common.state.sync".
>
> Best regards,
> Yunfeng Zhou
>
> On Thu, Mar 7, 2024 at 4:48 PM Zakelly Lan  wrote:
> >
> > Hi devs,
> >
> > I'd like to start a discussion on a sub-FLIP of FLIP-423: Disaggregated
> > State Storage and Management[1], which is a joint work of Yuan Mei,
> Zakelly
> > Lan, Jinzhong Li, Hangxiang Yu, Yanfei Lei and Feng Wang:
> >
> >  - FLIP-424: Asynchronous State APIs [2]
> >
> > This FLIP introduces new APIs for asynchronous state access.
> >
> > Please make sure you have read the FLIP-423[1] to know the whole story,
> and
> > we'll discuss the details of FLIP-424[2] under this mail. For the
> > discussion of overall architecture or topics related with multiple
> > sub-FLIPs, please post in the previous mail[3].
> >
> > Looking forward to hearing from you!
> >
> > [1] https://cwiki.apache.org/confluence/x/R4p3EQ
> > [2] https://cwiki.apache.org/confluence/x/SYp3EQ
> > [3] https://lists.apache.org/thread/ct8smn6g9y0b8730z7rp9zfpnwmj8vf0
> >
> >
> > Best,
> > Zakelly
>


Re: [DISCUSS] FLIP-424: Asynchronous State APIs

2024-03-11 Thread Zakelly Lan
Hi Jing,

Thanks for your comments!

Sorry for not making this clear. Actually, these APIs live in newly
introduced classes, located in a different package from the original ones.
I suggest we name it "State API V2", and the package name will be
'org.apache.flink.api.common.state.v2'. They work closely with the
DataStream V2, are annotated with @Experimental in the first few versions,
and will be promoted to @PublicEvolving alongside the DataStream V2. I
agree that the names of the interfaces should include 'async', and we will
also support synchronous APIs in these new API classes. This approach
allows us to:

   1. Have enough flexibility for rapid development with @Experimental
   annotation.
   2. Provide both sync and async style APIs for greater ease of use.
   3. Release ourselves from legacy constraints that prevent altering
   interface signatures, and we can do something like reorganizing the
   exceptions as FLIP-368[1] proposed.

State API V2 will become the exclusive API set available to users when
working with DataStream API V2. We may discuss the deprecation of original
ones in future.

WDYT?


Best,
Zakelly


On Sun, Mar 10, 2024 at 8:34 PM Jing Ge  wrote:

> Hi Zakelly,
>
> Thanks for your proposal. The FLIP looks in good shape. +1 for it! I'd like
> to ask some questions to understand your thoughts more precisely.
>
> 1. StateFuture is a new interface. At first glance, it should
> be @Experimental. But according to our API Arch rule[1], it should be at
> least @PublicEvolving, if it will be used by any existing PublicEvolving
> classes. You might want to add this info to your FLIP, if we want to go
> with this option.
>
> 2. The return types of methods in State and related sub-interfaces are
> StateFuture. The old State interfaces already have those methods, and
> Java does not allow overloads that differ only in their return types.
> Do you want to change the old methods or use new interfaces?
> My understanding is that, according to the description in the
> "Compatibility, Deprecation, and Migration Plan'' section in the FLIP, new
> interfaces will be defined alongside the old interfaces. I guess the
> long-term intention of this FLIP is not to deprecate the synchronous State
> API. Both State APIs will be supported for different scenarios. In this
> case, does it make sense to:
>
> 2.1 annotate all new interfaces with @Experimental to have the
> flexibility for further modifications?
> 2.2 use different names e.g. AsyncState etc. to avoid potential
> human mistakes while coding(e.g. import wrong package by mistake etc.) and
> ease the job development with state?
>
> Best regards,
> Jing
>
>
> [1]
>
> https://github.com/apache/flink/blob/d6a4eb966fbc47277e07b79e7c64939a62eb1d54/flink-architecture-tests/flink-architecture-tests-production/src/main/java/org/apache/flink/architecture/rules/ApiAnnotationRules.java#L99
>
> On Thu, Mar 7, 2024 at 9:49 AM Zakelly Lan  wrote:
>
> > Hi devs,
> >
> > I'd like to start a discussion on a sub-FLIP of FLIP-423: Disaggregated
> > State Storage and Management[1], which is a joint work of Yuan Mei,
> Zakelly
> > Lan, Jinzhong Li, Hangxiang Yu, Yanfei Lei and Feng Wang:
> >
> >  - FLIP-424: Asynchronous State APIs [2]
> >
> > This FLIP introduces new APIs for asynchronous state access.
> >
> > Please make sure you have read the FLIP-423[1] to know the whole story,
> and
> > we'll discuss the details of FLIP-424[2] under this mail. For the
> > discussion of overall architecture or topics related with multiple
> > sub-FLIPs, please post in the previous mail[3].
> >
> > Looking forward to hearing from you!
> >
> > [1] https://cwiki.apache.org/confluence/x/R4p3EQ
> > [2] https://cwiki.apache.org/confluence/x/SYp3EQ
> > [3] https://lists.apache.org/thread/ct8smn6g9y0b8730z7rp9zfpnwmj8vf0
> >
> >
> > Best,
> > Zakelly
> >
>


Re: [DISCUSS] FLIP-433: State Access on DataStream API V2

2024-03-07 Thread Zakelly Lan
ize
> > > the job better. As a result, this logic must be fully available at
> > compile
> > > time (when the JobGraph is generated), so it can't rely on computations
> > > that are executed after deploy to TM.
> > > >>
> > > >>
> > > >> >
> > > >> Currently state access is pretty dynamic in Flink and I would assume
> > > many jobs create states on the fly based on some required logic. Are we
> > > planning to address these use-cases?
> > > >>
> > > >>
> > > >> It depends on what type of context we need. If the type and number
> of
> > > states depend on runtime context, that's something we want to avoid. If
> > it
> > > only depended on information available at compile time, I think we
> could
> > > support
> > > >> it.
> > > >>
> > > >>
> > > >> >
> > > >> Are we planning to support deleting/dropping states that are not
> > > required anymore?
> > > >>
> > > >>
> > > >>
> > > >> We really don't want the user to be able to dynamically
> declare/delete
> > > a state at runtime, but if you just want to clear/clean the value of
> > state,
> > > the new API works the same as the old API.
> > > >>
> > > >>
> > > >> > I think if a state is not declared or otherwise cannot be
> accessed,
> > an
> > > > >> exception must be thrown. We cannot confuse empty value with
> > something
> > > >> inaccessible.
> > > >>
> > > >>
> > > >> After thinking about it a bit more, I think you have a point!
> > > >> It's important to make a clear distinction between an empty state
> and
> > > > illegal access, especially since Flink currently discourages setting a
> > > non-null default value for the state.
> > > >> I will modify the proposal as you suggested then :)
> > > >>
> > > >>
> > > >> > The RedistributionMode enum sounds a bit strange to me, as it
> > doesn't
> > > >> actually specify a mode of redistribution. It feels more like a
> flag.
> > > Can
> > > >> we simply have an Optional instead?
> > > >>
> > > >>
> > > > >> We actually define three types of RedistributionMode instead of two
> > because
> > > >> we don't want to think of IDENTICAL as a redistribution strategy,
> it's
> > > just
> > > >> an invariant: the State of that type is always the same across
> > > partitions.
> > > >> If it only has None and REDISTRIBUTABLE, I think your proposal is
> > > >> feasible then. But we don't want to confuse these three
> > semantics/modes.
> > > >>
> > > >>
> > > >> > BroadcastStates are currently very limited by only Map-like
> states,
> > > and
> > > >> the new interface also enforces that. Can we remove this limitation?
> > If
> > > >> not, should broadcastState declaration extend mapstate declaration?
> > > >>
> > > >>
> > > >>
> > > >> Personally, I don't want to make this restriction. This is also why
> > the
> > > method in StateManager to get BroadcastState has the parameter of
> > > BroadcastStateDeclaration instead of MapStateDeclaration. In the
> future,
> > if
> > > the state backend supports other types of broadcast state, we can add a
> > > corresponding method to the States utility class to get the
> > > > BroadcastStateDeclaration.
> > > >>
> > > >>
> > > >>
> > > >> Best regards,
> > > >>
> > > >> Weijie
> > > >>
> > > >>
> > > > >> On Thu, Mar 7, 2024 at 11:55 AM Hangxiang Yu  wrote:
> > > >>
> > > >>> Hi, Weijie.
> > > >>> Thanks for your proposal.
> > > >>> I'd like to start the discussion with some questions:
> > > >>> 1. We have also discussed in FLIP-359/FLINK-32658 about limiting
> the
> > > user
> > > >>> operation to avoid creating state when processElement. Could
> current
> > > >>> interfaces also help this?
> > > >>>
> > > >>> 2. Could you provide more examples about how useStates() works ?
> > Since
> > > >>>

Re: [DISCUSS] FLIP-423 ~FLIP-428: Introduce Disaggregated State Storage and Management in Flink 2.0

2024-03-07 Thread Zakelly Lan
Hi everyone,

Thank you all for a lively discussion here, and it is a good time to move
forward to more detailed discussions. Thus we open several threads for
sub-FLIPs:

FLIP-424: https://lists.apache.org/thread/nmd9qd0k8l94ygcfgllxms49wmtz1864
FLIP-425: https://lists.apache.org/thread/wxn1j848fnfkqsnrs947wh1wmj8n8z0h
FLIP-426: https://lists.apache.org/thread/bt931focfl9971cwq194trmf3pkdsxrf
FLIP-427: https://lists.apache.org/thread/vktfzqvb7t4rltg7fdlsyd9sfdmrc4ft
FLIP-428: https://lists.apache.org/thread/vr8f91p715ct4lop6b3nr0fh4z5p312b

If you want to talk about the overall architecture, roadmap, milestones or
something related with multiple FLIPs, please post it here. Otherwise you
can discuss some details in separate mails. Let's try to avoid repeated
discussion in different threads. I will sync important messages here if
there are any in the above threads.

And reply to @Jeyhun: We will ensure the content between those FLIPs is
consistent.


Best,
Zakelly

On Thu, Mar 7, 2024 at 2:16 PM Yuan Mei  wrote:

> I have been a bit busy these few weeks and sorry for responding late.
>
> The original thinking of keeping discussion within one thread is for easier
> tracking and to avoid repeated discussion in different threads.
>
> For details, it might be good to start in different threads if needed.
>
> We will think of a way to better organize the discussion.
>
> Best
> Yuan
>
>
> On Thu, Mar 7, 2024 at 4:38 AM Jeyhun Karimov 
> wrote:
>
> > Hi,
> >
> > + 1 for the suggestion.
> > Maybe we can start the discussion with the FLIPs with minimum dependencies
> (from
> > the other new/proposed FLIPs).
> > Based on our discussion on a particular FLIP, the subsequent (or its
> > dependent) FLIP(s) can be updated accordingly?
> >
> > Regards,
> > Jeyhun
> >
> > On Wed, Mar 6, 2024 at 5:34 PM Gyula Fóra  wrote:
> >
> > > Hey all!
> > >
> > > This is a massive improvement / work. I just started going through the
> > > Flips and have a more or less meta comment.
> > >
> > > While it's good to keep the overall architecture discussion here, I
> think
> > > we should still have separate discussions for each FLIP where we can
> > > discuss interface details etc. With so much content if we start adding
> > > minor comments here that will lead to nowhere but those discussions are
> > > still important and we should have them in separate threads (one for
> each
> > > FLIP)
> > >
> > > What do you think?
> > > Gyula
> > >
> > > On Wed, Mar 6, 2024 at 8:50 AM Yanfei Lei  wrote:
> > >
> > > > Hi team,
> > > >
> > > > Thanks for your discussion. Regarding FLIP-425, we have supplemented
> > > > several updates to answer high-frequency questions:
> > > >
> > > > 1. We captured a flame graph of the Hashmap state backend in
> > > > "Synchronous execution with asynchronous APIs"[1], which reveals that
> > > > the framework overhead (including reference counting, future-related
> > > > code and so on) consumes about 9% of the keyed operator CPU time.
> > > > 2. We added a set of comparative experiments for watermark
> processing,
> > > > the performance of Out-Of-Order mode is 70% better than
> > > > strictly-ordered mode under ~140MB state size. Instructions on how to
> > > > run this test have also been added[2].
> > > > 3. Regarding the order of StreamRecord, whether it has state access
> or
> > > > not. We supplemented a new *Strict order of 'processElement'*[3].
> > > >
> > > > [1]
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-425%3A+Asynchronous+Execution+Model#FLIP425:AsynchronousExecutionModel-SynchronousexecutionwithasynchronousAPIs
> > > > [2]
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-425%3A+Asynchronous+Execution+Model#FLIP425:AsynchronousExecutionModel-Strictly-orderedmodevs.Out-of-ordermodeforwatermarkprocessing
> > > > [3]
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-425%3A+Asynchronous+Execution+Model#FLIP425:AsynchronousExecutionModel-ElementOrder
> > > >
> > > >
> > > > Best regards,
> > > > Yanfei
> > > >
> > > > On Tue, Mar 5, 2024 at 9:25 AM Yunfeng Zhou  wrote:
> > > > >
> > > > > Hi Zakelly,
> > > > >
> > > > > > 5. I'm not very sure ... revisiting this later since it is not
> > >

[DISCUSS] FLIP-424: Asynchronous State APIs

2024-03-07 Thread Zakelly Lan
Hi devs,

I'd like to start a discussion on a sub-FLIP of FLIP-423: Disaggregated
State Storage and Management[1], which is a joint work of Yuan Mei, Zakelly
Lan, Jinzhong Li, Hangxiang Yu, Yanfei Lei and Feng Wang:

 - FLIP-424: Asynchronous State APIs [2]

This FLIP introduces new APIs for asynchronous state access.

Please make sure you have read the FLIP-423[1] to know the whole story, and
we'll discuss the details of FLIP-424[2] under this mail. For the
discussion of overall architecture or topics related with multiple
sub-FLIPs, please post in the previous mail[3].

Looking forward to hearing from you!

[1] https://cwiki.apache.org/confluence/x/R4p3EQ
[2] https://cwiki.apache.org/confluence/x/SYp3EQ
[3] https://lists.apache.org/thread/ct8smn6g9y0b8730z7rp9zfpnwmj8vf0


Best,
Zakelly


Re: [VOTE] FLIP-420: Add API annotations for RocksDB StateBackend user-facing classes

2024-03-06 Thread Zakelly Lan
+1 non-binding

Thanks for proposing this.

Best,
Zakelly

On Thu, Mar 7, 2024 at 10:13 AM Yanfei Lei  wrote:

> +1(binding) for this vote.
>
> On Thu, Mar 7, 2024 at 9:54 AM Hangxiang Yu  wrote:
> >
> > +1 (binding)
> >
> > On Thu, Mar 7, 2024 at 9:34 AM Yun Tang  wrote:
> >
> > > > +1 for this FLIP.
> > > Sorry for not being clear in my previous reply, it's a binding vote.
> > >
> > > Best
> > > Yun Tang
> > > 
> > > From: Jeyhun Karimov 
> > > Sent: Thursday, March 7, 2024 4:40
> > > To: dev@flink.apache.org 
> > > Subject: Re: [VOTE] FLIP-420: Add API annotations for RocksDB
> StateBackend
> > > user-facing classes
> > >
> > > Hi Jinzhong,
> > >
> > > Thanks for the FLIP.
> > >
> > > +1 (non-binding)
> > >
> > > Regards,
> > > Jeyhun
> > >
> > > On Wed, Mar 6, 2024 at 5:09 PM Yun Tang  wrote:
> > >
> > > > +1 for this FLIP.
> > > >
> > > >
> > > >
> > > > Best
> > > > Yun Tang
> > > > 
> > > > From: Jinzhong Li 
> > > > Sent: Wednesday, March 6, 2024 20:29
> > > > To: dev@flink.apache.org 
> > > > Subject: [VOTE] FLIP-420: Add API annotations for RocksDB
> StateBackend
> > > > user-facing classes
> > > >
> > > > Hi All,
> > > >
> > > > I'd like to start a vote on the FLIP-420: Add API annotations for
> RocksDB
> > > > StateBackend user-facing classes[1].
> > > > The discussion thread is here [2].
> > > >
> > > > The vote will be open for at least 72 hours unless there is an
> objection
> > > or
> > > > not enough votes.
> > > >
> > > >
> > > > [1]https://cwiki.apache.org/confluence/x/JQs4EQ
> > > > [2]https://lists.apache.org/thread/4t71lz2j2ft8hf90ylvtomynhr2qthoo
> > > >
> > > >
> > > > Best,
> > > > Jinzhong Li
> > > >
> > >
> >
> >
> > --
> > Best,
> > Hangxiang.
>
>
>
> --
> Best,
> Yanfei
>


Re: [DISCUSS] FLIP-433: State Access on DataStream API V2

2024-03-06 Thread Zakelly Lan
Hi Weijie,

Thanks for proposing this!

Unifying and optimizing state definitions is a very good thing. I like the
idea of 'definition goes before using', so overall +1 for this proposal.

However, I think the current definition is somewhat unclear. From a user's
point of view, I believe that state can be characterized along two
relatively independent axes: the scenario (keyed, non-keyed, or broadcast)
and the data structure (single value, list, map). I recommend that we fully
decouple these aspects, rather than linking the nature of the definition to
specific assumptions, such as equating broadcast states with maps, or
considering list states could be non-keyed.
Furthermore, the concept of 'Redistribution' may impose a cognitive burden
on general users. My advice would be to conceal RedistributionMode/Strategy
from the standard user interface, particularly within the helper class
'State'. But I'm OK to keep it in `StateDeclaration` since its interfaces
are basically used by the framework. My preferred syntax would be:
```
StateDeclaration a = State.declare(name).keyed().listState(type);
StateDeclaration b = State.declare(name).broadcast().mapState(typeK, typeV);
StateDeclaration c = State.declare(name).keyed().aggregatingState(type,
function);
```
WDYT?


Best,
Zakelly

On Wed, Mar 6, 2024 at 11:04 PM Gyula Fóra  wrote:

> Hi Weijie!
>
> Thank you for the proposal.
>
> I have some initial questions to start the discussion:
>
> 1. What is the semantics of the usesStates method? When is it called? Can
> the used state change dynamically at runtime? Can the logic depend on
> something computed in open(..) for example?
>
> Currently state access is pretty dynamic in Flink and I would assume many
> jobs create states on the fly based on some required logic. Are we planning
> to address these use-cases?
>
> Are we planning to support deleting/dropping states that are not required
> anymore?
>
> 2. Get state now returns an optional, but you mention that:
> " If you want to get a state that is not declared or has no access,
> Option#empty is returned."
>
> I think if a state is not declared or otherwise cannot be accessed, an
> exception must be thrown. We cannot confuse an empty value with something
> inaccessible.
>
> 3. The RedistributionMode enum sounds a bit strange to me, as it doesn't
> actually specify a mode of redistribution. It feels more like a flag. Can
> we simply have an Optional instead?
>
> 4. BroadcastStates are currently very limited by only Map-like states, and
> the new interface also enforces that.
> Can we remove this limitation? If not, should broadcastState declaration
> extend mapstate declaration?
>
> Cheers,
> Gyula
>
> Cheers
> Gyuka
>
> On Wed, Mar 6, 2024 at 11:18 AM weijie guo 
> wrote:
>
> > Hi devs,
> >
> > I'd like to start a discussion about FLIP-433: State Access on
> > DataStream API V2
> > [1]. This is the third sub-FLIP of DataStream API V2.
> >
> >
> > After FLIP-410 [2], we can already write a simple stateless job using the
> > DataStream V2 API.  But as we all know, stateful computing is Flink's
> trump
> > card. In this FLIP, we will discuss how to declare and access state on
> > DataStream API V2 and we manage to avoid some of the shortcomings of V1
> in
> > this regard.
> >
> > You can find more details in this FLIP. Its relationship with other
> > sub-FLIPs can be found in the umbrella FLIP
> > [3]. Looking forward to hearing from you, thanks!
> >
> >
> > Best regards,
> >
> > Weijie
> >
> >
> > [1]
> >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-433%3A+State+Access+on+DataStream+API+V2
> >
> > [2]
> >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-410%3A++Config%2C+Context+and+Processing+Timer+Service+of+DataStream+API+V2
> >
> > [3]
> >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-408%3A+%5BUmbrella%5D+Introduce+DataStream+API+V2
> >
>


Re: [DISCUSS] FLIP-423 ~FLIP-428: Introduce Disaggregated State Storage and Management in Flink 2.0

2024-03-04 Thread Zakelly Lan
DYT?
>
>
> Besides, inspired by Jeyhun's comments, it comes to me that
>
> 8. Should this FLIP introduce metrics that measure the time a Flink
> job is back-pressured by State IOs? Under the current design, this
> metric could measure the time when the blocking buffer is full and
> yield() cannot get callbacks to process, which means the operator is
> fully waiting for state responses.
>
> Best regards,
> Yunfeng
>
> On Mon, Mar 4, 2024 at 12:33 PM Zakelly Lan  wrote:
> >
> > Hi Yunfeng,
> >
> > Thanks for your detailed comments!
> >
> >> 1. Why do we need a close() method on StateIterator? This method seems
> >> unused in the usage example codes.
> >
> >
> > The `close()` is introduced to release internal resources, but it does
> not seem to require the user to call it. I removed this.
> >
> >> 2. In FutureUtils.combineAll()'s JavaDoc, it is stated that "No null
> >> entries are allowed". It might be better to further explain what will
> >> happen if a null value is passed, ignoring the value in the returned
> >> Collection or throwing exceptions. Given that
> >> FutureUtils.emptyFuture() can be returned in the example code, I
> >> suppose the former one might be correct.
> >
> >
> > The statement "No null entries are allowed" refers to the parameters, it
> means some arrayList like [null, StateFuture1, StateFuture2] passed in are
> not allowed, and an Exception will be thrown.
> >
> >> 1. According to Fig 2 of this FLIP, ... . This situation should be
> >> avoided and the order of same-key records should be strictly
> >> preserved.
> >
> >
> > I will discuss this with some expert SQL developers. And if it is valid
> and common, I suggest a strict order preservation mode for StreamRecord
> processing. WDYT?
> >
> >> 2. The FLIP says that StateRequests submitted by Callbacks will not
> >> invoke further yield() methods. Given that yield() is used when there
> >> is "too much" in-flight data, does it mean StateRequests submitted by
> >> Callbacks will never be "too much"? What if the total number of
> >> StateRequests exceed the capacity of Flink operator's memory space?
> >
> >
> > The amount of parallel StateRequests for one StreamRecord cannot be
> determined since the code is written by users. So the in-flight requests
> may be "too much", and may cause OOM. Users should re-configure their job,
> controlling the amount of on-going StreamRecord. And I suggest two ways to
> avoid this:
> >
> > Adaptively adjust the count of on-going StreamRecord according to
> historical StateRequests amount.
> > Also control the max StateRequests that can be executed in parallel for
> each StreamRecord, and if it exceeds, put the new StateRequest in the
> blocking buffer waiting for execution (instead of invoking yield()).
> >
> > WDYT?
> >
> >
> >> 3.1 I'm concerned that the out-of-order execution mode, along with the
> >> epoch mechanism, would bring more complexity to the execution model
> >> than the performance improvement it promises. Could we add some
> >> benchmark results proving the benefit of this mode?
> >
> >
> > Agreed, will do.
> >
> >> 3.2 The FLIP might need to add a public API section describing how
> >> users or developers can switch between these two execution modes.
> >
> >
> > Good point. We will add a Public API section.
> >
> >> 3.3 Apart from the watermark and checkpoint mentioned in this FLIP,
> >> there are also more other events that might appear in the stream of
> >> data records. It might be better to generalize the execution mode
> >> mechanism to handle all possible events.
> >
> >
> > Yes, I missed this point. Thanks for the reminder.
> >
> >> 4. It might be better to treat callback-handling as a
> >> MailboxDefaultAction, instead of Mails, to avoid the overhead of
> >> repeatedly creating Mail objects.
> >
> >
> >  I thought the intermediate wrapper for callbacks cannot be omitted,
> since there will be some context switch before each execution. The
> MailboxDefaultAction in most cases is processInput right? While the
> callback should be executed with higher priority. I'd suggest not changing
> the basic logic of Mailbox and the default action since it is very critical
> for performance. But yes, we will try our best to avoid creating
> intermediate objects.
> >
> >> 5. Could this FLIP provide the current default values for thi

Re: [DISCUSS] Move CheckpointingMode to flink-core

2024-03-04 Thread Zakelly Lan
Hi all,

Thanks for sharing your opinions!

I'd like to draw a conclusion. I had an offline discussion with @Hangxiang
Yu and @Junrui Lee today, and we agreed that option 2 could make the
package cleaner, thus it is a better way to go.
We will follow option 2, and the deprecation will be discussed later in
2.0. If there is any objection, please let me know, thanks.
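
For readers who have not followed the whole thread, option 2 roughly has the
shape sketched below. This is an illustration only, not the actual patch:
both enums are nested stand-ins for the old class under
'org.apache.flink.streaming.api' and the new one under
'org.apache.flink.core.execution', `Config` stands in for CheckpointConfig,
and the accessor names follow the "set/getCheckpointMode" example mentioned
earlier in the thread.

```java
// Hypothetical sketch of option 2: a new enum plus new accessors, with the
// old enum and old accessors kept as deprecated bridges.
final class CheckpointModeMigrationSketch {

    /** Stand-in for the old enum that is to be deprecated. */
    @Deprecated
    enum LegacyCheckpointingMode { EXACTLY_ONCE, AT_LEAST_ONCE }

    /** Stand-in for the new enum in the cleaner package. */
    enum CheckpointingMode { EXACTLY_ONCE, AT_LEAST_ONCE }

    /** Stand-in for CheckpointConfig; it stores only the new type. */
    static final class Config {
        private CheckpointingMode mode = CheckpointingMode.EXACTLY_ONCE;

        // Old setter: kept for compatibility, converts by constant name.
        @Deprecated
        void setCheckpointingMode(LegacyCheckpointingMode legacy) {
            this.mode = CheckpointingMode.valueOf(legacy.name());
        }

        // Old getter: converts back so existing callers keep compiling.
        @Deprecated
        LegacyCheckpointingMode getCheckpointingMode() {
            return LegacyCheckpointingMode.valueOf(mode.name());
        }

        // New accessors with a similar but different name.
        void setCheckpointMode(CheckpointingMode mode) {
            this.mode = mode;
        }

        CheckpointingMode getCheckpointMode() {
            return mode;
        }
    }
}
```

The name-based conversion relies on both enums keeping identical constant
names, which is the main thing to preserve while the two coexist.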


Best,
Zakelly

On Wed, Feb 28, 2024 at 7:49 PM Xintong Song  wrote:

> Personally, I'd be in favor of option 2. And based on the fact that
> migrating from the deprecated CheckpointingMode to the new one takes barely
> any effort (simply re-import the class), I'd be fine with removing the
> deprecated class in 2.0.
>
> But I'd also be fine with the other options.
>
> Either way, agree that we should not block 1.19 on this.
>
> Best,
>
> Xintong
>
>
>
> On Wed, Feb 28, 2024 at 6:16 PM Junrui Lee  wrote:
>
> > Hi Zakelly,
> >
> > +1 for option 1. I prefer to minimize unnecessary additional development
> > and discussions due to internal code relocations and to avoid imposing
> > migration costs on users.
> >
> > Best regards,
> > Junrui
> >
> > On Wed, Feb 28, 2024 at 2:46 PM Zakelly Lan  wrote:
> >
> > > Hi Lincoln,
> > >
> > > > Given that we have finished the testing for 1.19, I agree it is better
> > > > not to merge this into 1.19. Thanks for the RMs' attention!
> > >
> > > > Hi Chesnay and Junrui,
> > >
> > > Thanks for your advice. My original intention is to move the class as
> > well
> > > as change the package to make it clean. But it involves much more
> effort.
> > > Here are several options we have:
> > >
> > >1. Move CheckpointingMode to flink-core and keep the same package.
> No
> > >more deprecation and API changes. But it will leave a
> > >'org.apache.flink.streaming.api' package in flink-core.
> > >2. Introduce new CheckpointingMode in package
> > >'org.apache.flink.core.execution' and deprecate the old one.
> Deprecate
> > > the
> > >corresponding getter/setter of 'CheckpointConfig' and introduce new
> > ones
> > >with a similar but different name (e.g. set/getCheckpointMode). We
> > will
> > >discuss the removal of those deprecation later in 2.x.
> > >3. Based on 1, move CheckpointingMode to package
> > >'org.apache.flink.core.execution' in 2.0. This is a breaking change
> > that
> > >needs more discussion.
> > >
> > > Both ways work. I'm slightly inclined to option 1, or option 3 if we
> all
> > > agree, since the new getter/setter may also bring in confusions thus we
> > > cannot make the API purely clean. WDYT?
> > >
> > >
> > > Best,
> > > Zakelly
> > >
> > > On Wed, Feb 28, 2024 at 10:14 AM Junrui Lee 
> wrote:
> > >
> > > > Hi Zakelly,
> > > >
> > > > I agree with Chesnay's response. I would suggest that during the
> > process
> > > of
> > > > moving CheckpointingMode from the flink-streaming-java module to the
> > > > flink-core module, we should keep the package name unchanged. This
> > > approach
> > > > would be completely transparent to users. In fact, this practice
> should
> > > be
> > > > applicable to many of our desired moves from flink-streaming-java to
> > > > higher-level modules, such as flink-runtime and flink-core.
> > > >
> > > > Best,
> > > > Junrui
> > > >
> > > > > On Wed, Feb 28, 2024 at 5:18 AM Chesnay Schepler  wrote:
> > > >
> > > > > Moving classes (== keep the same package) to a module higher up in
> > the
> > > > > dependency tree should not be a breaking change and can imo be done
> > > > > anytime without any risk to users.
> > > > >
> > > > > On 27/02/2024 17:01, Lincoln Lee wrote:
> > > > > > Hi Zakelly,
> > > > > >
> > > > > > Thanks for letting us 1.19 RMs know about this!
> > > > > >
> > > > > > This change has been discussed during today's release sync
> meeting,
> > > we
> > > > > > suggest not merge it into 1.19.
> > > > > > We can continue discussing the removal in 2.x separately.
> > > > > >
> > > > > > Best,
> > > > > > Lincoln Lee
> > > > > >
> > > > > >
> > > > > > Hangxiang Yu  于2024年2月2

Re: [DISCUSS] FLIP-423 ~FLIP-428: Introduce Disaggregated State Storage and Management in Flink 2.0

2024-03-03 Thread Zakelly Lan
Hi Jeyhun,

Thanks for your reply!

- Backpressure Handling.


I agree that Flink's original back pressure handling is simple and robust,
and in the current proposal we try our best to make only minimal changes to
the Mailbox for callback execution. Stepping back from the detailed execution
logic of state requests, you will find there is a single entry point (the AEC)
that controls the number of in-flight StreamRecords and invokes yield(), which
I believe is relatively predictable. Asynchronous execution is never easy to
achieve, and I think we could prove the robustness in our UTs/ITs and
benchmarks before we release it.
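
To make the "single entry" idea concrete, here is a minimal single-threaded sketch. It is not the FLIP-425 implementation: the class `InFlightRecordLimiter` and all of its methods are hypothetical names, and the mailbox is simulated with a plain queue rather than Flink's real Mailbox. It only shows that back pressure is applied in one place, by running pending callbacks until the in-flight count drops below the limit.

```java
import java.util.ArrayDeque;
import java.util.Queue;

/** Hypothetical stand-in for the single entry point that bounds in-flight records. */
class InFlightRecordLimiter {
    private final int maxInFlight;
    // Simulated mailbox holding completion callbacks that are ready to run.
    private final Queue<Runnable> mailbox = new ArrayDeque<>();
    private int inFlight = 0;

    InFlightRecordLimiter(int maxInFlight) {
        this.maxInFlight = maxInFlight;
    }

    /** Called once per incoming record, before its state requests are issued. */
    void admitRecord() {
        // The only place where back pressure is applied:
        // run pending callbacks until there is room for one more record.
        while (inFlight >= maxInFlight) {
            if (!yieldOnce()) {
                throw new IllegalStateException("limit reached but no callback is pending");
            }
        }
        inFlight++;
    }

    /** Called by the (simulated) state executor when a record's requests have finished. */
    void enqueueCompletion(Runnable userCallback) {
        mailbox.add(() -> {
            userCallback.run();
            inFlight--; // the record leaves the in-flight set after its callback ran
        });
    }

    /** Runs one pending callback; returns false if the mailbox is empty. */
    boolean yieldOnce() {
        Runnable mail = mailbox.poll();
        if (mail == null) {
            return false;
        }
        mail.run();
        return true;
    }

    int inFlight() {
        return inFlight;
    }
}
```

In a real task the yield would wait for a mail to arrive instead of throwing; the exception here only keeps the single-threaded simulation from spinning forever.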

- Watermark/Timer Handling
>

Yes, we need thorough benchmarks for the newly introduced framework before the
release. But since the change is huge and we may not have time to
implement every combination of modes and parameters, I suggest we first decide
whether to go ahead based on the necessary PoC benchmarks. @Yunfeng Zhou also
suggested a benchmark for the out-of-order execution of the epoch manager;
we'll wait for that.

- DFS consistency guarantees. The proposal in FLIP-427 is DFS-agnostic.
> However, different cloud providers have different storage consistency
> models.
> How do we want to deal with them?


Yes, the current proposals treat DFS as a black box. The main change is to
move the local runtime files to DFS. Those files are read and written only by
the local client in each TM (there are no reads across TMs), and they can be
lost if the job stops. Also, Flink's file system layer conceals many
underlying details, so I guess the differing file consistency models of DFS
implementations are not a big concern for our implementation. Of course,
there may be optimizations for specific DFS implementations; we may add those
later.


Thanks again & Best,
Zakelly


On Mon, Mar 4, 2024 at 12:33 PM Zakelly Lan  wrote:

> Hi Yunfeng,
>
> Thanks for your detailed comments!
>
> 1. Why do we need a close() method on StateIterator? This method seems
>> unused in the usage example codes.
>
>
> The `close()` is introduced to release internal resources, but it does not
> seem to require the user to call it. I removed this.
>
> 2. In FutureUtils.combineAll()'s JavaDoc, it is stated that "No null
>> entries are allowed". It might be better to further explain what will
>> happen if a null value is passed, ignoring the value in the returned
>> Collection or throwing exceptions. Given that
>> FutureUtils.emptyFuture() can be returned in the example code, I
>> suppose the former one might be correct.
>
>
> The statement "No null entries are allowed" refers to the parameters: a
> collection such as [null, StateFuture1, StateFuture2] is not allowed, and an
> Exception will be thrown.
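
A small sketch of that contract, using `java.util.concurrent.CompletableFuture` as a stand-in for the proposed StateFuture (an assumption; the class name `CombineAllSketch` is hypothetical and this is not the FLIP-424 code). A null entry in the collection is rejected, while a future that completes with a null value is fine and simply yields a null element in the result.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;

class CombineAllSketch {
    /** Combines all futures into one; throws NullPointerException on a null entry. */
    static <T> CompletableFuture<List<T>> combineAll(Collection<CompletableFuture<T>> futures) {
        List<CompletableFuture<T>> copy = new ArrayList<>(futures);
        for (CompletableFuture<T> future : copy) {
            // A null *entry* is a caller error, unlike a future whose *value* is null.
            Objects.requireNonNull(future, "No null entries are allowed");
        }
        return CompletableFuture.allOf(copy.toArray(new CompletableFuture<?>[0]))
                .thenApply(ignored -> {
                    List<T> results = new ArrayList<>(copy.size());
                    for (CompletableFuture<T> future : copy) {
                        results.add(future.join()); // already completed, does not block
                    }
                    return results;
                });
    }
}
```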
>
> 1. According to Fig 2 of this FLIP, ... . This situation should be
>> avoided and the order of same-key records should be strictly
>> preserved.
>
>
> I will discuss this with some expert SQL developers. And if it is valid
> and common, I suggest a strict order preservation mode for StreamRecord
> processing. WDYT?
>
> 2. The FLIP says that StateRequests submitted by Callbacks will not
>> invoke further yield() methods. Given that yield() is used when there
>> is "too much" in-flight data, does it mean StateRequests submitted by
>> Callbacks will never be "too much"? What if the total number of
>> StateRequests exceed the capacity of Flink operator's memory space?
>
>
> The number of parallel StateRequests for one StreamRecord cannot be
> determined in advance since the code is written by users. So the in-flight
> requests may be "too much", and may cause OOM. Users should re-configure
> their job to limit the number of ongoing StreamRecords. I suggest two ways
> to avoid this:
>
>1. Adaptively adjust the count of on-going StreamRecord according to
>historical StateRequests amount.
>2. Also control the max StateRequests that can be executed in parallel
>for each StreamRecord, and if it exceeds, put the new StateRequest in the
>blocking buffer waiting for execution (instead of invoking yield()).
>
> WDYT?
>
>
> 3.1 I'm concerned that the out-of-order execution mode, along with the
>> epoch mechanism, would bring more complexity to the execution model
>> than the performance improvement it promises. Could we add some
>> benchmark results proving the benefit of this mode?
>
>
> Agreed, will do.
>
> 3.2 The FLIP might need to add a public API section describing how
>> users or developers can switch between these two execution modes.
>
>
> Good point. We will add a Public API section.
>
> 3.3 Apart from the watermark and checkpoint mentioned in this FLIP,
>> there are also more other events that might appear in the stream of
>> data 

Re: [DISCUSS] FLIP-423 ~FLIP-428: Introduce Disaggregated State Storage and Management in Flink 2.0

2024-03-03 Thread Zakelly Lan
also relies on loopback (yield in this case), it is
> not clear how robust the new backpressure handling proposed in FLIP-425 is
> and how well it handles fluctuating workloads.
>
> - Watermark/Timer Handling: Similar arguments apply for watermark and timer
> handling. IMHO, we need more benchmarks showing the overhead
> of epoch management with different parameters (e.g., window size, watermark
> strategy, etc)
>
> - DFS consistency guarantees. The proposal in FLIP-427 is DFS-agnostic.
> However, different cloud providers have different storage consistency
> models.
> How do we want to deal with them?
>
>  Regards,
> Jeyhun
>
>
>
>
> On Fri, Mar 1, 2024 at 6:08 PM Zakelly Lan  wrote:
>
> > Thanks Piotr for sharing your thoughts!
> >
> > I guess it depends how we would like to treat the local disks. I've
> always
> > > thought about them that almost always eventually all state from the DFS
> > > should end up cached in the local disks.
> >
> >
> > OK I got it. In our proposal we treat local disk as an optional cache, so
> > the basic design will handle the case with state residing in DFS only. It
> > is a more 'cloud-native' approach that does not rely on any local storage
> > assumptions, which allows users to dynamically adjust the capacity or I/O
> > bound of remote storage to gain performance or save cost, even without
> > a job restart.
> >
> > In
> > > the currently proposed more fine grained solution, you make a single
> > > request to DFS per each state access.
> > >
> >
> > Ah that's not accurate. Actually we buffer the state requests and process
> > them in batch, multiple requests will correspond to one DFS access (One
> > block access for multiple keys performed by RocksDB).
> >
> > In that benchmark you mentioned, are you requesting the state
> > > asynchronously from local disks into memory? If the benefit comes from
> > > parallel I/O, then I would expect the benefit to disappear/shrink when
> > > running multiple subtasks on the same machine, as they would be making
> > > their own parallel requests, right? Also enabling checkpointing would
> > > further cut into the available I/O budget.
> >
> >
> > That's an interesting topic. Our proposal is specifically aimed at the
> > scenario where the machine I/O is not fully loaded but the I/O latency has
> > indeed become a bottleneck for each subtask. The distributed file system
> > is a prime example of such a scenario: abundant and easily scalable I/O
> > bandwidth coupled with higher I/O latency. You could also increase the
> > parallelism of a job to enhance the performance, but that wastes more CPU
> > and memory on building up more subtasks. This is one drawback of
> > computation-storage tightly coupled nodes. In our proposal, by contrast,
> > the parallel I/O and all the callbacks still run in one task, so the
> > pre-allocated computational resources are better utilized. It is a much
> > more lightweight way to perform parallel I/O.
> >
> > Just with what granularity those async requests should be made.
> > > Making state access asynchronous is definitely the right way to go!
> >
> >
> > I think the current proposal is based on such core ideas:
> >
> >- A pure cloud-native disaggregated state.
> >- Fully utilize the given resources and try not to waste them
> (including
> >I/O).
> >- The ability to scale isolated resources (I/O or CPU or memory)
> >independently.
> >
> > We think a fine-grained granularity is more in line with those ideas,
> > especially without local disk assumptions and without any waste of I/O by
> > prefetching. Please note that it is not a replacement for the original
> > local state with synchronous execution. Instead, this is a solution
> > embracing the cloud-native era, providing much more scalability and
> > resource efficiency when handling a *huge state*.
> >
> > What also worries me a lot in this fine grained model is the effect on
> the
> > > checkpointing times.
> >
> >
> > Your concerns are very reasonable. Faster checkpointing is always a core
> > advantage of disaggregated state, but only for the async phase. There
> will
> > be some complexity introduced by in-flight requests, but I'd suggest a
> > checkpoint containing those in-flight state requests as part of the
> state,
> > to accelerate the sync phase by skipping the buffer draining. This makes
> > the buffer size ha

Re: [DISCUSS] FLIP-423 ~FLIP-428: Introduce Disaggregated State Storage and Management in Flink 2.0

2024-03-01 Thread Zakelly Lan
llaFLIP)-PoCResults
[2]
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=293046855#FLIP423:DisaggregatedStateStorageandManagement(UmbrellaFLIP)-Appendix:HowtorunthePoC


Best,
Zakelly


On Fri, Mar 1, 2024 at 6:38 PM Yunfeng Zhou 
wrote:

> Hi,
>
> Thanks for proposing this design! I just read FLIP-424 and FLIP-425
> and have some questions about the proposed changes.
>
> For Async API (FLIP-424)
>
> 1. Why do we need a close() method on StateIterator? This method seems
> unused in the usage example codes.
>
> 2. In FutureUtils.combineAll()'s JavaDoc, it is stated that "No null
> entries are allowed". It might be better to further explain what will
> happen if a null value is passed, ignoring the value in the returned
> Collection or throwing exceptions. Given that
> FutureUtils.emptyFuture() can be returned in the example code, I
> suppose the former one might be correct.
>
>
> For Async Execution (FLIP-425)
>
> 1. According to Fig 2 of this FLIP, if a recordB has its key collide
> with an ongoing recordA, its processElement() method can still be
> triggered immediately, and then it might be moved to the blocking
> buffer in AEC if it involves state operations. This means that
> recordB's output will precede recordA's output in downstream
> operators, if recordA involves state operations while recordB does
> not. This will harm the correctness of Flink jobs in some use cases.
> For example, in dim table join cases, recordA could be a delete
> operation that involves state access, while recordB could be an insert
> operation that needs to visit external storage without state access.
> If recordB's output precedes recordA's, then an entry that is supposed
> to finally exist with recordB's value in the sink table might actually
> be deleted according to recordA's command. This situation should be
> avoided and the order of same-key records should be strictly
> preserved.
>
> 2. The FLIP says that StateRequests submitted by Callbacks will not
> invoke further yield() methods. Given that yield() is used when there
> is "too much" in-flight data, does it mean StateRequests submitted by
> Callbacks will never be "too much"? What if the total number of
> StateRequests exceed the capacity of Flink operator's memory space?
>
> 3. In the "Watermark" section, this FLIP provided an out-of-order
> execution mode apart from the default strictly-ordered mode, which can
> optimize performance by allowing more concurrent executions.
>
> 3.1 I'm concerned that the out-of-order execution mode, along with the
> epoch mechanism, would bring more complexity to the execution model
> than the performance improvement it promises. Could we add some
> benchmark results proving the benefit of this mode?
>
> 3.2 The FLIP might need to add a public API section describing how
> users or developers can switch between these two execution modes.
>
> 3.3 Apart from the watermark and checkpoint mentioned in this FLIP,
> there are also more other events that might appear in the stream of
> data records. It might be better to generalize the execution mode
> mechanism to handle all possible events.
>
> 4. It might be better to treat callback-handling as a
> MailboxDefaultAction, instead of Mails, to avoid the overhead of
> repeatedly creating Mail objects.
>
> 5. Could this FLIP provide the current default values for things like
> active buffer size thresholds and timeouts? These could help with
> memory consumption and latency analysis.
>
> 6. Why do we need to record the hashcode of a record in its
> RecordContext? It seems not used.
>
> 7. In "timers can be stored on the JVM heap or RocksDB", the link
> points to a document in flink-1.15. It might be better to verify the
> referenced content is still valid in the latest Flink and update the
> link accordingly. Same for other references if any.
>
> Best,
> Yunfeng Zhou
>
> On Thu, Feb 29, 2024 at 2:17 PM Yuan Mei  wrote:
> >
> > Hi Devs,
> >
> > This is a joint work of Yuan Mei, Zakelly Lan, Jinzhong Li, Hangxiang Yu,
> > Yanfei Lei and Feng Wang. We'd like to start a discussion about
> introducing
> > Disaggregated State Storage and Management in Flink 2.0.
> >
> > The past decade has witnessed a dramatic shift in Flink's deployment
> mode,
> > workload patterns, and hardware improvements. We've moved from the
> > map-reduce era where workers are computation-storage tightly coupled
> nodes
> > to a cloud-native world where containerized deployments on Kubernetes
> > become standard. To enable Flink's Cloud-Native future, we introduce
> > Disaggregated State Storage and Management that uses DFS as primary
> stor

Re: [DISCUSS] FLIP-423 ~FLIP-428: Introduce Disaggregated State Storage and Management in Flink 2.0

2024-02-29 Thread Zakelly Lan
Hi Piotr,

Thanks for your thoughtful feedback!

In fact, we had a lot of internal debates on this topic :). The proposal
you mentioned is one of several possible alternatives we've considered. It
works well for simple use cases of ValueState or ListState, where users
typically read state by partitioned key and do some update for each
incoming record. However, it may not be suitable for scenarios where:

   - A state is read by condition.
   - MapState is accessed with user keys that cannot be determined in advance.

The cases above are common in the implementation of SQL operators. Given that
remote I/O is tens of times slower than local I/O[1], a miss in that
pre-fetched cache will greatly affect the TPS and bring unnecessary
overhead. Thus we finally chose to focus on our current plan, with the
actual state request as the entry point of asynchronous execution instead of
a step before `processElement`. Moreover, we could further optimize the
performance by grouping state accesses[2] and executing them *in parallel*.
This approach not only supports all state access patterns but also ensures
that the performance of disaggregated state remains competitive when
compared to an exclusively local state setup[3].
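
As a rough illustration of grouping state accesses and executing them in parallel, the sketch below splits a list of keys into batches and issues one lookup per batch on an I/O pool. Everything in it is an assumption made for illustration: `BatchedStateReader`, `remoteBatchGet`, and the fixed `batchSize` are hypothetical and do not come from the FLIPs, and the actual batching is performed by the state backend.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.function.Function;

class BatchedStateReader {
    /** Loads all keys, using one (simulated) remote access per batch, batches in parallel. */
    static <K, V> Map<K, V> readAll(
            List<K> keys,
            int batchSize,
            Function<List<K>, Map<K, V>> remoteBatchGet, // hypothetical multi-get against DFS
            ExecutorService ioPool) {
        List<CompletableFuture<Map<K, V>>> pending = new ArrayList<>();
        for (int i = 0; i < keys.size(); i += batchSize) {
            // Copy the slice so each task owns its batch independently of the input list.
            List<K> batch = new ArrayList<>(keys.subList(i, Math.min(i + batchSize, keys.size())));
            pending.add(CompletableFuture.supplyAsync(() -> remoteBatchGet.apply(batch), ioPool));
        }
        // All batches are in flight at this point; now collect their results.
        Map<K, V> result = new HashMap<>();
        for (CompletableFuture<Map<K, V>> future : pending) {
            result.putAll(future.join());
        }
        return result;
    }
}
```

With five keys and a batch size of two, this issues three remote accesses instead of five, and those three can overlap in time.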

As for the potential downside you mentioned, according to our PoC tests[3],
it is still beneficial to load state from local disk asynchronously (See
line 4 of that table with 100% state in local cache). Optimization mainly
comes from parallel I/O. We believe that in most cases using a file-based
state backend, the current plan can ensure performance improvement across
all state access patterns.

However, IIUC, your proposal is valuable in that it is compatible with the
original state APIs, and it can co-exist with the current plan. We do
consider providing such a pre-fetch cache under the original state APIs and
enhancing the performance transparently in future milestones.

[1]
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=293046855#FLIP423:DisaggregatedStateStorageandManagement(UmbrellaFLIP)-HurdlesfortheDisaggregatedStateModel
[2] https://cwiki.apache.org/confluence/x/TYp3EQ
[3]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-425%3A+Asynchronous+Execution+Model#FLIP425:AsynchronousExecutionModel-PoCTest


Thanks again & Best,
Zakelly

On Thu, Feb 29, 2024 at 10:22 PM Piotr Nowojski 
wrote:

> Hi!
>
> Thanks for this proposal. It looks like it will be a great improvement!
> I've only started reading the FLIP's, but I already have some questions
> about the FLIP-425, the async execution.
>
> What's the motivation behind splitting execution of a single element into
> multiple independent steps in individual futures? Have you considered
> something like this (?) :
>
> 1. Check what is the key of the incoming record.
> 2. Is the state for that key missing?
> a) No! - just execute `processElement`/firing timer
> b) Yes :( Are we already fetching a state for that key?
> i) No - asynchronously fetch from the DFS to cache (local disk or
> mem) state for that key
> ii) Yes - enqueue this element/timer after the element that has
> started async fetch and is already waiting for the fetch to complete
> 3. Once the async fetch completes for a given key, run `processElement` or
> `onEventTime` for all of the buffered elements for that key.
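
The three steps above can be sketched as a small per-key dispatcher. This is only an illustration of the alternative being discussed, not code from any FLIP: `KeyPrefetchingRunner`, `startAsyncFetch`, and `onFetchComplete` are hypothetical names, and the fetch itself is left to the caller.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

class KeyPrefetchingRunner<K, E> {
    private final Set<K> cached = new HashSet<>();            // keys whose state is already local
    private final Map<K, Queue<E>> waiting = new HashMap<>(); // elements parked per fetching key
    private final Consumer<K> startAsyncFetch;                // hypothetical: starts DFS -> cache load
    private final BiConsumer<K, E> processElement;            // the user function

    KeyPrefetchingRunner(Consumer<K> startAsyncFetch, BiConsumer<K, E> processElement) {
        this.startAsyncFetch = startAsyncFetch;
        this.processElement = processElement;
    }

    /** Steps 1 and 2: route an element by the availability of its key's state. */
    void onElement(K key, E element) {
        if (cached.contains(key)) {          // 2a: state is present, run synchronously
            processElement.accept(key, element);
            return;
        }
        Queue<E> queue = waiting.get(key);
        if (queue == null) {                 // 2b-i: first miss for this key, start one fetch
            queue = new ArrayDeque<>();
            waiting.put(key, queue);
            queue.add(element);
            startAsyncFetch.accept(key);
        } else {                             // 2b-ii: fetch already running, keep arrival order
            queue.add(element);
        }
    }

    /** Step 3: the fetch for a key finished, replay everything buffered for it. */
    void onFetchComplete(K key) {
        cached.add(key);
        Queue<E> queue = waiting.remove(key);
        if (queue != null) {
            for (E element : queue) {
                processElement.accept(key, element);
            }
        }
    }
}
```

The sketch fetches per key, which is exactly why it fits ValueState-style access and not the cases where the accessed state cannot be derived from the record key alone.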
>
> That should both eliminate overheads, simplify the API for the users and
> potentially further improve performance/reduce latencies from processing
> all elements for the already pre-fetched key.
>
> If we consider the state already available (so no need to asynchronously
> fetch it) if it's either on local disks or memory, then I don't see a
> downside of this compared to your current proposal. If you would like to
> asynchronously fetch state from local disks into memory cache, then the
> naive version of this approach would have a downside of potentially
> unnecessarily reading into RAM a state field that the current
> `processElement` call wouldn't need. But that could also be
> solved/mitigated in a couple of ways. And I think considering the state as
> "available" for sync access if it's in local disks (not RAM) is probably
> good enough (comparable to RocksDB).
>
> Best,
> Piotrek
>
> On Thu, Feb 29, 2024 at 07:17, Yuan Mei wrote:
>
> > Hi Devs,
> >
> > This is a joint work of Yuan Mei, Zakelly Lan, Jinzhong Li, Hangxiang Yu,
> > Yanfei Lei and Feng Wang. We'd like to start a discussion about
> introducing
> > Disaggregated State Storage and Management in Flink 2.0.
> >
> > The past decade has witnessed a dramatic shift in Flink's deployment
> mode,
> > workload patterns, and hardware improvements. We've moved from the
> > map-reduce era where workers are computation-storage tightly coupled
> nodes
> > to a cloud-native world where c
