Re: [DISCUSS] FLIP-292: Support configuring state TTL at operator level for Table API & SQL programs

2023-03-30 Thread Jane Chan

Re: [DISCUSS] FLIP-292: Support configuring state TTL at operator level for Table API & SQL programs

2023-03-28 Thread Lincoln Lee

Re: [DISCUSS] FLIP-292: Support configuring state TTL at operator level for Table API & SQL programs

2023-03-28 Thread Martijn Visser
 copy the ttl configs from version1 sql plan to version2 and drop version1.
> >> This means we have to keep the compiled json file and create a link with
> >> the original SQL script. I am not sure if I understood it correctly, but
> >> it seems like a lot of maintenance effort.
> >> - If I am not mistaken, the compiled sql plan introduced by FLIP-190 is
> >> only used for SQL job migration/update. The common stages that Flink uses
> >> to produce the execution plan from SQL do not contain the compiling step.
> >> This makes one tool do two different jobs [1], upgrade + ttl tuning, and
> >> tightens the dependency on compiling sql plans. Flink SQL users have to
> >> deal with a compiled sql plan for performance optimization that it is not
> >> designed for.
> >> - The regular working process for Flink SQL users is changed, from only
> >> dealing with SQL-like scripts to moving back and forth between SQL-like
> >> scripts and file modifications. This is a big change in user behaviour.
> >> One option could be that we upgrade/extend COMPILE PLAN to allow users
> >> to update ttl for operators at the script level. But I am not sure if it
> >> is possible to point out specific operators at this level. Another option
> >> is to print out the result of COMPILE PLAN and enable EXECUTE PLAN 'json
> >> plan as string'. A third option is to leverage a data platform to
> >> virtualize the compiled sql plan and provide related interactions for
> >> updating ttl and submitting (executing) the modified compiled sql plan.
> >>
> >> On the other side, there is one additional benefit with this proposal: we
> >> could fine tune SQL jobs while we migrate/upgrade them. That is nice!
> >>
> >> Best regards,
> >> Jing
> >>
> >> [1] https://en.wikipedia.org/wiki/Single-responsibility_principle
> >>

Re: [DISCUSS] FLIP-292: Support configuring state TTL at operator level for Table API & SQL programs

2023-03-27 Thread Jing Ge
 Fri, Mar 24, 2023 at 4:02 PM Leonard Xu  wrote:
>>
>> > Thanks Jane for the proposal.
>> >
>> > TTL of state is an execution-phase configuration, and the serialized json
>> > graph file is the graph for the execution phase, so supporting operator-level
>> > state TTL in the execution json file makes sense to me.
>> >
>> > From the user's perspective, I have two concerns:
>> > 1. By modifying the execution graph node configuration, this raises the
>> > cost for users to understand, especially for SQL users.
>> > 2. Submitting a SQL job through `exec plan json file` is not so
>> > intuitive, as users cannot see the SQL detail of the job.
>> >
>> > Best,
>> > Leonard
>> >
>> > On Fri, Mar 24, 2023 at 5:07 PM Shengkai Fang wrote:
>> >
>> > > Hi, Jane.
>> > >
>> > > Thanks for driving this FLIP; this feature is very useful to many
>> > > users. But I have two questions about the FLIP:
>> > >
>> > > 1. How do Gateway users use this feature? As far as I know, EXECUTE
>> > > PLAN only supports local files right now. Is it possible to extend
>> > > this syntax to allow for reading plan files from remote file systems?
>> > >
>> > > 2. Are there any limitations on this feature? I have encountered
>> > > several instances where the data did not expire in the upstream
>> > > operator but expired in the downstream operator, resulting in
>> > > abnormal calculation results or exceptions thrown directly by the
>> > > operator (e.g. the rank operator). Can we require that the expiration
>> > > time of downstream operator data be greater than or equal to the
>> > > expiration time of upstream operator data?
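
The constraint asked about above (downstream TTL greater than or equal to upstream TTL) can be sketched as a simple check. This is only an illustration under stated assumptions: the class and method names are hypothetical, the operators are assumed to form a linear chain ordered from upstream to downstream, and a TTL value that means "never expire" would need special handling that is not shown here.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Flink: checks the proposed rule on a linear operator chain.
final class TtlOrderCheck {

    /**
     * Returns the positions of operators whose TTL is shorter than the TTL of the
     * operator directly upstream of them. An empty list means the rule holds.
     */
    static List<Integer> violations(List<Duration> ttlsUpstreamToDownstream) {
        List<Integer> bad = new ArrayList<>();
        for (int i = 1; i < ttlsUpstreamToDownstream.size(); i++) {
            Duration upstream = ttlsUpstreamToDownstream.get(i - 1);
            Duration downstream = ttlsUpstreamToDownstream.get(i);
            if (downstream.compareTo(upstream) < 0) {
                bad.add(i); // downstream state would expire before upstream state
            }
        }
        return bad;
    }
}
```

A real plan is a graph rather than a chain, so an actual validation would have to walk every edge of the compiled plan.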
>> > >
>> > > Best,
>> > > Shengkai
>> > >
>> > > On Fri, Mar 24, 2023 at 14:50, Yun Tang wrote:
>> > >
>> > > > Hi,
>> > > >
>> > > > From my point of view, I am a bit against using SQL hints to set
>> > > > state TTL, as FlinkSQL could be translated to several stateful
>> > > > operators. If we want to let different states have different TTL
>> > > > configs within one operator, the SQL hint solution could not work. A
>> > > > better way is to allow a graphical IDE to display the stateful
>> > > > operators and let users configure them. And the IDE submits the json
>> > > > plan to Flink to run jobs.
>> > > >
>> > > > For the details of the structure of ExecNodes, since the state name
>> > > > is unique in the underlying state layer, shall we introduce the
>> > > > "index" tag to identify the state config?
>> > > > What will happen with the conditions below:
>> > > > 1st run:
>> > > >{
>> > > >  "index": 0,
>> > > >  "ttl": "25920 ms",
>> > > >  "name": "join-lef-state"
>> > > >},
>> > > >{
>> > > >  "index": 1,
>> > > >  "ttl": "8640 ms",
>> > > >  "name": "join-right-state"
>> > > >}
>> > > >
>> > > > 2nd run:
>> > > >{
>> > > >  "index": 0,
>> > > >  "ttl": "8640 ms",
>> > > >  "name": "join-right-state"
>> > > >},
>> > > >{
>> > > >  "index": 1,
>> > > >  "ttl": "25920 ms",
>> > > >  "name": "join-lef-state"
>> > > >}
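
To make the two runs above concrete, here is a minimal sketch of what keying the TTL by "index" versus by "name" would mean. The class, field, and method names are hypothetical; the values are copied from the two runs as quoted (including the spelling of the state names). Which key the planner actually relies on is not stated in this part of the thread.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of the quoted state entries; not a Flink class.
final class StateTtlKeying {

    static final class StateTtl {
        final int index;
        final String ttl;
        final String name;

        StateTtl(int index, String ttl, String name) {
            this.index = index;
            this.ttl = ttl;
            this.name = name;
        }
    }

    // Entries as listed under "1st run".
    static List<StateTtl> firstRun() {
        return Arrays.asList(
                new StateTtl(0, "25920 ms", "join-lef-state"),
                new StateTtl(1, "8640 ms", "join-right-state"));
    }

    // Entries as listed under "2nd run": same names and TTLs, indexes swapped.
    static List<StateTtl> secondRun() {
        return Arrays.asList(
                new StateTtl(0, "8640 ms", "join-right-state"),
                new StateTtl(1, "25920 ms", "join-lef-state"));
    }

    // Keyed by state name: the order of the entries does not matter.
    static Map<String, String> ttlByName(List<StateTtl> states) {
        Map<String, String> result = new HashMap<>();
        for (StateTtl s : states) {
            result.put(s.name, s.ttl);
        }
        return result;
    }

    // Keyed by index: swapping the entries changes which TTL index 0 receives.
    static Map<Integer, String> ttlByIndex(List<StateTtl> states) {
        Map<Integer, String> result = new HashMap<>();
        for (StateTtl s : states) {
            result.put(s.index, s.ttl);
        }
        return result;
    }
}
```

With name-based keying both runs resolve to the same mapping; with index-based keying they do not, which is the ambiguity the question points at.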
>> > > >
>> > > > Best
>> > > > Yun Tang
>> > > > 
>> > > > From: Jane Chan 
>> > > > Sent: Friday, March 24, 2023 11:57
>> > > > To: dev@flink.apache.org 
>> > > > Subject: Re: [DISCUSS] FLIP-292: Support configuring state TTL at
>> > > operator
>> > > > level for Table API & SQL programs
>> > > >
>> > 

Re: [DISCUSS] FLIP-292: Support configuring state TTL at operator level for Table API & SQL programs

2023-03-27 Thread Jane Chan
Hi Shuo,

Thanks for your comments.

> The title of this FLIP is "For Table API & SQL", but this proposal
> actually only covers SQL jobs via compiled json plan

Please correct me if I'm wrong, but I think the scope of this FLIP
should be consistent with FLIP-190. Here is an example to illustrate
the Table API usage.

tableEnv.createTemporaryTable(
        "SourceTable",
        TableDescriptor.forConnector("datagen")
                .schema(Schema.newBuilder().column("f0", DataTypes.STRING()).build())
                .option(DataGenConnectorOptions.ROWS_PER_SECOND, 100L)
                .build());
Table t = tableEnv.sqlQuery("select * from SourceTable");
tableEnv.createTemporaryTable(
        "SinkTable",
        TableDescriptor.forConnector("print")
                .schema(Schema.newBuilder().column("f0", DataTypes.STRING()).build())
                .build());
t.insertInto("SinkTable")
        .compilePlan()
        .printJsonString(); // or writeToFile

As a result, I think the current title, "Support configuring state TTL at
operator level for Table API & SQL programs", is self-contained.
1. The proposed approach is applicable to both SQL and Table API programs
2. The configuration granularity is at the operator level
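
As a rough illustration of what "operator level" granularity means, the sketch below resolves an effective TTL per operator. It assumes, and this is only an assumption for illustration, that an operator without its own setting falls back to a job-level value such as the one configured via 'table.exec.state.ttl'. The class, method names, and operator ids are hypothetical and are not Flink APIs.

```java
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch, not a Flink class: per-operator TTL with a job-level fallback.
final class OperatorTtlSketch {
    private final Duration jobLevelTtl;
    private final Map<String, Duration> perOperatorTtl = new HashMap<>();

    OperatorTtlSketch(Duration jobLevelTtl) {
        this.jobLevelTtl = jobLevelTtl;
    }

    // Records an operator-specific TTL, e.g. one edited into the compiled plan.
    OperatorTtlSketch override(String operatorId, Duration ttl) {
        perOperatorTtl.put(operatorId, ttl);
        return this;
    }

    // Operators without an override keep the job-level TTL (assumed fallback).
    Duration effectiveTtl(String operatorId) {
        return perOperatorTtl.getOrDefault(operatorId, jobLevelTtl);
    }
}
```

For example, with a job-level TTL of 24 hours and an override of 72 hours for a hypothetical "join" operator, only that operator sees 72 hours and every other operator keeps 24 hours.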

And I'm not quite sure if the FLIP title has a convention that requires the
proposed approach to be specified because I see that FLIP-190 doesn't do
that either.

Best,
Jane


On Mon, Mar 27, 2023 at 1:29 PM Shuo Cheng  wrote:

> Hi, Jane
>
> Thanks for your detailed explanation.
>
> > Maybe it deserves another FLIP to discuss whether we need a
> multiple-level state TTL configuration mechanism and how to properly
> implement it.
>
> +1, and this is also what I want to emphasize. The title of this FLIP is
> "For Table API & SQL", but this proposal actually only covers SQL jobs via
> compiled json plan, SQL users who don't want or cannot use the compiled
> json plan could not use the feature. If the FLIP title can state the scope
> more precisely , e.g., "Enhance EXECUTE PLAN to support operator-level
> state TTL configuration", it would be much more rigorous and
> self-consistent, and many concerns in the mailing list  may not exist.
> WDYT?
>
> Sincerely,
> Shuo
>

Re: [DISCUSS] FLIP-292: Support configuring state TTL at operator level for Table API & SQL programs

2023-03-26 Thread Shuo Cheng
Hi, Jane

Thanks for your detailed explanation.

> Maybe it deserves another FLIP to discuss whether we need a
> multiple-level state TTL configuration mechanism and how to properly
> implement it.

+1, and this is also what I want to emphasize. The title of this FLIP is
"For Table API & SQL", but this proposal actually only covers SQL jobs via
the compiled json plan; SQL users who don't want to or cannot use the compiled
json plan could not use the feature. If the FLIP title can state the scope
more precisely, e.g., "Enhance EXECUTE PLAN to support operator-level
state TTL configuration", it would be much more rigorous and
self-consistent, and many concerns in the mailing list may not exist. WDYT?

Sincerely,
Shuo

On Fri, Mar 24, 2023 at 11:58 AM Jane Chan  wrote:

> Hi Shammon and Shuo,
>
> Thanks for your valuable comments!
>
> Some thoughts:
>
> @Shuo
> > I think it's more proper to say that a hint does not affect the
> > equivalence of execution plans (hash agg vs sort agg), not the equivalence
> > of execution results, e.g., users can set 'scan.startup.mode' for the kafka
> > connector by dynamic table option, which also "intervene in the
> > calculation of data results".
>
> IMO, the statement that "hint should not interfere with the calculation
> results" means it should not interfere with internal computation. On the
> other hand, 'scan.startup.mode' interferes with the ingestion of the data.
> I think these two concepts are different, but of course, this is just my
> opinion, and other views are welcome.
>
> > I think the final shape of state ttl configuring may be like this:
> > users can define operator state ttl using SQL HINT (assumption...), but it
> > may affect more than one stateful operator inside the same query block;
> > then users can further configure a specific one by modifying the compiled
> > json plan...
>
> Setting aside the issue of semantics, setting TTL from a higher level seems
> to be attractive. This means that users only need to configure
> 'table.exec.state.ttl' through the existing hint syntax to achieve the
> effect. Everything is a familiar formula. But is it really the case? Hints
> apply to a very broad range. Let me give an example.
>
> Suppose a user wants to set different TTLs for the two streams in a stream
> join query. Where should the hints be written?
>
> -- the original query before configuring state TTL
> create temporary view view1 as select * from my_table_1;
> create temporary view view2 as select * from my_table_2;
> create temporary view joined_view as
> select a.*, b.* from view1 a join view2 b on a.join_key = b.join_key;
>
> Option 1: declaring hints at the very beginning of the table scan
>
> -- should he or she write hints when declaring the first temporary view?
> create temporary view view1 as select * from my_table_1
> /*+ OPTIONS('table.exec.state.ttl' = 'foo') */;
> create temporary view view2 as select * from my_table_2
> /*+ OPTIONS('table.exec.state.ttl' = 'bar') */;
> create temporary view joined_view as
> select a.*, b.* from view1 a join view2 b on a.join_key = b.join_key;
>
> Option 2: declaring hints when performing the join
>
> -- or should he or she write hints when declaring the join temporary view?
> create temporary view view1 as select * from my_table_1;
> create temporary view view2 as select * from my_table_2;
> create temporary view joined_view as
> select a.*, b.* from view1 /*+ OPTIONS('table.exec.state.ttl' = 'foo') */ a
> join view2 /*+ OPTIONS('table.exec.state.ttl' = 'bar') */ b
> on a.join_key = b.join_key;
>
> From the user's point of view, does he or she need to care about the
> difference between these two styles? Users might think the two are
> equivalent; but in reality, as developers, how do we define the range in
> which a hint starts and ends to take effect?
>
> Consider the following two assumptions
>
> 1. Assuming the hint takes effect from the moment it is declared and
> applies to any subsequent stateful operators until it is overridden by a
> new hint.
> If this is the assumption, it's clear that Option 1 and Option 2 are
> different because a ChangelogNormalize node can appear between scan and
> join. Meanwhile, which stream's TTL to apply to the following query after
> the stream join? It is unclear if the user does not explicitly set it.
> Should the engine make a random decision?
>
> 2. Assuming that the scope of the hint only applies to the current query
> block and does not extend to the next operator.
> In this case, the first way of setting the hint will not work because it
> cannot be brought to the join operator. Users must choose the second way to
> configure. Are users willing to remember this strange constraint on SQL
> writing style? Does this indicate a new learning cost?
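
The difference between the two assumptions above can be sketched on a simplified model. Everything here is hypothetical and for illustration only: operators are modelled as a linear chain (for example scan, ChangelogNormalize, join), each entry holds the TTL hint declared at that operator or null if none is declared, and the two-input nature of a join is deliberately ignored.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the two hint-scope assumptions; not Flink planner code.
final class HintScopeSketch {

    /** Assumption 1: a hint stays in effect for later operators until a new hint overrides it. */
    static List<String> propagating(List<String> declaredHints, String defaultTtl) {
        List<String> effective = new ArrayList<>();
        String current = defaultTtl;
        for (String hint : declaredHints) {
            if (hint != null) {
                current = hint; // a newly declared hint overrides the previous one
            }
            effective.add(current);
        }
        return effective;
    }

    /** Assumption 2: a hint applies only where it is declared and does not extend further. */
    static List<String> blockLocal(List<String> declaredHints, String defaultTtl) {
        List<String> effective = new ArrayList<>();
        for (String hint : declaredHints) {
            effective.add(hint != null ? hint : defaultTtl);
        }
        return effective;
    }
}
```

With a hint 'foo' declared only at the scan, the first model gives 'foo' to every operator in the chain, while the second leaves the join on the default, which matches the point that the Option 1 style would not reach the join operator.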
>
> The example above is used to illustrate that while this approach may seem
> simple and direct, it actually has many limitations and may produce
> unexpected 

Re: [DISCUSS] FLIP-292: Support configuring state TTL at operator level for Table API & SQL programs

2023-03-26 Thread Jane Chan

Re: [DISCUSS] FLIP-292: Support configuring state TTL at operator level for Table API & SQL programs

2023-03-25 Thread Jane Chan

Re: [DISCUSS] FLIP-292: Support configuring state TTL at operator level for Table API & SQL programs

2023-03-25 Thread Jane Chan
 migration/update. The common stages that Flink uses to
> produce the execution plan from SQL do not contain the compiling step.
> This makes one tool do two different jobs [1], upgrade + ttl tuning, and
> tightens the dependency on compiling sql plans. Flink SQL users have to
> deal with a compiled sql plan for performance optimization that it is not
> designed for.
> - The regular working process for Flink SQL users is changed, from only
> dealing with SQL-like scripts to moving back and forth between SQL-like
> scripts and file modifications. This is a big change in user behaviour. One
> option could be that we upgrade/extend COMPILE PLAN to allow users
> to update ttl for operators at the script level. But I am not sure if it is
> possible to point out specific operators at this level. Another option is
> to print out the result of COMPILE PLAN and enable EXECUTE PLAN 'json plan
> as string'. A third option is to leverage a data platform to virtualize the
> compiled sql plan and provide related interactions for updating ttl and
> submitting (executing) the modified compiled sql plan.
>
> On the other side, there is one additional benefit with this proposal: we
> could fine tune SQL jobs while we migrate/upgrade them. That is nice!
>
> Best regards,
> Jing
>
> [1] https://en.wikipedia.org/wiki/Single-responsibility_principle

Re: [DISCUSS] FLIP-292: Support configuring state TTL at operator level for Table API & SQL programs

2023-03-25 Thread Benchao Li

Re: [DISCUSS] FLIP-292: Support configuring state TTL at operator level for Table API & SQL programs

2023-03-24 Thread Jing Ge

Re: [DISCUSS] FLIP-292: Support configuring state TTL at operator level for Table API & SQL programs

2023-03-24 Thread Leonard Xu
Thanks Jane for the proposal.

State TTL is an execution-phase configuration, and the serialized JSON graph
file is the graph for the execution phase, so supporting operator-level state
TTL in the execution JSON file makes sense to me.

From the user's perspective, I have two concerns:
1. Modifying the execution graph node configuration raises the cost of
understanding for users, especially SQL users.
2. Submitting a SQL job through `exec plan json file` is not so intuitive,
as users cannot see the SQL detail of the job.

Best,
Leonard


Re: [DISCUSS] FLIP-292: Support configuring state TTL at operator level for Table API & SQL programs

2023-03-24 Thread Shengkai Fang
Hi, Jane.

Thanks for driving this FLIP. This feature is very useful to many users,
but I have two questions about the FLIP:

1. How do Gateway users use this feature? As far as I know, EXECUTE PLAN
only supports local files right now. Is it possible to extend this
syntax to allow reading plan files from remote file systems?

2. Are there any limitations on this feature? I have encountered several
instances where data had not expired in the upstream operator but had
expired in the downstream operator, resulting in abnormal calculation
results or exceptions thrown directly by the operator (e.g. the rank
operator). Can we require that the expiration time of downstream operator
data be greater than or equal to the expiration time of upstream operator
data?
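
The constraint asked about in point 2 can be sketched as a small check. This is
only an illustration in Python, not Flink code: the operator names, the TTL
values, and the find_ttl_violations helper are hypothetical, and the plan is
reduced to a list of (upstream, downstream) edges. It assumes every TTL is a
positive number of milliseconds and does not model a disabled TTL.

```python
def find_ttl_violations(ttl_ms, edges):
    """Return (upstream, downstream) pairs whose downstream TTL is shorter."""
    violations = []
    for upstream, downstream in edges:
        # The proposed rule: downstream state must live at least as long
        # as the upstream state that feeds it.
        if ttl_ms[downstream] < ttl_ms[upstream]:
            violations.append((upstream, downstream))
    return violations


# Hypothetical two-operator plan: a join feeding a rank operator.
ttl_ms = {"join": 259_200_000, "rank": 86_400_000}
edges = [("join", "rank")]

violations = find_ttl_violations(ttl_ms, edges)
print(violations)
```

With these made-up values the rank operator expires state sooner than the join
that feeds it, so the pair is reported. Whether such a check should reject the
plan or only warn is left open here.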

Best,
Shengkai


Re: [DISCUSS] FLIP-292: Support configuring state TTL at operator level for Table API & SQL programs

2023-03-24 Thread Jane Chan
Hi Yun,

Thanks for your interest in this FLIP.

> A better way is to allow a graphical IDE to display the stateful
> operators and let users configure them. And the IDE submits the json plan
> to Flink to run jobs.

This is a very useful suggestion. Actually, during the design phase, we
considered the complexity of users manually modifying JSON. At that time,
we discussed whether to develop an additional tool to help users modify
JSON more easily, but nothing beats a graphical interface for modifying the
TTL of each operator. It's a pity that https://flink.apache.org/visualizer/
has been taken offline; otherwise it would have been a great addition to
the experience.


> since the state name is unique in the underlying state layer, shall we
> introduce the "index" tag to identify the state config?

I'd like to explain why we are considering adding both the index (which
annotates the stateful operator's input order) and the name to the metadata
of the state.
Giving a meaningful name like "join-left-state" or "deduplicate-state" is
meant to help users understand as much as possible what each state does.
E.g., suppose we have a graphical IDE that lets users configure the TTL; then
we can display the state's name instead of the index to users. On the other
hand, for a stateful MultipleInputStreamOperator, it's difficult to
distinguish which input stream the current state belongs to simply from the
name.
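
To illustrate the role of the index, here is a minimal sketch in Python. It is
not the FLIP's implementation: the field names "index", "ttl" and "name" come
from the example in this thread, while parse_ttl_ms and ttl_by_input_index are
hypothetical helpers. It shows that when the TTL is looked up by input index,
the order in which the entries are listed does not matter.

```python
import re


def parse_ttl_ms(text):
    """Parse a duration string such as '259200000 ms' into integer milliseconds."""
    match = re.fullmatch(r"\s*(\d+)\s*ms\s*", text)
    if match is None:
        raise ValueError(f"unsupported TTL format: {text!r}")
    return int(match.group(1))


def ttl_by_input_index(state_metadata):
    """Map each input index to its TTL in ms, rejecting duplicate indexes."""
    result = {}
    for entry in state_metadata:
        index = entry["index"]
        if index in result:
            raise ValueError(f"duplicate state index: {index}")
        result[index] = parse_ttl_ms(entry["ttl"])
    return result


state_metadata = [
    {"index": 0, "ttl": "259200000 ms", "name": "join-left-state"},
    {"index": 1, "ttl": "86400000 ms", "name": "join-right-state"},
]
# The same entries listed in the opposite order.
reordered = list(reversed(state_metadata))

ttls = ttl_by_input_index(state_metadata)
print(ttls[0] // 86_400_000, "day(s) for input 0")
print(ttls[1] // 86_400_000, "day(s) for input 1")
```

The name stays available for display, while the index alone decides which
input a TTL belongs to.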

Back to the case: the 2nd run will not happen. This is because, according to
the current stream join implementation, the first input of the
AbstractStreamingJoinOperator (see
StreamingJoinOperator.java#L123
<https://github.com/apache/flink/blob/60f3b2b32b593bf74a90015765817b3bf9c95d54/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/stream/StreamingJoinOperator.java#L123>)
will always be the left side and the second input will always be the right
side, regardless of whether it is a left outer join or right outer join.

Let me give a detailed example. You can tell from the plan that the
first_table is always the first input of the operator (which is nearest to
the join node).

Flink SQL> show create table first_table;
CREATE TABLE `default_catalog`.`default_database`.`first_table` (
  `f0` INT,
  `f1` VARCHAR(2147483647)
) WITH (
  'connector' = 'datagen'
)

Flink SQL> show create table second_table;
CREATE TABLE `default_catalog`.`default_database`.`second_table` (
  `f0` INT,
  `f1` DOUBLE
) WITH (
  'connector' = 'datagen'
)

Flink SQL> explain select a.*, b.* from first_table a left join second_table b on a.f0 = b.f0;
== Optimized Physical Plan ==
Join(joinType=[LeftOuterJoin], where=[=(f0, f00)], select=[f0, f1, f00, f10], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
:- Exchange(distribution=[hash[f0]])
:  +- TableSourceScan(table=[[default_catalog, default_database, first_table]], fields=[f0, f1])
+- Exchange(distribution=[hash[f0]])
   +- TableSourceScan(table=[[default_catalog, default_database, second_table]], fields=[f0, f1])

Flink SQL> explain select a.*, b.* from first_table a right join second_table b on a.f0 = b.f0;
== Optimized Physical Plan ==
Join(joinType=[RightOuterJoin], where=[=(f0, f00)], select=[f0, f1, f00, f10], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
:- Exchange(distribution=[hash[f0]])
:  +- TableSourceScan(table=[[default_catalog, default_database, first_table]], fields=[f0, f1])
+- Exchange(distribution=[hash[f0]])
   +- TableSourceScan(table=[[default_catalog, default_database, second_table]], fields=[f0, f1])


Best,
Jane


Re: [DISCUSS] FLIP-292: Support configuring state TTL at operator level for Table API & SQL programs

2023-03-24 Thread Yun Tang
Hi,

From my point of view, I am a bit against using SQL hints to set state TTL, as
Flink SQL could be translated to several stateful operators. If we want
different states to have different TTL configs within one operator, the SQL
hint solution would not work. A better way is to allow a graphical IDE to
display the stateful operators and let users configure them. The IDE then
submits the JSON plan to Flink to run jobs.

For the details of the structure of ExecNodes, since the state name is unique
in the underlying state layer, shall we introduce the "index" tag to identify
the state config?
What will happen with the conditions below:
1st run:
   {
     "index": 0,
     "ttl": "259200000 ms",
     "name": "join-left-state"
   },
   {
     "index": 1,
     "ttl": "86400000 ms",
     "name": "join-right-state"
   }

2nd run:
   {
     "index": 0,
     "ttl": "86400000 ms",
     "name": "join-right-state"
   },
   {
     "index": 1,
     "ttl": "259200000 ms",
     "name": "join-left-state"
   }

Best
Yun Tang

Re: [DISCUSS] FLIP-292: Support configuring state TTL at operator level for Table API & SQL programs

2023-03-23 Thread Jane Chan
Hi Shammon and Shuo,

Thanks for your valuable comments!

Some thoughts:

@Shuo
> I think it's more properly to say that hint does not affect the
> equivalence of execution plans (hash agg vs sort agg), not the equivalence
> of execution results, e.g., users can set 'scan.startup.mode' for kafka
> connector by dynamic table option, which also "intervene in the
> calculation of data results".

IMO, the statement that "hint should not interfere with the calculation
results" means it should not interfere with internal computation. On the
other hand, 'scan.startup.mode' interferes with the ingestion of the data.
I think these two concepts are different, but of course, this is just my
opinion, and I welcome other views.

> I think the final shape of state ttl configuring may like the that,
> users can define operator state ttl using SQL HINT (assumption...), but it
> may affects more than one stateful operators inside the same query block,
> then users can further configure a specific one by modifying the compiled
> json plan...

Setting aside the issue of semantics, setting TTL from a higher level seems
to be attractive. This means that users only need to configure
'table.exec.state.ttl' through the existing hint syntax to achieve the
effect. Everything is a familiar formula. But is it really the case? Hints
apply to a very broad range. Let me give an example.

Suppose a user wants to set different TTLs for the two streams in a stream
join query. Where should the hints be written?

-- the original query before configuring state TTL
create temporary view view1 as select * from my_table_1;
create temporary view view2 as select * from my_table_2;
create temporary view joined_view as
select a.*, b.* from view1 a join view2 b on a.join_key = b.join_key;

Option 1: declaring hints at the very beginning of the table scan

-- should he or she write hints when declaring the first temporary view?
create temporary view view1 as select * from my_table_1
/*+ OPTIONS('table.exec.state.ttl' = 'foo') */;
create temporary view view2 as select * from my_table_2
/*+ OPTIONS('table.exec.state.ttl' = 'bar') */;
create temporary view joined_view as
select a.*, b.* from view1 a join view2 b on a.join_key = b.join_key;

Option 2: declaring hints when performing the join

-- or should he or she write hints when declaring the join temporary view?
create temporary view view1 as select * from my_table_1;
create temporary view view2 as select * from my_table_2;
create temporary view joined_view as
select a.*, b.* from view1 /*+ OPTIONS('table.exec.state.ttl' = 'foo') */ a
join view2 /*+ OPTIONS('table.exec.state.ttl' = 'bar') */ b
on a.join_key = b.join_key;

From the user's point of view, does he or she need to care about the
difference between these two styles? Users might think the two are
equivalent, but in reality, as developers, how do we define the range in
which a hint starts and stops taking effect?

Consider the following two assumptions:

1. Assuming the hint takes effect from the moment it is declared and
applies to any subsequent stateful operators until it is overridden by a
new hint.
If this is the assumption, it's clear that Option 1 and Option 2 are
different because a ChangelogNormalize node can appear between scan and
join. Meanwhile, which stream's TTL should apply to the operators that
follow the stream join? It is unclear if the user does not explicitly set it.
Should the engine make a random decision?

2. Assuming that the scope of the hint only applies to the current query
block and does not extend to the next operator.
In this case, the first way of setting the hint will not work because it
cannot be carried over to the join operator. Users must choose the second
way to configure it. Are users willing to remember this strange constraint on
SQL writing style? Does this indicate a new learning cost?

The example above is used to illustrate that while this approach may seem
simple and direct, it actually has many limitations and may produce
unexpected behavior. Will users still find it attractive? IMO *hints only
work in very limited situations where the query is very simple, and their
scope is coarse rather than operator-level*. Maybe it deserves another
FLIP to discuss whether we need a multiple-level state TTL configuration
mechanism and how to properly implement it.
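
The two scoping assumptions above can be made concrete with a toy model. The
sketch below is not Flink code: the function `assign_ttl`, the operator names,
and the 'job-level' placeholder are all hypothetical, and it only models one
input side of the join (scan, then ChangelogNormalize, then the join state for
that input).

```python
def assign_ttl(operators, hints, propagate, default="job-level"):
    """Assign a TTL to every stateful operator of a linear pipeline.

    operators: list of (name, is_stateful) tuples in plan order.
    hints:     dict mapping operator name -> TTL declared at that operator.
    propagate: True models assumption 1 (a hint stays in effect until it is
               overridden); False models assumption 2 (a hint only applies
               where it is declared).
    """
    result = {}
    current = default
    for name, is_stateful in operators:
        if name in hints:
            current = hints[name]
        elif not propagate:
            # Assumption 2: the hint does not extend to the next operator.
            current = default
        if is_stateful:
            result[name] = current
    return result


# Hypothetical left input of the join; the scan itself keeps no state.
pipeline = [
    ("scan", False),
    ("changelog_normalize", True),
    ("join_left_state", True),
]

option_1 = {"scan": "foo"}             # hint written at the table scan
option_2 = {"join_left_state": "foo"}  # hint written at the join

a1_opt1 = assign_ttl(pipeline, option_1, propagate=True)
a1_opt2 = assign_ttl(pipeline, option_2, propagate=True)
a2_opt1 = assign_ttl(pipeline, option_1, propagate=False)
a2_opt2 = assign_ttl(pipeline, option_2, propagate=False)

for label, ttl in [("assumption 1, option 1", a1_opt1),
                   ("assumption 1, option 2", a1_opt2),
                   ("assumption 2, option 1", a2_opt1),
                   ("assumption 2, option 2", a2_opt2)]:
    print(label, ttl)
```

In this model, assumption 1 gives ChangelogNormalize a different TTL depending
on where the hint is written, and under assumption 2 the hint from Option 1
never reaches any stateful operator.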

@Shammon
> Generally, Flink jobs support two types
> of submission: SQL and jar. If users want to use `TTL on Operator` for SQL
> jobs, they need to edit the json file which is not supported by general job
> submission systems such as flink sql-client, apache kyuubi, apache
> streampark, etc. Users need to download the file and edit it manually,
> but they may not have the permissions to the storage system such as HDFS in
> a real production environment. From this perspective, I think it is
> necessary to provide a way similar to
> hints that users can configure the `TTL on Operator` in their sqls which
> help users to

Re: [DISCUSS] FLIP-292: Support configuring state TTL at operator level for Table API & SQL programs

2023-03-23 Thread Shammon FY
Hi Jane,

Thanks for initiating this discussion. Configuring TTL per operator can
help users manage state more effectively.

I think the `compiled json plan` proposal may need to consider the impact
on the user's submission workflow. Generally, Flink jobs support two types
of submission: SQL and jar. If users want to use `TTL on Operator` for SQL
jobs, they need to edit the json file, which is not supported by general job
submission systems such as flink sql-client, apache kyuubi, apache
streampark, etc. Users need to download the file and edit it manually,
but they may not have permissions on the storage system, such as HDFS, in
a real production environment.
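
For illustration, the manual edit described above might look like the
following sketch. The plan layout used here (a top-level `nodes` list whose
stateful nodes carry a `state` list with `name` and `ttl` fields) is an
assumption made for this example, not a confirmed format, and the helper
`set_state_ttl` is hypothetical. Field names should be checked against an
actual compiled plan.

```python
import json

# Hypothetical, heavily trimmed plan; the layout is an assumption and a real
# compiled plan carries many more fields.
PLAN_JSON = """
{
  "nodes": [
    {"id": 1, "type": "stream-exec-table-source-scan_1"},
    {"id": 2, "type": "stream-exec-join_1",
     "state": [
       {"index": 0, "ttl": "0 ms", "name": "leftState"},
       {"index": 1, "ttl": "0 ms", "name": "rightState"}
     ]}
  ]
}
"""


def set_state_ttl(plan, node_id, ttl_by_state_name):
    """Update the TTL of named states on one node; return the number changed."""
    updated = 0
    for node in plan.get("nodes", []):
        if node.get("id") != node_id:
            continue
        # Nodes without a "state" list are treated as stateless and skipped.
        for state in node.get("state", []):
            new_ttl = ttl_by_state_name.get(state.get("name"))
            if new_ttl is not None:
                state["ttl"] = new_ttl
                updated += 1
    return updated


plan = json.loads(PLAN_JSON)
changed = set_state_ttl(plan, 2, {"leftState": "1 h", "rightState": "1 d"})
edited_plan_json = json.dumps(plan, indent=2)
print(changed)
print(edited_plan_json)
```

Even in this small form, the edit requires read and write access to the plan
file and knowledge of which node id corresponds to which part of the query.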

From this perspective, I think it is necessary to provide a way similar to
hints so that users can configure the `TTL on Operator` in their SQL, which
helps them use it conveniently. At the same time, I agree with @Shuo's
idea that for complex cases, users can combine hints and `json plan` to
configure `TTL on Operator` better. What do you think? Thanks


Best,
Shammon FY




Re: [DISCUSS] FLIP-292: Support configuring state TTL at operator level for Table API & SQL programs

2023-03-23 Thread Shuo Cheng
Correction: “users can set 'scan.startup.mode' for kafka connector” -> “users
can set 'scan.startup.mode' for kafka connector by dynamic table option”



Re: [DISCUSS] FLIP-292: Support configuring state TTL at operator level for Table API & SQL programs

2023-03-23 Thread Shuo Cheng
Hi Jane,
Thanks for driving this. Operator-level state ttl is absolutely a desired
feature. I would like to share my opinion as follows:

If the scope of this proposal is limited to an enhancement for the compiled
json plan, it makes sense. I think it does not conflict with configuring
state ttl in other ways, e.g., SQL HINT or something else, because they just
work at different levels: SQL Hint works at the very entrance of the SQL API,
while the compiled json plan is an intermediate result for SQL.
I think the final shape of state ttl configuration may look like this: users
can define operator state ttl using SQL HINT (assumption...), but it may
affect more than one stateful operator inside the same query block, so
users can further configure a specific one by modifying the compiled json
plan...

In a word, this proposal is in good shape as an enhancement for the compiled
json plan, and it's orthogonal to other ways like SQL Hint, which work at
a higher level.


Nits:

> "From the SQL semantic perspective, hints cannot intervene in the
> calculation of data results."
I think it's more proper to say that a hint does not affect the equivalence
of execution plans (hash agg vs sort agg), rather than the equivalence of
execution results; e.g., users can set 'scan.startup.mode' for kafka
connector, which also "intervenes in the calculation of data results".

Sincerely,
Shuo

On Tue, Mar 21, 2023 at 7:52 PM Jane Chan wrote:

> Hi devs,
>
> I'd like to start a discussion on FLIP-292: Support configuring state TTL
> at operator level for Table API & SQL programs [1].
>
> Currently, we only support job-level state TTL configuration via
> 'table.exec.state.ttl'. However, users may expect fine-grained state TTL
> control to optimize state usage. Hence we propose to serialize/deserialize
> the state TTL as metadata of the operator's state to/from the compiled JSON
> plan, so that different state TTLs can be specified when
> transforming the exec node to stateful operators.
>
> Look forward to your opinions!
>
> [1]
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=240883951
>
> Best Regards,
> Jane Chan
>