Re: [DISCUSS] FLIP-425: Asynchronous Execution Model

2024-03-10 Thread Gyula Fóra
Hey!

I may have missed this in the FLIP but did not find it spelled out
explicitly.

Can the user use the entire record processing context in their async
execution?
In other words, can they access all functionality from the callback? For
example, can they simply reference the keyed context and register a new
timer, get the timestamp, etc.?

If yes, does this mean that the context itself becomes "immutable", or that the
context is switched in the background?

One example use case: you read a value from state and, based on that value,
register a timer.

Thanks,
Gyula

On Mon, Mar 11, 2024 at 3:58 AM Yanfei Lei  wrote:

> Hi Jing,
>
> Thanks for your thoughtful feedback!
>
> > does it mean that there will be three mails for Read, Update, and
> > Output?
>
> With this example, there are two mails. The Read is processed by
> `mailboxDefaultAction`[1], and the Update and Output are encapsulated
> as mail.
>
> > does it make sense to encapsulate one mail instead of 3 mails with
> > more overhead?
>
> The number of mails depends on how the user writes the code: the
> statements in each `then()` are wrapped into one mail.
> StateFuture is a restricted version of CompletableFuture; their basic
> semantics are consistent.
>
> > Would you like to add more description for cases when exceptions
> > happened? E.g. when reading or/and updating State throws IOExceptions.
>
> We describe this in the "Error handling"[2] section. This FLIP also
> adopts the design from FLIP-368, ensuring that all state interfaces
> throw unchecked exceptions and, consequently, do not declare any
> exceptions in their signatures. In cases where an exception occurs
> while accessing the state, the job should fail.
>
> > Is it correct to understand that AEC is stateless?
>
> Good point; yes, it can be understood that way.
> AEC is a task-level component. When the job fails or is restarted, the
> runtime status in AEC will be reset.
> In fact, we have considered taking a snapshot of the status in AEC and
> storing it in a checkpoint, similar to an "unaligned checkpoint", but
> since callbacks cannot be serialized, this idea is not feasible for the
> time being.
>
> > would you like to add Pseudo-code for the inFlightRecordNum decrement
> > to help us understand the logic better?
>
> This part of the code is a bit scattered; we will try to abstract it into
> pseudo-code. In the meantime, you can refer to the RecordContext-related
> code [3] in the PoC to understand it.
>
> [1]
> https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxProcessor.java#L81
> [2]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-425%3A+Asynchronous+Execution+Model#FLIP425:AsynchronousExecutionModel-ErrorHandling
> [3]
> https://github.com/ververica/flink-poc/blob/disagg-poc-2/flink-runtime/src/main/java/org/apache/flink/runtime/state/async/RecordContext.java#L77
>
>
> Best,
> Yanfei
>
> Jing Ge  wrote on Sun, Mar 10, 2024 at 23:47:
> >
> > Hi Yanfei,
> >
> > Thanks for your proposal! The FLIP contains a lot of great new ideas. I'd
> > like to ask some questions to make sure we are on the same page.
> >
> > > For the asynchronous interface, Record A should run with Read, Update
> > > and Output, while Record B should stay at the Blocking buffer.
> >
> > 1. With this example, does it mean that there will be three mails for
> > Read, Update, and Output?
> > 2. If yes, since the Read, Update, and Output have to be executed before
> > Record B, does it make sense to encapsulate one mail instead of 3 mails
> > with more overhead? There must be some thoughts behind the design. Look
> > forward to it.
> >
> > > The challenge arises in determining when all the processing logic
> > > associated with Record A is fully executed. To address this, we have
> > > adopted a reference counting mechanism that tracks ongoing operations
> > > (either processing input or executing a callback) related to a single
> > > record.
> >
> > The design reminds me of the JVM reference counting for GC. Would you
> > like to add more description for cases when exceptions happen? E.g. when
> > reading and/or updating state throws IOExceptions.
> >
> > > In more detail, AEC uses an inFlightRecordNum variable to track the
> > > current number of records in progress. Every time the AEC receives a new
> > > record, inFlightRecordNum increases by 1; when all processing and
> > > callbacks for this record have completed, inFlightRecordNum decreases
> > > by 1. When processing a checkpoint mail, the current task thread will
> > > give up its time slice through the yield() method of the mailbox
> > > executor, so that the callbacks of ongoing state requests and the
> > > blocking state requests are drained first until inFlightRecordNum
> > > reduces to 0.
> >
> > 1. Speaking of draining, is it correct to understand that AEC is
> > stateless? E.g. AEC could be easily scaled out if it became a bottleneck.
> > 2. There are Pseudo-code for the 

[jira] [Created] (FLINK-34638) Support default value of table column.

2024-03-10 Thread LvYanquan (Jira)
LvYanquan created FLINK-34638:
-

 Summary: Support default value of table column.
 Key: FLINK-34638
 URL: https://issues.apache.org/jira/browse/FLINK-34638
 Project: Flink
  Issue Type: Improvement
  Components: Flink CDC
Affects Versions: cdc-3.1.0
Reporter: LvYanquan
 Fix For: cdc-3.1.0


Support default values of table columns that are literal constants.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: Unsubscribe

2024-03-10 Thread Jiabao Sun
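Send an email with no content or subject to listname-subscr...@flink.apache.org
(replace listname with dev, user, user-zh, etc.) to unsubscribe.

For example, send it to dev-unsubscr...@flink.apache.org to unsubscribe from
the dev@flink.apache.org mailing list.

For managing mailing list subscriptions, see [1].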

Best,
Jiabao

[1] https://flink.apache.org/zh/what-is-flink/community/

ss zz  wrote on Mon, Mar 11, 2024 at 13:20:

> Unsubscribe
>


Unsubscribe

2024-03-10 Thread ss zz
Unsubscribe


Unsubscribe

2024-03-10 Thread ss zz
Unsubscribe


Re: [DISCUSS] FLIP-427: Disaggregated State Store

2024-03-10 Thread Hangxiang Yu
Hi, Jeyhun.

Thanks for the reply.

> Is this argument true for all workloads? Or does this argument also hold
> for workloads with many small files, which is quite a common case [1] ?

Yes, I think so. The overhead should still be considered negligible,
particularly in comparison to remote I/O, and other benefits of this
proposal may be more significant than this one.

Additionally, Flink already incurs JNI overhead today whenever it calls
RocksDB methods. The frequency of these calls can exceed that of actual file
system interface calls, given that not all state requests require accessing
the file system.

BTW, small files can also hurt the runtime performance of the DB on a local
file system, so we usually resolve that issue first in production
environments.

> the engine spawns huge amount of scan range requests to the
> file system to retrieve different parts of a file.

Indeed, frequent requests to the remote file system can significantly
affect performance. To address this, other FLIPs introduce several
strategies:

1. A local disk cache to minimize remote requests, as described in FLIP-423,
which we will introduce in FLIP-429 as you mentioned. With effective cache
utilization, performance on cache hits will be no worse than with the
local strategy.

2. Grouping remote access to decrease the number of remote I/O requests, as
proposed in "FLIP-426: Grouping Remote State Access."

3. Parallel I/O to maximize network bandwidth usage, as outlined in "FLIP-425:
Asynchronous Execution Model."
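To make the first strategy a bit more concrete, here is a minimal sketch of a
local cache sitting in front of remote reads. It is only an illustration under
stated assumptions: a plain LRU policy keyed by hypothetical block IDs, with a
lambda standing in for the remote file system. The actual cache design in
FLIP-429 may well differ.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

public class LocalCacheSketch {

    /** Hypothetical LRU cache in front of a simulated remote file system (capacity in blocks). */
    public static final class CachedReader {
        private final Function<String, byte[]> remoteRead;
        private final Map<String, byte[]> cache;
        private int remoteRequests = 0;

        public CachedReader(int capacityBlocks, Function<String, byte[]> remoteRead) {
            this.remoteRead = remoteRead;
            // accessOrder = true keeps entries in least-recently-used order,
            // so removeEldestEntry evicts the LRU block once capacity is exceeded.
            this.cache = new LinkedHashMap<String, byte[]>(16, 0.75f, true) {
                @Override
                protected boolean removeEldestEntry(Map.Entry<String, byte[]> eldest) {
                    return size() > capacityBlocks;
                }
            };
        }

        /** Serves a block locally on a hit; otherwise issues one remote request and caches it. */
        public byte[] read(String blockId) {
            byte[] block = cache.get(blockId);
            if (block == null) {
                remoteRequests++;
                block = remoteRead.apply(blockId);
                cache.put(blockId, block);
            }
            return block;
        }

        public int remoteRequests() {
            return remoteRequests;
        }
    }

    public static void main(String[] args) {
        // The lambda stands in for a remote read; block IDs like "f1#0" are made up.
        CachedReader reader = new CachedReader(2, id -> id.getBytes());
        for (String id : new String[] {"f1#0", "f1#1", "f1#0", "f1#0", "f1#2", "f1#1"}) {
            reader.read(id);
        }
        System.out.println("remote requests: " + reader.remoteRequests()); // prints 4
    }
}
```

With a capacity of two blocks, the six reads in main cost four remote requests
instead of six, because the repeated reads of f1#0 are served locally.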

The PoC implements a simple file cache and asynchronous execution, which
improve performance significantly. You can also refer to the PoC results in
FLIP-423.

On Mon, Mar 11, 2024 at 3:11 AM Jeyhun Karimov  wrote:

> Hi Hangxiang,
>
> Thanks for the proposal. +1 for it.
> I have a few comments.
>
> Proposal 2 has additional JNI overhead, but the overhead is relatively
> > negligible when weighed against the latency of remote I/O.
>
> - Is this argument true for all workloads? Or does this argument also hold
> for workloads with many small files, which is quite a common case [1] ?
>
> - Also, in many workloads the engine does not need the whole file, either
> because the query only needs part of it, because the file type supports
> efficient filtering (e.g. ORC, Parquet, Arrow files), or simply because one
> file is "divided" among multiple workers.
> In these cases, the engine spawns a huge number of scan range requests to the
> file system to retrieve different parts of a file.
> How would the proposed solution work with these workloads?
>
> - A similar question applies to caching (I know caching is the subject of
> FLIP-429; I'm asking here because of the related section in this FLIP).
>
> Regards,
> Jeyhun
>
> [1] https://blog.min.io/challenge-big-data-small-files/
>
>
>
> On Thu, Mar 7, 2024 at 10:09 AM Hangxiang Yu  wrote:
>
> > Hi devs,
> >
> >
> > I'd like to start a discussion on a sub-FLIP of FLIP-423: Disaggregated
> > State Storage and Management[1], which is a joint work of Yuan Mei,
> > Zakelly Lan, Jinzhong Li, Hangxiang Yu, Yanfei Lei and Feng Wang:
> > - FLIP-427: Disaggregated State Store
> >
> > This FLIP introduces the initial version of the ForSt disaggregated state
> > store.
> >
> > Please make sure you have read the FLIP-423[1] to know the whole story,
> > and we'll discuss the details of FLIP-427[2] under this mail. For the
> > discussion of overall architecture or topics related to multiple
> > sub-FLIPs, please post in the previous mail[3].
> >
> > Looking forward to hearing from you!
> >
> > [1] https://cwiki.apache.org/confluence/x/R4p3EQ
> >
> > [2] https://cwiki.apache.org/confluence/x/T4p3EQ
> >
> > [3] https://lists.apache.org/thread/ct8smn6g9y0b8730z7rp9zfpnwmj8vf0
> >
> >
> > Best,
> >
> > Hangxiang.
> >
>


-- 
Best,
Hangxiang.


Re: [DISCUSS] FLIP-425: Asynchronous Execution Model

2024-03-10 Thread Yanfei Lei
Hi Jing,

Thanks for your thoughtful feedback!

> does it mean that there will be three mails for Read, Update, and Output ?

With this example, there are two mails. The Read is processed by
`mailboxDefaultAction`[1], and the Update and Output are encapsulated
as mail.

> does it make sense to encapsulate one mail instead of 3 mails with more 
> overhead?

The number of mails depends on how the user writes the code: the
statements in each `then()` are wrapped into one mail.
StateFuture is a restricted version of CompletableFuture; their basic
semantics are consistent.
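As a rough illustration of "one `then()` = one mail", here is a sketch that uses
plain java.util.concurrent.CompletableFuture as a stand-in for StateFuture, with
a single-threaded executor playing the role of the mailbox and a small pool
playing the state I/O threads. The class and method names are hypothetical and
this is an analogy only, not the AEC implementation; it just counts how many
callbacks get handed to the "mailbox".

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

public class MailboxSketch {

    /**
     * Runs Read -> Update -> Output for one record and returns how many callbacks
     * ("mails") were handed to the mailbox executor.
     */
    public static int countMails(boolean splitIntoTwoThens) throws Exception {
        // A single thread stands in for the task's mailbox thread.
        ExecutorService mailboxThread = Executors.newSingleThreadExecutor();
        // A small pool stands in for the state backend's I/O threads.
        ExecutorService stateIo = Executors.newFixedThreadPool(2);
        AtomicInteger mails = new AtomicInteger();
        // Every callback scheduled on this executor counts as one mail.
        Executor mailbox = command -> {
            mails.incrementAndGet();
            mailboxThread.execute(command);
        };
        try {
            // "Read": an async state read completed by an I/O thread.
            CompletableFuture<Integer> read = CompletableFuture.supplyAsync(() -> 41, stateIo);
            CompletableFuture<Void> done;
            if (splitIntoTwoThens) {
                // Update and Output in separate callbacks: two mails.
                done = read.thenApplyAsync(v -> v + 1, mailbox)
                        .thenAcceptAsync(v -> System.out.println("output " + v), mailbox);
            } else {
                // Update and Output in one callback: one mail.
                done = read.thenAcceptAsync(v -> System.out.println("output " + (v + 1)), mailbox);
            }
            done.get();
            return mails.get();
        } finally {
            mailboxThread.shutdown();
            stateIo.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("one then():  " + countMails(false) + " mail(s)");
        System.out.println("two then()s: " + countMails(true) + " mail(s)");
    }
}
```

Running it should report one mail when Update and Output share a callback and
two when they are split, which is the overhead trade-off raised in the question:
how the user chains callbacks decides how many mails are enqueued.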

> Would you like to add more description for cases when exceptions happened? 
> E.g. when reading or/and updating State throws IOExceptions.

We describe this in the "Error handling"[2] section. This FLIP also
adopts the design from FLIP-368, ensuring that all state interfaces
throw unchecked exceptions and, consequently, do not declare any
exceptions in their signatures. In cases where an exception occurs
while accessing the state, the job should fail.
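A minimal sketch of what "no checked exceptions in the signature" can look like
in practice: a checked IOException from a hypothetical storage call is rethrown
as an unchecked exception and left to propagate, so the job fails instead of
the user handling it. The interface and method names are hypothetical, and the
choice of UncheckedIOException is an assumption, not necessarily what FLIP-368
prescribes.

```java
import java.io.IOException;
import java.io.UncheckedIOException;

public class UncheckedStateSketch {

    /** Hypothetical low-level storage call that may fail with a checked IOException. */
    static int readFromStorage(String key, boolean failIo) throws IOException {
        if (failIo) {
            throw new IOException("remote read failed for key " + key);
        }
        return key.length();
    }

    /** Hypothetical user-facing state interface: no checked exceptions declared. */
    public interface ValueStateLike {
        int value(String key);
    }

    /** Adapter that rethrows IOException as an unchecked exception (assumed exception type). */
    public static ValueStateLike stateFor(boolean failIo) {
        return key -> {
            try {
                return readFromStorage(key, failIo);
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        };
    }

    public static void main(String[] args) {
        System.out.println(stateFor(false).value("abc")); // prints 3
        try {
            stateFor(true).value("abc");
        } catch (UncheckedIOException e) {
            // In the model described above this would fail the job rather than be handled here.
            System.out.println("job should fail: " + e.getCause().getMessage());
        }
    }
}
```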

> Is it correct to understand that AEC is stateless?

Good point; yes, it can be understood that way.
AEC is a task-level component. When the job fails or is restarted, the
runtime status in AEC will be reset.
In fact, we have considered taking a snapshot of the status in AEC and
storing it in a checkpoint, similar to an "unaligned checkpoint", but
since callbacks cannot be serialized, this idea is not feasible for the
time being.

> would you like to add Pseudo-code for the inFlightRecordNum decrement to
> help us understand the logic better?

This part of the code is a bit scattered; we will try to abstract it into
pseudo-code. In the meantime, you can refer to the RecordContext-related
code [3] in the PoC to understand it.
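Until that pseudo-code is available, here is a rough, single-threaded sketch of
the reference counting described in the FLIP. Everything except the name
inFlightRecordNum is hypothetical: a plain queue stands in for the mailbox,
polling it stands in for yield(), and the per-key blocking buffer is ignored.
It is not the PoC's RecordContext implementation.

```java
import java.util.ArrayDeque;
import java.util.Deque;

public class InFlightCounterSketch {

    /** Hypothetical per-record context holding a reference count of ongoing work. */
    static final class RecordContext {
        final String record;
        private int refCount = 0;

        RecordContext(String record) {
            this.record = record;
        }
    }

    private final Deque<Runnable> mailbox = new ArrayDeque<>();
    private int inFlightRecordNum = 0;

    private void retain(RecordContext ctx) {
        ctx.refCount++;
    }

    private void release(RecordContext ctx) {
        if (--ctx.refCount == 0) {
            inFlightRecordNum--; // all processing and callbacks for this record are done
        }
    }

    /** A new record enters: count it as in flight and hold a reference while processing runs. */
    public void processRecord(String record) {
        RecordContext ctx = new RecordContext(record);
        inFlightRecordNum++;
        retain(ctx); // reference for processing the input itself
        retain(ctx); // reference for the pending state-request callback
        mailbox.add(() -> {
            // Update + Output for ctx.record would run here.
            release(ctx); // callback finished
        });
        release(ctx); // processing of the input finished
    }

    /** Checkpoint: keep running pending mails (the "yield") until nothing is in flight. */
    public void drainForCheckpoint() {
        while (inFlightRecordNum > 0) {
            Runnable mail = mailbox.poll();
            if (mail == null) {
                throw new IllegalStateException("records in flight but no mail to run");
            }
            mail.run();
        }
    }

    public int inFlight() {
        return inFlightRecordNum;
    }

    public static void main(String[] args) {
        InFlightCounterSketch aec = new InFlightCounterSketch();
        aec.processRecord("A");
        aec.processRecord("B");
        System.out.println("in flight before checkpoint: " + aec.inFlight()); // 2
        aec.drainForCheckpoint();
        System.out.println("in flight after drain: " + aec.inFlight()); // 0
    }
}
```

The decrement happens only when a record's reference count drops to zero, which
is why the checkpoint can simply drain mails until the counter reaches 0.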

[1] 
https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxProcessor.java#L81
[2] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-425%3A+Asynchronous+Execution+Model#FLIP425:AsynchronousExecutionModel-ErrorHandling
[3] 
https://github.com/ververica/flink-poc/blob/disagg-poc-2/flink-runtime/src/main/java/org/apache/flink/runtime/state/async/RecordContext.java#L77


Best,
Yanfei

Jing Ge  wrote on Sun, Mar 10, 2024 at 23:47:
>
> Hi Yanfei,
>
> Thanks for your proposal! The FLIP contains a lot of great new ideas. I'd
> like to ask some questions to make sure we are on the same page.
>
> > For the asynchronous interface, Record A should run with Read, Update and
> > Output, while Record B should stay at the Blocking buffer.
>
> 1. With this example, does it mean that there will be three mails for Read,
> Update, and Output?
> 2. If yes, since the Read, Update, and Output have to be executed before
> Record B, does it make sense to encapsulate one mail instead of 3 mails
> with more overhead? There must be some reasoning behind the design. Looking
> forward to it.
>
> > The challenge arises in determining when all the processing logic
> > associated with Record A is fully executed. To address this, we have
> > adopted a reference counting mechanism that tracks ongoing operations
> > (either processing input or executing a callback) related to a single
> > record.
>
> The design reminds me of the JVM reference counting for GC. Would you like
> to add more description for cases when exceptions happen? E.g. when
> reading and/or updating state throws IOExceptions.
>
> > In more detail, AEC uses an inFlightRecordNum variable to track the
> > current number of records in progress. Every time the AEC receives a new
> > record, inFlightRecordNum increases by 1; when all processing and
> > callbacks for this record have completed, inFlightRecordNum decreases
> > by 1. When processing a checkpoint mail, the current task thread will
> > give up its time slice through the yield() method of the mailbox executor,
> > so that the callbacks of ongoing state requests and the blocking state
> > requests are drained first until inFlightRecordNum reduces to 0.
>
> 1. Speaking of draining, is it correct to understand that AEC is stateless?
> E.g. AEC could be easily scaled out if it became a bottleneck.
> 2. There is pseudo-code for the inFlightRecordNum increment; would you
> like to add pseudo-code for the inFlightRecordNum decrement to help us
> understand the logic better?
>
> The FLIP shows overall a great design! +1 for it! Looking forward to your
> thoughts, thanks!
>
> Best regards,
> Jing
>
> On Thu, Mar 7, 2024 at 10:05 AM Yanfei Lei  wrote:
>
> > Hi devs,
> >
> > I'd like to start a discussion on FLIP-425: Asynchronous Execution
> > Model[1], which is a sub-FLIP of FLIP-423: Disaggregated State Storage
> > and Management[2].
> >
> > FLIP-425 introduces a non-blocking execution model leveraging the
> > asynchronous APIs introduced in FLIP-424[3].
> > For the whole story, please read FLIP-423[2]; this thread aims to
> > discuss the details of "FLIP-425: Asynchronous Execution Model".
> >
> > Regarding the 

[jira] [Created] (FLINK-34637) Migrate JoinConditionEqualityTransferRule

2024-03-10 Thread Sergey Nuyanzin (Jira)
Sergey Nuyanzin created FLINK-34637:
---

 Summary: Migrate JoinConditionEqualityTransferRule
 Key: FLINK-34637
 URL: https://issues.apache.org/jira/browse/FLINK-34637
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Planner
Reporter: Sergey Nuyanzin
Assignee: Sergey Nuyanzin








[DISCUSS] FLIP-434: Support optimizations for pre-partitioned data sources

2024-03-10 Thread Jeyhun Karimov
Hi devs,

I’d like to start a discussion on FLIP-434: Support optimizations for
pre-partitioned data sources [1].

The FLIP proposes taking advantage of pre-partitioned data sources in the
SQL/Table API (this is already supported as an experimental feature in the
DataStream API [2]).


Please find more details in the FLIP wiki document [1].
Looking forward to your feedback.

Regards,
Jeyhun

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-434%3A+Support+optimizations+for+pre-partitioned+data+sources
[2]
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/experimental/


Re: [DISCUSS] FLIP-419: Optimize multi-sink query plan generation

2024-03-10 Thread Jeyhun Karimov
Hi Martijn,

Thanks for your comment. I created a quick prototype to verify that the
proposed hypothesis holds, so it is still a rough implementation.
All tests pass on my local setup, but I might need to spend some time
(probably once the VOTE thread passes) to clean it up and submit a PR.

Each test was run via mvn test (e.g., mvn test ...
-Dtest=org.apache.flink.table.planner.plan.stream.sql.DagOptimizationTest#testMultiSinks1),
comparing the master branch of Flink (dd16a4c07b2f8c96740fb522cb54cfd1d5a5e835)
against my PoC over 5 runs.

The runtime is the test case's runtime as reported by mvn test (not the
runtime of the whole mvn test command).


I also included the above information in the FLIP.

Please let me know if you have further questions.


Regards,

Jeyhun





On Fri, Mar 8, 2024 at 12:01 PM Martijn Visser 
wrote:

> Hi Jeyhun Karimov,
>
> I see that you've already opened up a VOTE thread, but since you're talking
> about having a prototype already and results, I wondered if you could
> include the POC and how you've tested these results in the FLIP?
>
> Best regards,
>
> Martijn
>
> On Tue, Jan 30, 2024 at 4:47 AM Jeyhun Karimov 
> wrote:
>
> > Hi devs,
> >
> > I just wanted to give an update on this FLIP.
> > I updated the doc based on the comments from Jim.
> > Also, I developed a prototype and did some testing.
> >
> > In my small prototype I ran the following tests:
> >
> >    - org.apache.flink.table.planner.plan.stream.sql.DagOptimizationTest#testMultiSinks1
> >    - org.apache.flink.table.planner.plan.stream.sql.DagOptimizationTest#testMultiSinks2
> >    - org.apache.flink.table.planner.plan.stream.sql.DagOptimizationTest#testMultiSinks3
> >    - org.apache.flink.table.planner.plan.stream.sql.DagOptimizationTest#testMultiSinks4
> >    - org.apache.flink.table.planner.plan.stream.sql.DagOptimizationTest#testMultiSinks5
> >    - org.apache.flink.table.planner.plan.stream.sql.DagOptimizationTest#testMultiSinksWithUDTF
> >    - org.apache.flink.table.planner.plan.stream.sql.DagOptimizationTest#testMultiSinksSplitOnUnion1
> >    - org.apache.flink.table.planner.plan.stream.sql.DagOptimizationTest#testMultiSinksSplitOnUnion2
> >    - org.apache.flink.table.planner.plan.stream.sql.DagOptimizationTest#testMultiSinksSplitOnUnion3
> >    - org.apache.flink.table.planner.plan.stream.sql.DagOptimizationTest#testMultiSinksSplitOnUnion4
> >
> >
> > These are end-to-end DAG optimization tests, covering query parsing,
> > validation, optimization, and checking the results.
> >
> > In these end-to-end optimization tests, my prototype was 15-20% faster
> > than the existing Flink optimization structure (with the "cost" of
> > simplifying the codebase).
> >
> >
> > Any questions/comments are more than welcome.
> >
> >
> > Regards,
> >
> > Jeyhun Karimov
> >
> > On Wed, Jan 17, 2024 at 9:11 PM Jeyhun Karimov 
> > wrote:
> >
> > > Hi Jim,
> > >
> > > Thanks for your comments. Please find my answers below:
> > >
> > > > 1. StreamOptimizeContext may still be needed to pass the fact that we
> > > >    are optimizing a streaming query.  I don't think this class will go
> > > >    away completely.  (I agree it may become more simple if the kind or
> > > >    mini-batch configuration can be removed.)
> > >
> > >
> > > What I meant is that it might go away if we get rid of the
> > > *isUpdateBeforeRequired* and *getMiniBatchInterval* fields.
> > > Of course, if we can get rid of only one of them, then the
> > > *StreamOptimizeContext* class will not be removed but will become simpler.
> > > I will update the doc accordingly.
> > >
> > > > 2. How are the mini-batch and changelog inference rules tightly coupled?
> > > >    I looked a little bit and I haven't seen any connection between them.
> > > >    It seems like the changelog inference is what needs to run multiple
> > > >    times.
> > >
> > >
> > > Sorry for the misunderstanding. The mini-batch and changelog inference
> > > are not coupled with each other, but with the high-level optimization
> > > logic. The idea is to separate the query optimization into 1) optimize,
> > > 2) enrich with changelog inference, 3) enrich with mini-batch interval
> > > inference, and 4) rewrite.
> > >
> > > > 3. I think your point about code complexity is unnecessary.
> > > >    StreamOptimizeContext extends org.apache.calcite.plan.Context, which
> > > >    is used as an interface to pass information and objects through the
> > > >    Calcite stack.
> > >
> > >
> > > I partially agree. Please see my answer above for the question 1.
> > >
> > > > 4. Is there an alternative where the complexity of the changelog
> > > >    optimization can be moved into the `FlinkChangelogModeInferenceProgram`?
> > > >    (If there is coupling between the mini-batch and changelog rules, then
> > > >    this would not make sense.)
> > >
> > >
> > > Good point. Yes, this is definitely 

Re: [DISCUSS] FLIP-427: Disaggregated State Store

2024-03-10 Thread Jeyhun Karimov
Hi Hangxiang,

Thanks for the proposal. +1 for it.
I have a few comments.

Proposal 2 has additional JNI overhead, but the overhead is relatively
> negligible when weighed against the latency of remote I/O.

- Is this argument true for all workloads? Or does this argument also hold
for workloads with many small files, which is quite a common case [1] ?

- Also, in many workloads the engine does not need the whole file, either
because the query only needs part of it, because the file type supports
efficient filtering (e.g. ORC, Parquet, Arrow files), or simply because one
file is "divided" among multiple workers.
In these cases, the engine spawns a huge number of scan range requests to the
file system to retrieve different parts of a file.
How would the proposed solution work with these workloads?

- A similar question applies to caching (I know caching is the subject of
FLIP-429; I'm asking here because of the related section in this FLIP).

Regards,
Jeyhun

[1] https://blog.min.io/challenge-big-data-small-files/



On Thu, Mar 7, 2024 at 10:09 AM Hangxiang Yu  wrote:

> Hi devs,
>
>
> I'd like to start a discussion on a sub-FLIP of FLIP-423: Disaggregated
> State Storage and Management[1], which is a joint work of Yuan Mei, Zakelly
> Lan, Jinzhong Li, Hangxiang Yu, Yanfei Lei and Feng Wang:
>
> - FLIP-427: Disaggregated State Store
>
> This FLIP introduces the initial version of the ForSt disaggregated state
> store.
>
> Please make sure you have read the FLIP-423[1] to know the whole story, and
> we'll discuss the details of FLIP-427[2] under this mail. For the
> discussion of overall architecture or topics related to multiple
> sub-FLIPs, please post in the previous mail[3].
>
> Looking forward to hearing from you!
>
> [1] https://cwiki.apache.org/confluence/x/R4p3EQ
>
> [2] https://cwiki.apache.org/confluence/x/T4p3EQ
>
> [3] https://lists.apache.org/thread/ct8smn6g9y0b8730z7rp9zfpnwmj8vf0
>
>
> Best,
>
> Hangxiang.
>


Re: [VOTE] FLIP-420: Add API annotations for RocksDB StateBackend user-facing classes

2024-03-10 Thread Jing Ge
Hi Jinzhong,

Thanks for driving this topic and sorry for just joining the discussion
now. I replied in your discussion thread. Would you like to take a look
and let's keep the discussion there? I will come back to this thread and
vote once the discussion is done. Thanks!

Best regards,
Jing

On Thu, Mar 7, 2024 at 4:39 AM Zakelly Lan  wrote:

> +1 non-binding
>
> Thanks for proposing this.
>
> Best,
> Zakelly
>
> On Thu, Mar 7, 2024 at 10:13 AM Yanfei Lei  wrote:
>
> > +1(binding) for this vote.
> >
> > Hangxiang Yu  wrote on Thu, Mar 7, 2024 at 09:54:
> > >
> > > +1 (binding)
> > >
> > > On Thu, Mar 7, 2024 at 9:34 AM Yun Tang  wrote:
> > >
> > > > > +1 for this FLIP.
> > > > Sorry for not being clear in my previous reply, it's a binding vote.
> > > >
> > > > Best
> > > > Yun Tang
> > > > 
> > > > From: Jeyhun Karimov 
> > > > Sent: Thursday, March 7, 2024 4:40
> > > > To: dev@flink.apache.org 
> > > > Subject: Re: [VOTE] FLIP-420: Add API annotations for RocksDB
> > > > StateBackend user-facing classes
> > > >
> > > > Hi Jinzhong,
> > > >
> > > > Thanks for the FLIP.
> > > >
> > > > +1 (non-binding)
> > > >
> > > > Regards,
> > > > Jeyhun
> > > >
> > > > On Wed, Mar 6, 2024 at 5:09 PM Yun Tang  wrote:
> > > >
> > > > > +1 for this FLIP.
> > > > >
> > > > >
> > > > >
> > > > > Best
> > > > > Yun Tang
> > > > > 
> > > > > From: Jinzhong Li 
> > > > > Sent: Wednesday, March 6, 2024 20:29
> > > > > To: dev@flink.apache.org 
> > > > > Subject: [VOTE] FLIP-420: Add API annotations for RocksDB
> > > > > StateBackend user-facing classes
> > > > >
> > > > > Hi All,
> > > > >
> > > > > I'd like to start a vote on FLIP-420: Add API annotations for
> > > > > RocksDB StateBackend user-facing classes[1].
> > > > > The discussion thread is here [2].
> > > > >
> > > > > The vote will be open for at least 72 hours unless there is an
> > > > > objection or not enough votes.
> > > > >
> > > > >
> > > > > [1] https://cwiki.apache.org/confluence/x/JQs4EQ
> > > > > [2] https://lists.apache.org/thread/4t71lz2j2ft8hf90ylvtomynhr2qthoo
> > > > >
> > > > >
> > > > > Best,
> > > > > Jinzhong Li
> > > > >
> > > >
> > >
> > >
> > > --
> > > Best,
> > > Hangxiang.
> >
> >
> >
> > --
> > Best,
> > Yanfei
> >
>


Re: [DISCUSS]FLIP-420: Add API annotations for RocksDB StateBackend user-facing classes

2024-03-10 Thread Jing Ge
Hi Jinzhong,

Sorry for the late reply. The key guideline for adding visibility annotations
is whether the interface/class is a user-facing API. I was wondering whether
SingleStateIterator and RocksDBRestoreOperation are exposed to users even
though they are interfaces, i.e. whether users can use their own
implementation classes. The flink-statebackend-rocksdb module has many more
classes than the FLIP describes. Adding the @Internal annotation only to these
two interfaces could be read as an implicit hint that users can replace the
default behaviour with their own implementation.

Another suggestion: we should start following FLIP-197 [1] (which has
already been accepted by the community) and add more info to the
PublicEvolving annotation in order to kick off the graduation process. WDYT?

Best regards,
Jing

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-197%3A+API+stability+graduation+process

On Mon, Feb 26, 2024 at 2:22 PM Jinzhong Li 
wrote:

> Hi all,
>
> Thanks for all the feedback. It seems there are no more unaddressed
> questions. I would like to open the voting thread after three days.
>
> Please let me know if you have any concerns, thanks!
>
> Best,
> Jinzhong Li
>
> On Mon, Feb 26, 2024 at 11:29 AM Yanfei Lei  wrote:
>
> > @Yun Tang
> > Thanks for the information; +1 for marking
> > `ConfigurableRocksDBOptionsFactory` as `PublicEvolving`.
> >
> > Best,
> > Yanfei
> >
> > Yun Tang  wrote on Fri, Feb 23, 2024 at 19:54:
> > >
> > > Hi Jinzhong,
> > >
> > > Thanks for driving this topic, and +1 for fixing the lack of
> > > annotations.
> > >
> > > @Yanfei the `ConfigurableRocksDBOptionsFactory` interface was introduced
> > > for user extension; you can refer to the doc[1], which shows an example
> > > of how to use this interface.
> > >
> > >
> > > [1]
> > > https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/large_state_tuning/#tuning-rocksdb-memory
> > >
> > >
> > > Best
> > > Yun Tang
> > > 
> > > From: Yanfei Lei 
> > > Sent: Thursday, February 22, 2024 15:39
> > > To: dev@flink.apache.org 
> > > Subject: Re: [DISCUSS]FLIP-420: Add API annotations for RocksDB
> > > StateBackend user-facing classes
> > >
> > > Hi Jinzhong,
> > > Thanks for driving this!
> > >
> > > 1. I'm wondering if `ConfigurableRocksDBOptionsFactory` will be used
> > > by users; currently it looks like only developers use it in the RocksDB
> > > state backend module, and its only non-testing subclass
> > > "DefaultConfigurableOptionsFactory" is marked @Deprecated.
> > > 2. Regarding @Internal, according to its Javadoc it is an
> > > "Annotation to mark methods within stable, public APIs as an internal
> > > developer API."  So marking "SingleStateIterator" and
> > > "RocksDBRestoreOperation" as @Internal is acceptable to me.
> > >
> > > Best,
> > > Yanfei
> > >
> > Jinzhong Li  wrote on Thu, Jan 25, 2024 at 12:16:
> > > >
> > > > Hi Zakelly,
> > > >
> > > > Thanks for your comments!
> > > >
> > > > 1) I agree that almost no user would use "RocksDBStateUploader" and
> > > > "RocksDBStateDownloader" directly. It's fine with me to keep them
> > > > unmarked.
> > > > 2) Regarding "SingleStateIterator", I think it's acceptable to either
> > > > leave it unmarked or mark it as @Internal. I just consider that
> > > > SingleStateIterator is an interface with the "public" modifier, and it
> > > > is harmless to annotate it as @Internal.
> > > >
> > > >
> > > >
> > > >
> > > > Hi Hangxiang,
> > > >
> > > > Thanks for the reminder!
> > > >
> > > > It makes sense to mark RocksDBStateBackendFactory as Deprecated.
> > > >
> > > > Best,
> > > > Jinzhong Li
> > > >
> > > >
> > > > On Thu, Jan 25, 2024 at 10:22 AM Hangxiang Yu  wrote:
> > > >
> > > > > Hi Jinzhong.
> > > > > Thanks for driving this!
> > > > > Some suggestions:
> > > > > 1. As RocksDBStateBackend is marked as Deprecated, we should also
> > > > > mark RocksDBStateBackendFactory as Deprecated.
> > > > > 2. Since 1.19 will be frozen on 1.26, let's adjust the target
> > > > > version to 1.20.
> > > > >
> > > > >
> > > > > On Wed, Jan 24, 2024 at 11:50 PM Zakelly Lan <zakelly@gmail.com>
> > > > > wrote:
> > > > >
> > > > > > Hi Jinzhong,
> > > > > >
> > > > > > Thanks for driving this! +1 for fixing the lack of annotation.
> > > > > >
> > > > > > I'm wondering if we really need to annotate *RocksDBStateUploader*
> > > > > > and *RocksDBStateDownloader* with @Internal, as they seem to be
> > > > > > ordinary classes that don't interact with other modules.
> > > > > > Also, I have reservations about annotating *SingleStateIterator*,
> > > > > > but I'd like to hear others' opinions and won't insist on this.
> > > > > >
> > > > > > Best,
> > > > > > Zakelly
> > > > > >
> > > > > > On Wed, Jan 24, 2024 at 10:26 PM Jinzhong Li <lijinzhong2...@gmail.com>
> > > > > > wrote:
> > > > > >
> > > > > > > Hi devs,
> > > > > > >
> > > > > > > I’m opening this thread to discuss about FLIP-420: Add API
> > 

Re: [DISCUSS] FLIP Suggestion: Externalize Kudu Connector from Bahir

2024-03-10 Thread Jing Ge
Hi Ferenc,

Thanks for the proposal! +1 for it!

Similar to what Leonard mentioned, I would suggest:
1. Use the "release" to define the release version of the Kudu connector
itself.
2. Optionally, add one more row underneath to describe which Flink versions
this release will be compatible with, e.g. 1.17, 1.18. I think it makes
sense to support at least the last two Flink releases. An example can be
found at [1].

Best regards,
Jing

 [1] https://lists.apache.org/thread/jcjfy3fgpg5cdnb9noslq2c77h0gtcwp

On Sun, Mar 10, 2024 at 3:46 PM Yanquan Lv  wrote:

> Hi Ferenc, +1 for this FLIP.
>
> Ferenc Csaky  wrote on Sat, Mar 9, 2024 at 01:49:
>
> > Thank you Jeyhun, Leonard, and Hang for your comments! Let me
> > address them from earliest to latest.
> >
> > > How do you plan the review process in this case (e.g. incremental
> > > over existing codebase or cumulative all at once) ?
> >
> > I think an incremental review would be less time-consuming and less
> > complex for reviewers, so I would lean towards that direction. I
> > imagine multiple subtasks for migrating the existing code and updating
> > the deprecated interfaces; those should be separate PRs, and the release
> > can be initiated once everything is merged.
> >
> > > (1) About the release version, could you specify kudu connector version
> > > instead of flink version 1.18 as external connector version is different
> > > with flink?
> > > (2) About the connector config options, could you enumerate these
> > > options so that we can review they’re reasonable or not?
> >
> > I added these to the FLIP, copying the current config options as is,
> > PTAL.
> >
> > > (3) Metrics are also a key part of a connector, could you add the
> > > supported connector metrics to the public interface as well?
> >
> > The current Bahir connector code does not include any metrics, and I did
> > not plan to include them in the scope of this FLIP.
> >
> > > I think a note on how this code originally lived in Bahir could be
> > > added to the FLIP.
> >
> > I might be missing your point, but the FLIP contains this: "Migrating the
> > current code keeping the history and noting it explicitly it was forked
> > from the Bahir repository [2]." Pls. share if you meant something else.
> >
> > Best,
> > Ferenc
> >
> >
> >
> > On Friday, March 8th, 2024 at 10:42, Hang Ruan 
> > wrote:
> >
> > >
> > >
> > > Hi, Ferenc.
> > >
> > > Thanks for the FLIP discussion. +1 for the proposal.
> > > I think a note on how this code originally lived in Bahir could be
> > > added to the FLIP.
> > >
> > > Best,
> > > Hang
> > >
> > > On Thu, Mar 7, 2024 at 14:14 Leonard Xu xbjt...@gmail.com wrote:
> > >
> > > > Thanks Ferenc for kicking off this discussion, I left some comments
> > here:
> > > >
> > > > (1) About the release version, could you specify the Kudu connector
> > > > version instead of Flink version 1.18, as the external connector
> > > > version is different from Flink's?
> > > >
> > > > (2) About the connector config options, could you enumerate these
> > > > options so that we can review whether they're reasonable?
> > > >
> > > > (3) Metrics are also a key part of a connector, could you add the
> > > > supported connector metrics to the public interface as well?
> > > >
> > > > Best,
> > > > Leonard
> > > >
> > > > > On Mar 6, 2024, at 11:23 PM, Ferenc Csaky ferenc.cs...@pm.me.INVALID wrote:
> > > > >
> > > > > Hello devs,
> > > > >
> > > > > Opening this thread to discuss a FLIP [1] about externalizing the Kudu
> > > > > connector, as the Apache Bahir project was recently moved to the
> > > > > attic [2]. Some details were already discussed in another thread [3].
> > > > > I am proposing to externalize this connector and keep it maintainable
> > > > > and up to date.
> > > > >
> > > > > Best regards,
> > > > > Ferenc
> > > > >
> > > > > [1] https://docs.google.com/document/d/1vHF_uVe0FTYCb6PRVStovqDeqb_C_FKjt2P5xXa7uhE
> > > > > [2] https://bahir.apache.org/
> > > > > [3] https://lists.apache.org/thread/2nb8dxxfznkyl4hlhdm3vkomm8rk4oyq
> >
>


Re: [DISCUSS] FLIP-425: Asynchronous Execution Model

2024-03-10 Thread Jing Ge
Hi Yanfei,

Thanks for your proposal! The FLIP contains a lot of great new ideas. I'd
like to ask some questions to make sure we are on the same page.

> For the asynchronous interface, Record A should run with Read, Update and
Output, while Record B should stay at the Blocking buffer.

1. With this example, does it mean that there will be three mails for Read,
Update, and Output?
2. If yes, since Read, Update, and Output have to be executed before
Record B, does it make sense to encapsulate them in one mail instead of
three mails with more overhead? There must be some thoughts behind the
design. Looking forward to it.

> The challenge arises in determining when all the processing logic
associated with Record A is fully executed. To address this, we have
adopted a reference counting mechanism that tracks ongoing operations
(either processing input or executing a callback) related to a single
record.

The design reminds me of reference counting in garbage collection. Would
you like to add more description for cases where exceptions happen? E.g.
when reading and/or updating State throws IOExceptions.

> In more detail, AEC uses an inFlightRecordNum variable to track the
current number of records in progress. Every time the AEC receives a new
record, inFlightRecordNum increases by 1; when all processing and
callbacks for this record have completed, inFlightRecordNum decreases
by 1. When processing a checkpoint mail, the current task thread will
give up the time slice through the yield() method of the mailbox executor,
so that the ongoing state requests' callbacks and the blocking state
requests will be drained first until inFlightRecordNum reduces to 0.

1. Speaking of draining, is it correct to understand that AEC is stateless?
E.g. AEC could be easily scaled out if it became a bottleneck.
2. There is pseudo-code for the inFlightRecordNum increment. Would you
like to add pseudo-code for the inFlightRecordNum decrement to help us
understand the logic better?
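For illustration, the increment/decrement and draining behaviour described in the quoted FLIP text could be sketched in plain Java as below. This is a minimal single-threaded sketch under stated assumptions, not the AEC implementation: the mailbox is modeled as an ArrayDeque of Runnable, and the names RecordContext, startRecord, asyncStateRequest and drainForCheckpoint are hypothetical. Each record holds one reference while its processing runs and one per pending callback; when its count drops to zero, inFlightRecordNum is decremented.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical, single-threaded sketch of the in-flight record accounting
// described in FLIP-425; none of these names are Flink APIs.
final class InFlightTracking {
    // Stand-in for the task mailbox: callbacks waiting to run on the task thread.
    static final Deque<Runnable> mailbox = new ArrayDeque<>();
    // Number of records whose processing and callbacks have not all finished.
    static int inFlightRecordNum = 0;

    // Per-record reference count of ongoing operations (input processing or callbacks).
    static final class RecordContext {
        private int refCount = 0;

        void retain() {
            refCount++;
        }

        void release() {
            if (--refCount == 0) {
                // Last operation for this record finished: the decrement path.
                inFlightRecordNum--;
            }
        }
    }

    // Called when a new record enters processing: the increment path.
    static RecordContext startRecord() {
        inFlightRecordNum++;
        RecordContext ctx = new RecordContext();
        ctx.retain(); // held while the record's processing logic runs
        return ctx;
    }

    // Models an async state request whose callback is later run as a mail.
    static void asyncStateRequest(RecordContext ctx, Runnable callback) {
        ctx.retain(); // held until the callback has executed
        mailbox.add(() -> {
            try {
                callback.run();
            } finally {
                ctx.release();
            }
        });
    }

    // Checkpoint handling: keep running pending mails (the "yield") until
    // no record is in flight any more.
    static void drainForCheckpoint() {
        while (inFlightRecordNum > 0) {
            Runnable mail = mailbox.poll();
            if (mail == null) {
                // Real state backends complete requests asynchronously; in this
                // single-threaded sketch an empty mailbox here means a lost callback.
                throw new IllegalStateException("records in flight but no pending mail");
            }
            mail.run();
        }
    }

    public static void main(String[] args) {
        RecordContext a = startRecord();
        // Read, then Update and Output inside the chained callbacks.
        asyncStateRequest(a, () -> asyncStateRequest(a, () -> System.out.println("output A")));
        a.release(); // the synchronous part of processing record A returned
        System.out.println("in flight before checkpoint: " + inFlightRecordNum);
        drainForCheckpoint();
        System.out.println("in flight after checkpoint: " + inFlightRecordNum);
    }
}
```

Running main prints `in flight before checkpoint: 1`, then `output A`, then `in flight after checkpoint: 0`: in this sketch the checkpoint cannot complete until both chained callbacks of record A have run.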

Overall, the FLIP shows a great design! +1 for it! Looking forward to your
thoughts, thanks!

Best regards,
Jing

On Thu, Mar 7, 2024 at 10:05 AM Yanfei Lei  wrote:

> Hi devs,
>
> I'd like to start a discussion on FLIP-425: Asynchronous Execution
> Model[1], which is a sub-FLIP of FLIP-423: Disaggregated State Storage
> and Management[2].
>
> FLIP-425 introduces a non-blocking execution model leveraging the
> asynchronous APIs introduced in FLIP-424[3].
> For the whole story, please read FLIP-423[2]; this thread is aimed at
> discussing the details of "FLIP-425: Asynchronous Execution Model".
>
> Regarding the details of this FLIP, there have been some discussions
> here[4], mainly focusing on framework overhead profiling, watermark
> processing, etc. Please see link[4] for the context.
>
> Looking forward to hearing from you!
>
>
> [1] https://cwiki.apache.org/confluence/x/S4p3EQ
> [2] https://cwiki.apache.org/confluence/x/R4p3EQ
> [3] https://cwiki.apache.org/confluence/x/SYp3EQ
> [4] https://lists.apache.org/thread/ct8smn6g9y0b8730z7rp9zfpnwmj8vf0
>
> Best,
> Yanfei
>


Re: [DISCUSS] FLIP Suggestion: Externalize Kudu Connector from Bahir

2024-03-10 Thread Yanquan Lv
Hi Ferenc, +1 for this FLIP.

On Sat, Mar 9, 2024 at 01:49 Ferenc Csaky wrote:

> Thank you Jeyhun, Leonard, and Hang for your comments! Let me
> address them from earliest to latest.
>
> > How do you plan the review process in this case (e.g. incremental
> > over the existing codebase or cumulative all at once)?
>
> I think incremental would be less time-consuming and complex for
> reviewers, so I would lean towards that direction. I would
> imagine multiple subtasks for migrating the existing code and
> updating the deprecated interfaces, so those should be separate PRs, and
> the release can be initiated when everything is merged.
>
> > (1) About the release version, could you specify the Kudu connector
> > version instead of Flink version 1.18, as the external connector version
> > is different from Flink's?
> > (2) About the connector config options, could you enumerate these
> > options so that we can review whether they're reasonable?
>
> I added these to the FLIP, copying the current config options as is, PTAL.
>
> > (3) Metrics are also a key part of a connector, could you add the
> > supported connector metrics to the public interface as well?
>
> The current Bahir connector code does not include any metrics, and I did
> not plan to include them in the scope of this FLIP.
>
> > I think a note on how this code originally lived in Bahir could be
> > added to the FLIP.
>
> I might be missing your point, but the FLIP contains this: "Migrating the
> current code keeping the history and noting it explicitly it was forked
> from the Bahir repository [2]." Pls. share if you meant something else.
>
> Best,
> Ferenc
>
>
>
> On Friday, March 8th, 2024 at 10:42, Hang Ruan 
> wrote:
>
> >
> >
> > Hi, Ferenc.
> >
> > Thanks for the FLIP discussion. +1 for the proposal.
> > I think a note on how this code originally lived in Bahir could be
> > added to the FLIP.
> >
> > Best,
> > Hang
> >
> > On Thu, Mar 7, 2024 at 14:14 Leonard Xu xbjt...@gmail.com wrote:
> >
> > > Thanks Ferenc for kicking off this discussion, I left some comments
> here:
> > >
> > > (1) About the release version, could you specify the Kudu connector
> > > version instead of Flink version 1.18, as the external connector
> > > version is different from Flink's?
> > >
> > > (2) About the connector config options, could you enumerate these
> > > options so that we can review whether they're reasonable?
> > >
> > > (3) Metrics are also a key part of a connector, could you add the
> > > supported connector metrics to the public interface as well?
> > >
> > > Best,
> > > Leonard
> > >
> > > > On Mar 6, 2024, at 11:23 PM, Ferenc Csaky ferenc.cs...@pm.me.INVALID wrote:
> > > >
> > > > Hello devs,
> > > >
> > > > Opening this thread to discuss a FLIP [1] about externalizing the Kudu
> > > > connector, as the Apache Bahir project was recently moved to the
> > > > attic [2]. Some details were already discussed in another thread [3].
> > > > I am proposing to externalize this connector and keep it maintainable
> > > > and up to date.
> > > >
> > > > Best regards,
> > > > Ferenc
> > > >
> > > > [1] https://docs.google.com/document/d/1vHF_uVe0FTYCb6PRVStovqDeqb_C_FKjt2P5xXa7uhE
> > > > [2] https://bahir.apache.org/
> > > > [3] https://lists.apache.org/thread/2nb8dxxfznkyl4hlhdm3vkomm8rk4oyq
>


Re: [DISCUSS] FLIP-424: Asynchronous State APIs

2024-03-10 Thread Jing Ge
Hi Zakelly,

Thanks for your proposal. The FLIP looks in good shape. +1 for it! I'd like
to ask some questions to understand your thoughts more precisely.

1. StateFuture is a new interface. At first glance, it should
be @Experimental. But according to our API Arch rule[1], it should be at
least @PublicEvolving if it will be used by any existing @PublicEvolving
classes. You might want to add this info to your FLIP, if we want to go
with this option.

2. The return types of methods in State and related sub-interfaces are
StateFuture. The old State interfaces already have those methods, and Java
does not allow overloading methods that differ only in their return type.
Do you want to change the old methods or use new interfaces? My
understanding, according to the description in the "Compatibility,
Deprecation, and Migration Plan" section of the FLIP, is that new
interfaces will be defined alongside the old interfaces. I guess the
long-term intention of this FLIP is not to deprecate the synchronous State
API, and both State APIs will be supported for different scenarios. In this
case, does it make sense to:

2.1 annotate all new interfaces with @Experimental to keep the
flexibility for further modifications?
2.2 use different names, e.g. AsyncState, to avoid potential human
mistakes while coding (e.g. importing the wrong package by mistake) and
ease job development with state?
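To make points 2 and 2.2 concrete, here is a minimal plain-Java sketch of the "different names" option. It is an illustration only: CompletableFuture stands in for StateFuture, and SyncValueState, AsyncValueState, HeapValueState, HeapAsyncValueState and StateNamingDemo are hypothetical names, not the FLIP's API. Because the two flavors live in differently named interfaces, both can keep a value() method, whereas declaring `T value()` and `CompletableFuture<T> value()` in one interface would not compile.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical synchronous flavor (stand-in for the existing State API).
interface SyncValueState<T> {
    T value();

    void update(T value);
}

// Hypothetical asynchronous flavor; CompletableFuture stands in for StateFuture.
// Reusing the name value() is legal because this is a separate interface. Putting
// "T value()" and "CompletableFuture<T> value()" in ONE interface would not compile,
// since the two methods differ only in return type. For the same reason, one class
// cannot implement both interfaces.
interface AsyncValueState<T> {
    CompletableFuture<T> value();

    CompletableFuture<Void> update(T value);
}

// Heap-backed implementations, for illustration only.
final class HeapValueState<T> implements SyncValueState<T> {
    private T current;

    public T value() {
        return current;
    }

    public void update(T value) {
        current = value;
    }
}

final class HeapAsyncValueState<T> implements AsyncValueState<T> {
    private T current;

    public CompletableFuture<T> value() {
        return CompletableFuture.completedFuture(current);
    }

    public CompletableFuture<Void> update(T value) {
        current = value;
        return CompletableFuture.completedFuture(null);
    }
}

final class StateNamingDemo {
    public static void main(String[] args) {
        SyncValueState<Integer> sync = new HeapValueState<>();
        sync.update(1);
        System.out.println("sync: " + sync.value());

        AsyncValueState<Integer> async = new HeapAsyncValueState<>();
        async.update(2)
                .thenCompose(ignored -> async.value())
                .thenAccept(v -> System.out.println("async: " + v));
    }
}
```

With distinct interface names, an accidental import of the wrong flavor surfaces as a type error at the call site rather than as a silently different API.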

Best regards,
Jing


[1]
https://github.com/apache/flink/blob/d6a4eb966fbc47277e07b79e7c64939a62eb1d54/flink-architecture-tests/flink-architecture-tests-production/src/main/java/org/apache/flink/architecture/rules/ApiAnnotationRules.java#L99

On Thu, Mar 7, 2024 at 9:49 AM Zakelly Lan  wrote:

> Hi devs,
>
> I'd like to start a discussion on a sub-FLIP of FLIP-423: Disaggregated
> State Storage and Management[1], which is a joint work of Yuan Mei, Zakelly
> Lan, Jinzhong Li, Hangxiang Yu, Yanfei Lei and Feng Wang:
>
>  - FLIP-424: Asynchronous State APIs [2]
>
> This FLIP introduces new APIs for asynchronous state access.
>
> Please make sure you have read FLIP-423[1] to know the whole story, and
> we'll discuss the details of FLIP-424[2] under this mail. For the
> discussion of overall architecture or topics related to multiple
> sub-FLIPs, please post in the previous mail[3].
>
> Looking forward to hearing from you!
>
> [1] https://cwiki.apache.org/confluence/x/R4p3EQ
> [2] https://cwiki.apache.org/confluence/x/SYp3EQ
> [3] https://lists.apache.org/thread/ct8smn6g9y0b8730z7rp9zfpnwmj8vf0
>
>
> Best,
> Zakelly
>


[jira] [Created] (FLINK-34636) Requesting exclusive buffers timeout causes repeated restarts and cannot be automatically recovered

2024-03-10 Thread Vincent Woo (Jira)
Vincent Woo created FLINK-34636:
---

 Summary: Requesting exclusive buffers timeout causes repeated 
restarts and cannot be automatically recovered
 Key: FLINK-34636
 URL: https://issues.apache.org/jira/browse/FLINK-34636
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Network
Reporter: Vincent Woo
 Attachments: image-20240308100308649.png, image-20240308101008765.png, 
image-20240308101407396.png, image-20240308101934756.png

Based on the observation of logs and metrics, it was found that a subtask
deployed on the same TM consistently reported a timeout exception when
requesting exclusive buffers. It was also discovered that during the restart
process, the {*}Network{*} metric remained unchanged (heap memory usage did
change). I suspect that the network buffer memory was not properly released
during the restart process, which caused the newly deployed task to fail to
obtain network buffers. This problem persisted despite repeated restarts,
and the application failed to recover automatically.

(I'm not sure if there are other reasons for this issue)

Attached below are screenshots of the exception stack and relevant metrics:
{code:java}
2024-03-08 09:58:18,738 WARN  org.apache.flink.runtime.taskmanager.Task [] - GroupWindowAggregate switched from DEPLOYING to FAILED with failure cause: java.io.IOException: Timeout triggered when requesting exclusive buffers: The total number of network buffers is currently set to 32768 of 32768 bytes each. You can increase this number by setting the configuration keys 'taskmanager.memory.network.fraction', 'taskmanager.memory.network.min', and 'taskmanager.memory.network.max',  or you may increase the timeout which is 3ms by setting the key 'taskmanager.network.memory.exclusive-buffers-request-timeout-ms'.
	at org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.internalRequestMemorySegments(NetworkBufferPool.java:246)
	at org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.requestPooledMemorySegmentsBlocking(NetworkBufferPool.java:169)
	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.reserveSegments(LocalBufferPool.java:247)
	at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.setupChannels(SingleInputGate.java:427)
	at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.setup(SingleInputGate.java:257)
	at org.apache.flink.runtime.taskmanager.InputGateWithMetrics.setup(InputGateWithMetrics.java:84)
	at org.apache.flink.runtime.taskmanager.Task.setupPartitionsAndGates(Task.java:952)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:655)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
	at java.lang.Thread.run(Thread.java:748)
{code}
!image-20240308101407396.png!
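The buffer count in the error message follows from simple sizing arithmetic, sketched below in Java. It assumes the derivation described in Flink's memory configuration documentation, where network memory is a fraction of total Flink memory clamped between the configured min and max, and divides the result by the memory segment size. The class NetworkBufferEstimate and all input values are hypothetical, chosen only for illustration; with them, 1 GiB of network memory in 32 KiB segments gives the 32768 buffers of 32768 bytes reported in the log. This only shows how the pool is sized; it does not address the suspected failure to release buffers across restarts.

```java
// Hypothetical helper illustrating how a network buffer count follows from
// the fraction/min/max settings named in the error message.
final class NetworkBufferEstimate {
    // Network memory: fraction of total Flink memory, clamped to [min, max].
    static long networkMemoryBytes(long totalFlinkMemoryBytes, double fraction,
                                   long minBytes, long maxBytes) {
        long derived = (long) (totalFlinkMemoryBytes * fraction);
        return Math.max(minBytes, Math.min(maxBytes, derived));
    }

    // Each network buffer is one memory segment.
    static long bufferCount(long networkMemoryBytes, long segmentSizeBytes) {
        return networkMemoryBytes / segmentSizeBytes;
    }

    public static void main(String[] args) {
        long mib = 1024L * 1024L;
        long gib = 1024L * mib;
        // Illustrative inputs only, not taken from this ticket: 16 GiB total
        // Flink memory, fraction 0.1, min 64 MiB, max 1 GiB, 32 KiB segments.
        long network = networkMemoryBytes(16 * gib, 0.1, 64 * mib, gib);
        System.out.println(network + " bytes -> " + bufferCount(network, 32 * 1024) + " buffers");
    }
}
```

With these inputs the derived 1.6 GiB is capped at 1 GiB, and main prints `1073741824 bytes -> 32768 buffers`.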

Network metric: only this TM stays at 100%, without any variation.

!image-20240308100308649.png|width=2540,height=989!

The task deployed to this TM cannot reach the RUNNING status, and its status
changes slowly.

!image-20240308101008765.png!

Although the root exception thrown by the application is
PartitionNotFoundException, the actual underlying root cause found in the logs
is IOException: Timeout triggered when requesting exclusive buffers.

!image-20240308101934756.png!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)