Re: [DISCUSS] FLIP-292: Support configuring state TTL at operator level for Table API & SQL programs
> copy the ttl configs from version1 sql plan to version2 and drop
> version1. This means we have to keep the compiled json file and create
> a link with the original SQL script. I am not sure if I understood it
> correctly, it seems like a lot of maintenance effort.
> - If I am not mistaken, the compiled sql plan introduced by FLIP-190 is
> only used for SQL job migration/update. Common stages that Flink uses
> to produce the execution plan from SQL do not contain the compiling
> step. This makes one tool do two different jobs[1], upgrade + ttl
> tuning, and tightens the dependency on compiling sql plans. Flink SQL
> users have to deal with a compiled sql plan for performance
> optimization that is not designed for it.
> - The regular working process for Flink SQL users is changed, from only
> dealing with SQL like scripts to moving between SQL like scripts and
> file modifications back and forth. This is a big change for user
> behaviours. One option could be that we upgrade/extend the COMPILE PLAN
> to allow users to update ttl for operators at the script level. But I
> am not sure if it is possible to point out specific operators at this
> level. Another option is to print out the result of COMPILE PLAN and
> enable EXECUTE PLAN 'json plan as string'. Third option is to leverage
> a data platform to virtualize the compiled sql plan and provide related
> interactions for updating ttl and submit(execute) the modified compiled
> sql plan.
>
> On the other side, there is one additional benefit with this proposal:
> we could fine tune SQL jobs while we migrate/upgrade them. That is nice!
>
> Best regards,
> Jing
>
> [1] https://en.wikipedia.org/wiki/Single-responsibility_principle
>
> On Fri, Mar 24, 2023 at 4:02 PM Leonard Xu wrote:
>
> > Thanks Jane for the proposal.
> >
> > TTL of state is an execution-phase configuration, and the serialized
> > json graph file is the graph for the execution phase, so supporting
> > operator-level state TTL in the execution json file makes sense to me.
> >
> > From the user's perspective, I have two concerns:
> > 1. Modifying the execution graph node configuration raises the cost
> > of understanding for users, especially SQL users.
> > 2. Submitting a SQL job through `exec plan json file` is not so
> > intuitive, as users cannot see the SQL detail of the job.
> >
> > Best,
> > Leonard
> >
> > On Fri, Mar 24, 2023 at 5:07 PM Shengkai Fang wrote:
> >
> > > Hi, Jane.
> > >
> > > Thanks for driving this FLIP. This feature is very useful to many
> > > users. But I have two questions about the FLIP:
> > >
> > > 1. How do Gateway users use this feature? As far as I know, EXECUTE
> > > PLAN only supports local files right now. Is it possible to extend
> > > this syntax to allow reading plan files from remote file systems?
> > >
> > > 2. Are there any limitations on this feature? I have encountered
> > > several instances where the data did not expire in the upstream
> > > operator, but it expired in the downstream operator, resulting in
> > > abnormal calculation results or direct exceptions thrown by the
> > > operator (e.g. rank operator). Can we require that the expiration
> > > time of downstream operator data be greater than or equal to the
> > > expiration time of upstream operator data?
> > >
> > > Best,
> > > Shengkai
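Shengkai's second question, whether a downstream operator's state should be kept at least as long as its upstream operator's, can be pictured as a simple validation pass. The following is only a sketch under loud assumptions: the class TtlChainCheck, the type OperatorTtl and the method findViolations are hypothetical names invented here, the plan is simplified to a linear chain ordered from upstream to downstream, and nothing in it reflects what Flink or FLIP-292 actually implements.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

/** Illustrative only: none of these types exist in Flink. */
final class TtlChainCheck {

    /** Hypothetical pairing of an operator name with its configured state TTL. */
    static final class OperatorTtl {
        final String operator;
        final Duration ttl;

        OperatorTtl(String operator, Duration ttl) {
            this.operator = operator;
            this.ttl = ttl;
        }
    }

    /**
     * Walks a linear chain ordered from upstream to downstream and reports
     * every adjacent pair whose downstream TTL is shorter than the upstream TTL.
     */
    static List<String> findViolations(List<OperatorTtl> chain) {
        List<String> violations = new ArrayList<>();
        for (int i = 1; i < chain.size(); i++) {
            OperatorTtl up = chain.get(i - 1);
            OperatorTtl down = chain.get(i);
            if (down.ttl.compareTo(up.ttl) < 0) {
                violations.add(down.operator + " expires before " + up.operator);
            }
        }
        return violations;
    }

    public static void main(String[] args) {
        List<OperatorTtl> chain = List.of(
                new OperatorTtl("join", Duration.ofHours(24)),
                new OperatorTtl("rank", Duration.ofHours(1)));
        // The rank operator's TTL is shorter than the join's, so it is reported.
        System.out.println(findViolations(chain));
    }
}
```

A real plan is a graph rather than a chain, so an actual check would have to compare each operator with all of its inputs; whether such a constraint should be enforced at all is exactly what the question leaves open.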
Re: [DISCUSS] FLIP-292: Support configuring state TTL at operator level for Table API & SQL programs
On Fri, Mar 24, 2023 at 14:50, Yun Tang wrote:

> Hi,
>
> From my point of view, I am a bit against using SQL hint to set state
> TTL, as FlinkSQL could be translated to several stateful operators. If
> we want different states to have different TTL configs within one
> operator, the SQL hint solution could not work. A better way is to
> allow a graphical IDE to display the stateful operators and let users
> configure them. And the IDE submits the json plan to Flink to run jobs.
>
> For the details of the structure of ExecNodes, since the state name is
> unique in the underlying state layer, shall we introduce the "index"
> tag to identify the state config?
> What will happen with the conditions below:
>
> 1st run:
>   {
>     "index": 0,
>     "ttl": "25920 ms",
>     "name": "join-left-state"
>   },
>   {
>     "index": 1,
>     "ttl": "8640 ms",
>     "name": "join-right-state"
>   }
>
> 2nd run:
>   {
>     "index": 0,
>     "ttl": "8640 ms",
>     "name": "join-right-state"
>   },
>   {
>     "index": 1,
>     "ttl": "25920 ms",
>     "name": "join-left-state"
>   }
>
> Best
> Yun Tang
>
> From: Jane Chan
> Sent: Friday, March 24, 2023 11:57
> To: dev@flink.apache.org
> Subject: Re: [DISCUSS] FLIP-292: Support configuring state TTL at
> operator level for Table API & SQL programs
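To make Yun Tang's two-run scenario concrete, here is a minimal Java sketch that reads the two entry lists once keyed by "name" and once keyed by "index". Everything in it is an assumption for illustration: StateTtlLookup, StateEntry, ttlByName and ttlByIndex are hypothetical names, and the sketch says nothing about how Flink itself resolves these entries.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Illustrative only: models the entries from the example, not Flink's plan format. */
final class StateTtlLookup {

    /** Hypothetical holder for one entry with "index", "ttl" and "name". */
    static final class StateEntry {
        final int index;
        final String ttl;
        final String name;

        StateEntry(int index, String ttl, String name) {
            this.index = index;
            this.ttl = ttl;
            this.name = name;
        }
    }

    /** Resolves TTLs by state name; the order of the entries does not matter. */
    static Map<String, String> ttlByName(List<StateEntry> entries) {
        Map<String, String> result = new HashMap<>();
        for (StateEntry e : entries) {
            result.put(e.name, e.ttl);
        }
        return result;
    }

    /** Resolves TTLs by index; swapping two entries changes what each index means. */
    static Map<Integer, String> ttlByIndex(List<StateEntry> entries) {
        Map<Integer, String> result = new HashMap<>();
        for (StateEntry e : entries) {
            result.put(e.index, e.ttl);
        }
        return result;
    }

    public static void main(String[] args) {
        List<StateEntry> firstRun = List.of(
                new StateEntry(0, "25920 ms", "join-left-state"),
                new StateEntry(1, "8640 ms", "join-right-state"));
        List<StateEntry> secondRun = List.of(
                new StateEntry(0, "8640 ms", "join-right-state"),
                new StateEntry(1, "25920 ms", "join-left-state"));

        // Keyed by name, both runs describe the same configuration.
        System.out.println(ttlByName(firstRun).equals(ttlByName(secondRun)));
        // Keyed by index, the two runs disagree about which TTL belongs where.
        System.out.println(ttlByIndex(firstRun).equals(ttlByIndex(secondRun)));
    }
}
```

Under these assumptions the name-keyed view is unaffected by the reordering while the index-keyed view is not, which is the ambiguity the question is pointing at.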
Re: [DISCUSS] FLIP-292: Support configuring state TTL at operator level for Table API & SQL programs
Hi Shuo,

Thanks for your comments.

> The title of this FLIP is "For Table API & SQL", but this proposal
> actually only covers SQL jobs via compiled json plan

Please correct me if I'm wrong, but I'm thinking that the scope of this
FLIP should be consistent with FLIP-190. Here is an example to
illustrate the Table API usage.

    tableEnv.createTemporaryTable(
        "SourceTable",
        TableDescriptor.forConnector("datagen")
            .schema(Schema.newBuilder().column("f0", DataTypes.STRING()).build())
            .option(DataGenConnectorOptions.ROWS_PER_SECOND, 100L)
            .build());

    Table t = tableEnv.sqlQuery("select * from SourceTable");

    tableEnv.createTemporaryTable(
        "SinkTable",
        TableDescriptor.forConnector("print")
            .schema(Schema.newBuilder().column("f0", DataTypes.STRING()).build())
            .build());

    t.insertInto("SinkTable")
        .compilePlan()
        .printJsonString(); // or writeToFile

As a result, I think the current title, "Support configuring state TTL
at operator level for Table API & SQL programs", is self-contained.

1. The proposed approach is applicable to both SQL and Table API programs
2. The configuration granularity is at the operator level

And I'm not quite sure if the FLIP title has a convention that requires
the proposed approach to be specified, because I see that FLIP-190
doesn't do that either.

Best,
Jane

On Mon, Mar 27, 2023 at 1:29 PM Shuo Cheng wrote:

> Hi, Jane
>
> Thanks for your detailed explanation.
>
> > Maybe it deserves another FLIP to discuss whether we need a
> > multiple-level state TTL configuration mechanism and how to properly
> > implement it.
>
> +1, and this is also what I want to emphasize. The title of this FLIP
> is "For Table API & SQL", but this proposal actually only covers SQL
> jobs via compiled json plan; SQL users who don't want to or cannot use
> the compiled json plan cannot use the feature. If the FLIP title can
> state the scope more precisely, e.g., "Enhance EXECUTE PLAN to support
> operator-level state TTL configuration", it would be much more rigorous
> and self-consistent, and many concerns in the mailing list may not
> exist. WDYT?
>
> Sincerely,
> Shuo
Re: [DISCUSS] FLIP-292: Support configuring state TTL at operator level for Table API & SQL programs
Hi, Jane

Thanks for your detailed explanation.

> Maybe it deserves another FLIP to discuss whether we need a
> multiple-level state TTL configuration mechanism and how to properly
> implement it.

+1, and this is also what I want to emphasize. The title of this FLIP is
"For Table API & SQL", but this proposal actually only covers SQL jobs
via compiled json plan; SQL users who don't want to or cannot use the
compiled json plan cannot use the feature. If the FLIP title can state
the scope more precisely, e.g., "Enhance EXECUTE PLAN to support
operator-level state TTL configuration", it would be much more rigorous
and self-consistent, and many concerns in the mailing list may not
exist. WDYT?

Sincerely,
Shuo

On Fri, Mar 24, 2023 at 11:58 AM Jane Chan wrote:

> Hi Shammon and Shuo,
>
> Thanks for your valuable comments!
>
> Some thoughts:
>
> @Shuo
> > I think it's more proper to say that hint does not affect the
> > equivalence of execution plans (hash agg vs sort agg), not the
> > equivalence of execution results, e.g., users can set
> > 'scan.startup.mode' for kafka connector by dynamic table option,
> > which also "intervene in the calculation of data results".
>
> IMO, the statement that "hint should not interfere with the calculation
> results" means it should not interfere with internal computation. On
> the other hand, 'scan.startup.mode' interferes with the ingestion of
> the data. I think these two concepts are different, but of course, this
> is just my opinion and I welcome other views.
>
> > I think the final shape of state ttl configuring may be like this:
> > users can define operator state ttl using SQL HINT (assumption...),
> > but it may affect more than one stateful operator inside the same
> > query block, then users can further configure a specific one by
> > modifying the compiled json plan...
>
> Setting aside the issue of semantics, setting TTL from a higher level
> seems to be attractive. This means that users only need to configure
> 'table.exec.state.ttl' through the existing hint syntax to achieve the
> effect. Everything is a familiar formula. But is it really the case?
> Hints apply to a very broad range. Let me give an example.
>
> Suppose a user wants to set different TTLs for the two streams in a
> stream join query. Where should the hints be written?
>
>   -- the original query before configuring state TTL
>   create temporary view view1 as select * from my_table_1;
>   create temporary view view2 as select * from my_table_2;
>   create temporary view joined_view as
>   select a.*, b.* from view1 a join view2 b on a.join_key = b.join_key;
>
> Option 1: declaring hints at the very beginning of the table scan
>
>   -- should he or she write hints when declaring the first temporary view?
>   create temporary view view1 as select * from my_table_1
>     /*+ OPTIONS('table.exec.state.ttl' = 'foo') */;
>   create temporary view view2 as select * from my_table_2
>     /*+ OPTIONS('table.exec.state.ttl' = 'bar') */;
>   create temporary view joined_view as
>   select a.*, b.* from view1 a join view2 b on a.join_key = b.join_key;
>
> Option 2: declaring hints when performing the join
>
>   -- or should he or she write hints when declaring the join temporary view?
>   create temporary view view1 as select * from my_table_1;
>   create temporary view view2 as select * from my_table_2;
>   create temporary view joined_view as
>   select a.*, b.*
>   from view1 /*+ OPTIONS('table.exec.state.ttl' = 'foo') */ a
>   join view2 /*+ OPTIONS('table.exec.state.ttl' = 'bar') */ b
>   on a.join_key = b.join_key;
>
> From the user's point of view, does he or she need to care about the
> difference between these two styles? Users might think the two are
> equivalent; but in reality, as developers, how do we define the range
> in which a hint starts and ends to take effect?
>
> Consider the following two assumptions
>
> 1. Assuming the hint takes effect from the moment it is declared and
> applies to any subsequent stateful operators until it is overridden by
> a new hint.
> If this is the assumption, it's clear that Option 1 and Option 2 are
> different because a ChangelogNormalize node can appear between scan and
> join. Meanwhile, which stream's TTL should apply to the following query
> after the stream join? It is unclear if the user does not explicitly
> set it. Should the engine make a random decision?
>
> 2. Assuming that the scope of the hint only applies to the current
> query block and does not extend to the next operator.
> In this case, the first way of setting the hint will not work because
> it cannot be brought to the join operator. Users must choose the second
> way to configure. Are users willing to remember this strange constraint
> on SQL writing style? Does this indicate a new learning cost?
>
> The example above is used to illustrate that while this approach may
> seem simple and direct, it actually has many limitations and may
> produce unexpected
Re: [DISCUSS] FLIP-292: Support configuring state TTL at operator level for Table API & SQL programs
le right now. Is it possible to extend >> this >> > > > syntax to allow for reading plan files from remote file systems? >> > > > >> > > > 2. I would like to inquire if there are any limitations on this >> > feature? >> > > I >> > > > have encountered several instances where the data did not expire in >> the >> > > > upstream operator, but it expired in the downstream operator, >> resulting >> > > in >> > > > abnormal calculation results or direct exceptions thrown by the >> > operator >> > > > (e.g. rank operator). Can we limit that the expiration time of >> > downstream >> > > > operator data should be greater than or equal to the expiration >> time of >> > > > upstream operator data? >> > > > >> > > > Best, >> > > > Shengkai >> > > > >> > > > Yun Tang 于2023年3月24日周五 14:50写道: >> > > > >> > > > > Hi, >> > > > > >> > > > > From my point of view, I am a bit against using SQL hint to set >> state >> > > TTL >> > > > > as FlinkSQL could be translated to several stateful operators. If >> we >> > > want >> > > > > to let different state could have different TTL configs within one >> > > > > operator, the SQL hint solution could not work. A better way is to >> > > allow >> > > > a >> > > > > graphical IDE to display the stateful operators and let users >> > configure >> > > > > them. And the IDE submits the json plan to Flink to run jobs. >> > > > > >> > > > > For the details of the structure of ExecNodes, since the state >> name >> > is >> > > > > unique in the underlying state layer, shall we introduce the >> "index" >> > > tag >> > > > to >> > > > > identify the state config? 
>> > > > > What will happen with the conditions below: >> > > > > 1st run: >> > > > >{ >> > > > > "index": 0, >> > > > > "ttl": "25920 ms", >> > > > > "name": "join-lef-state" >> > > > >}, >> > > > >{ >> > > > > "index": 1, >> > > > > "ttl": "8640 ms", >> > > > > "name": "join-right-state" >> > > > >} >> > > > > >> > > > > 2nd run: >> > > > >{ >> > > > > "index": 0, >> > > > > "ttl": "8640 ms", >> > > > > "name": "join-right-state" >> > > > >}, >> > > > >{ >> > > > > "index": 1, >> > > > > "ttl": "25920 ms", >> > > > > "name": "join-lef-state" >> > > > >} >> > > > > >> > > > > Best >> > > > > Yun Tang >> > > > > >> > > > > From: Jane Chan >> > > > > Sent: Friday, March 24, 2023 11:57 >> > > > > To: dev@flink.apache.org >> > > > > Subject: Re: [DISCUSS] FLIP-292: Support configuring state TTL at >> > > > operator >> > > > > level for Table API & SQL programs >> > > > > >> > > > > Hi Shammon and Shuo, >> > > > > >> > > > > Thanks for your valuable comments! >> > > > > >> > > > > Some thoughts: >> > > > > >> > > > > @Shuo >> > > > > > I think it's more properly to say that hint does not affect the >> > > > > equivalenceof execution plans (hash agg vs sort agg), not the >> > > equivalence >> > > > > of execution >> > > > > results, e.g., users can set 'scan.startup.mode' for kafka >> connector >> > by >> > > > > dynamic table option, which >> > > > > also "intervene in the calculation of data results". >> > > > > >> > > > > IMO, the statement that &qu
Re: [DISCUSS] FLIP-292: Support configuring state TTL at operator level for Table API & SQL programs
un jobs. > > > > > > > > > > For the details of the structure of ExecNodes, since the state name > > is > > > > > unique in the underlying state layer, shall we introduce the > "index" > > > tag > > > > to > > > > > identify the state config? > > > > > What will happen with the conditions below: > > > > > 1st run: > > > > >{ > > > > > "index": 0, > > > > > "ttl": "25920 ms", > > > > > "name": "join-lef-state" > > > > >}, > > > > >{ > > > > > "index": 1, > > > > > "ttl": "8640 ms", > > > > > "name": "join-right-state" > > > > >} > > > > > > > > > > 2nd run: > > > > >{ > > > > > "index": 0, > > > > > "ttl": "8640 ms", > > > > > "name": "join-right-state" > > > > >}, > > > > >{ > > > > > "index": 1, > > > > > "ttl": "25920 ms", > > > > > "name": "join-lef-state" > > > > >} > > > > > > > > > > Best > > > > > Yun Tang > > > > > > > > > > From: Jane Chan > > > > > Sent: Friday, March 24, 2023 11:57 > > > > > To: dev@flink.apache.org > > > > > Subject: Re: [DISCUSS] FLIP-292: Support configuring state TTL at > > > > operator > > > > > level for Table API & SQL programs > > > > > > > > > > Hi Shammon and Shuo, > > > > > > > > > > Thanks for your valuable comments! > > > > > > > > > > Some thoughts: > > > > > > > > > > @Shuo > > > > > > I think it's more properly to say that hint does not affect the > > > > > equivalenceof execution plans (hash agg vs sort agg), not the > > > equivalence > > > > > of execution > > > > > results, e.g., users can set 'scan.startup.mode' for kafka > connector > > by > > > > > dynamic table option, which > > > > > also "intervene in the calculation of data results". > > > > > > > > > > IMO, the statement that "hint should not interfere with the > > calculation > > > > > results", means it should not interfere with internal computation. > On > > > the > > > > > other hand, 'scan.startup.mode' interferes with the ingestion of > the > > > > data. 
> > > > > I think these two concepts are different, but of course, this is > just > > > my > > > > > opinion and welcome other views. > > > > > > > > > > > I think the final shape of state ttl configuring may like the > that, > > > > > userscan define operator state ttl using SQL HINT (assumption...), > > but > > > it > > > > > may > > > > > affects more than one stateful operators inside the same query > block, > > > > then > > > > > users can further configure a specific one by modifying the > compiled > > > json > > > > > plan... > > > > > > > > > > Setting aside the issue of semantics, setting TTL from a higher > level > > > > seems > > > > > to be attractive. This means that users only need to configure > > > > > 'table.exec.state.ttl' through the existing hint syntax to achieve > > the > > > > > effect. Everything is a familiar formula. But is it really the > case? > > > > Hints > > > > > apply to a very broad range. Let me give an example. > > > > > > > > > > Suppose a user wants to set different TTLs for the two streams in a > > > > stream > > > > > join query. Where should the hints be written? > > > > > > > > > > -- the original query before configuring state TTL > > > > > create temporary view view1 as select from my_table_1; > > > > > create temporary view view2 as select from my_table_2; > > > > > cre
Re: [DISCUSS] FLIP-292: Support configuring state TTL at operator level for Table API & SQL programs
> migration/update. Common stages that Flink uses to produce the execution
> plan from SQL do not contain the compiling step. This makes one tool do
> two different jobs [1], upgrade + TTL tuning, and tightens the dependency
> on compiling SQL plans. Flink SQL users have to deal with a compiled SQL
> plan for performance optimization, which it was not designed for.
> - The regular working process for Flink SQL users is changed, from only
> dealing with SQL-like scripts to moving back and forth between SQL-like
> scripts and file modifications. This is a big change in user behaviour.
> One option could be that we upgrade/extend COMPILE PLAN to allow users to
> update the TTL for operators at the script level. But I am not sure if it
> is possible to point out specific operators at this level. Another option
> is to print out the result of COMPILE PLAN and enable EXECUTE PLAN 'json
> plan as string'. A third option is to leverage a data platform to
> visualize the compiled SQL plan and provide related interactions for
> updating the TTL and submitting (executing) the modified compiled SQL
> plan.
>
> On the other side, there is one additional benefit with this proposal: we
> could fine-tune SQL jobs while we migrate/upgrade them. That is nice!
>
> Best regards,
> Jing
>
> [1] https://en.wikipedia.org/wiki/Single-responsibility_principle
Re: [DISCUSS] FLIP-292: Support configuring state TTL at operator level for Table API & SQL programs
Thanks Jane for the proposal.

TTL of state is an execution-phase configuration, and the serialized JSON
graph file is the graph for the execution phase, so supporting operator-level
state TTL in the execution JSON file makes sense to me.

From the user's perspective, I have two concerns:
1. Modifying the execution graph node configuration raises the cost of
understanding for users, especially for SQL users.
2. Submitting a SQL job through `exec plan json file` is not so intuitive,
as users cannot see the SQL detail of the job.

Best,
Leonard
Re: [DISCUSS] FLIP-292: Support configuring state TTL at operator level for Table API & SQL programs
Hi, Jane.

Thanks for driving this FLIP; this feature is very useful to many users.
But I have two questions about the FLIP:

1. How do Gateway users use this feature? As far as I know, EXECUTE PLAN
only supports local files right now. Is it possible to extend this syntax
to allow reading plan files from remote file systems?

2. Are there any limitations on this feature? I have encountered several
instances where the data did not expire in the upstream operator but did
expire in the downstream operator, resulting in abnormal calculation
results or exceptions thrown directly by the operator (e.g. the rank
operator). Can we require that the expiration time of downstream operator
data be greater than or equal to the expiration time of upstream operator
data?

Best,
Shengkai
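The constraint raised in point 2 (a downstream operator should not expire
state earlier than its upstream operator) could in principle be checked
before a plan is submitted. The following is only a minimal sketch under
stated assumptions: the plan is reduced to one TTL in milliseconds per
operator plus a list of upstream-to-downstream edges, a TTL of 0 is treated
as "never expire", and the function and operator names are hypothetical
rather than part of Flink.

```python
from typing import Dict, List, Tuple


def find_ttl_violations(
    ttl_ms: Dict[str, int],
    edges: List[Tuple[str, str]],
) -> List[Tuple[str, str]]:
    """Return (upstream, downstream) pairs where the downstream TTL is shorter.

    A TTL of 0 is treated as "never expire" (an assumption of this sketch).
    """

    def effective(ttl: int) -> float:
        return float("inf") if ttl == 0 else float(ttl)

    return [
        (up, down)
        for up, down in edges
        if effective(ttl_ms[down]) < effective(ttl_ms[up])
    ]


# Hypothetical operators: the rank operator expires state sooner than the join
# that feeds it, which is the situation described in the message above.
ttl_ms = {"join": 86_400_000, "rank": 3_600_000, "sink_agg": 0}
edges = [("join", "rank"), ("rank", "sink_agg")]
violations = find_ttl_violations(ttl_ms, edges)
print(violations)
```

Whether such a check should reject the plan or only warn is a design
question the thread leaves open.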
Re: [DISCUSS] FLIP-292: Support configuring state TTL at operator level for Table API & SQL programs
Hi Yun,

Thanks for your interest in this FLIP.

> A better way is to allow a graphical IDE to display the stateful
> operators and let users configure them. And the IDE submits the json plan
> to Flink to run jobs.

This is a very useful suggestion. Actually, during the design phase, we
considered the complexity of users manually modifying JSON. At that time,
we discussed whether to develop an additional tool to help users modify
JSON more easily, but nothing is better than a graphical interface for
modifying the TTL of each operator. It's a pity that
https://flink.apache.org/visualizer/ has been taken offline; otherwise it
would have been a great addition to the experience.

> since the state name is unique in the underlying state layer, shall we
> introduce the "index" tag to identify the state config?

I'd like to explain why we are considering adding both the index (which
annotates the stateful operator's input order) and the name to the metadata
of the state.

A meaningful name like "join-left-state" or "deduplicate-state" is meant
to help users understand, as much as possible, what each state does. E.g.
suppose we have a graphical IDE that lets users configure TTL; then we can
display the state's name instead of the index. On the other hand, for a
stateful MultipleInputStreamOperator, it's difficult to tell which input
stream the current state belongs to from the name alone.

Back to the case: the 2nd run will not happen. This is because, according
to the current stream join implementation, the first input of the
AbstractStreamingJoinOperator (see StreamingJoinOperator.java#L123
<https://github.com/apache/flink/blob/60f3b2b32b593bf74a90015765817b3bf9c95d54/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/stream/StreamingJoinOperator.java#L123>)
will always be the left side and the second input will always be the right
side, regardless of whether it is a left outer join or a right outer join.

Let me give a detailed example. You can tell from the plan that
first_table is always the first input of the operator (the one nearest to
the join node).

Flink SQL> show create table first_table;
CREATE TABLE `default_catalog`.`default_database`.`first_table` (
  `f0` INT,
  `f1` VARCHAR(2147483647)
) WITH (
  'connector' = 'datagen'
)

Flink SQL> show create table second_table;
CREATE TABLE `default_catalog`.`default_database`.`second_table` (
  `f0` INT,
  `f1` DOUBLE
) WITH (
  'connector' = 'datagen'
)

Flink SQL> explain select a.*, b.* from first_table a left join second_table b on a.f0 = b.f0;
== Optimized Physical Plan ==
Join(joinType=[LeftOuterJoin], where=[=(f0, f00)], select=[f0, f1, f00, f10], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
:- Exchange(distribution=[hash[f0]])
:  +- TableSourceScan(table=[[default_catalog, default_database, first_table]], fields=[f0, f1])
+- Exchange(distribution=[hash[f0]])
   +- TableSourceScan(table=[[default_catalog, default_database, second_table]], fields=[f0, f1])

Flink SQL> explain select a.*, b.* from first_table a right join second_table b on a.f0 = b.f0;
== Optimized Physical Plan ==
Join(joinType=[RightOuterJoin], where=[=(f0, f00)], select=[f0, f1, f00, f10], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
:- Exchange(distribution=[hash[f0]])
:  +- TableSourceScan(table=[[default_catalog, default_database, first_table]], fields=[f0, f1])
+- Exchange(distribution=[hash[f0]])
   +- TableSourceScan(table=[[default_catalog, default_database, second_table]], fields=[f0, f1])

Best,
Jane
Re: [DISCUSS] FLIP-292: Support configuring state TTL at operator level for Table API & SQL programs
Hi,

From my point of view, I am a bit against using SQL hints to set state TTL,
as Flink SQL could be translated to several stateful operators. If we want
to let different states have different TTL configs within one operator,
the SQL hint solution could not work. A better way is to allow a graphical
IDE to display the stateful operators and let users configure them. The
IDE then submits the JSON plan to Flink to run jobs.

For the details of the structure of ExecNodes, since the state name is
unique in the underlying state layer, shall we introduce the "index" tag
to identify the state config?
What will happen with the conditions below:

1st run:
   {
     "index": 0,
     "ttl": "259200000 ms",
     "name": "join-left-state"
   },
   {
     "index": 1,
     "ttl": "86400000 ms",
     "name": "join-right-state"
   }

2nd run:
   {
     "index": 0,
     "ttl": "86400000 ms",
     "name": "join-right-state"
   },
   {
     "index": 1,
     "ttl": "259200000 ms",
     "name": "join-left-state"
   }

Best
Yun Tang
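The two-run scenario in Yun Tang's question can be made concrete by looking
up each state's TTL once by "index" and once by "name". This is only a
sketch: it assumes the metadata is a plain list of entries with the three
fields used in the thread ("index", "ttl", "name"), that the intended TTL
values are 259200000 ms (3 days) and 86400000 ms (1 day), and the helper
names are hypothetical.

```python
import json

first_run = json.loads("""[
  {"index": 0, "ttl": "259200000 ms", "name": "join-left-state"},
  {"index": 1, "ttl": "86400000 ms", "name": "join-right-state"}
]""")
second_run = json.loads("""[
  {"index": 0, "ttl": "86400000 ms", "name": "join-right-state"},
  {"index": 1, "ttl": "259200000 ms", "name": "join-left-state"}
]""")


def by_key(states, key):
    """Map the chosen key ("index" or "name") to the configured TTL string."""
    return {s[key]: s["ttl"] for s in states}


def changed(first, second, key):
    """Return the keys whose TTL differs between the two runs."""
    a, b = by_key(first, key), by_key(second, key)
    return sorted(k for k in a if a[k] != b.get(k))


changed_by_index = changed(first_run, second_run, "index")
changed_by_name = changed(first_run, second_run, "name")
print(changed_by_index)  # both indexes carry a different TTL in the 2nd run
print(changed_by_name)   # no name changed its TTL
```

Keyed by name, the two runs are identical; keyed by index, both entries
changed. Which field acts as the identifier therefore decides whether the
second run is a different configuration at all.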
Re: [DISCUSS] FLIP-292: Support configuring state TTL at operator level for Table API & SQL programs
Hi Shammon and Shuo,

Thanks for your valuable comments!

Some thoughts:

@Shuo
> I think it's more proper to say that hints do not affect the equivalence
> of execution plans (hash agg vs sort agg), not the equivalence of
> execution results, e.g., users can set 'scan.startup.mode' for kafka
> connector by dynamic table option, which also "intervene in the
> calculation of data results".

IMO, the statement that "hint should not interfere with the calculation
results" means it should not interfere with internal computation. On the
other hand, 'scan.startup.mode' interferes with the ingestion of the data.
I think these two concepts are different, but of course, this is just my
opinion, and other views are welcome.

> I think the final shape of state ttl configuring may be like this: users
> can define operator state ttl using SQL HINT (assumption...), but it may
> affect more than one stateful operator inside the same query block, then
> users can further configure a specific one by modifying the compiled json
> plan...

Setting aside the issue of semantics, setting TTL from a higher level seems
attractive. This means that users only need to configure
'table.exec.state.ttl' through the existing hint syntax to achieve the
effect. Everything is a familiar formula. But is it really the case? Hints
apply to a very broad range. Let me give an example.

Suppose a user wants to set different TTLs for the two streams in a stream
join query. Where should the hints be written?

-- the original query before configuring state TTL
create temporary view view1 as select * from my_table_1;
create temporary view view2 as select * from my_table_2;
create temporary view joined_view as
select a.*, b.* from view1 a join view2 b on a.join_key = b.join_key;

Option 1: declaring hints at the very beginning of the table scan

-- should he or she write hints when declaring the first temporary view?
create temporary view view1 as select * from my_table_1
/*+ OPTIONS('table.exec.state.ttl' = 'foo') */;
create temporary view view2 as select * from my_table_2
/*+ OPTIONS('table.exec.state.ttl' = 'bar') */;
create temporary view joined_view as
select a.*, b.* from view1 a join view2 b on a.join_key = b.join_key;

Option 2: declaring hints when performing the join

-- or should he or she write hints when declaring the join temporary view?
create temporary view view1 as select * from my_table_1;
create temporary view view2 as select * from my_table_2;
create temporary view joined_view as
select a.*, b.* from
view1 /*+ OPTIONS('table.exec.state.ttl' = 'foo') */ a
join
view2 /*+ OPTIONS('table.exec.state.ttl' = 'bar') */ b
on a.join_key = b.join_key;

From the user's point of view, does he or she need to care about the
difference between these two styles? Users might think the two are
equivalent; but in reality, as developers, how do we define the range in
which a hint starts and stops taking effect?

Consider the following two assumptions:

1. Assume the hint takes effect from the moment it is declared and applies
to any subsequent stateful operators until it is overridden by a new hint.
Under this assumption, it's clear that Option 1 and Option 2 are different,
because a ChangelogNormalize node can appear between the scan and the join.
Meanwhile, which stream's TTL should apply to the query that follows the
stream join? It is unclear if the user does not explicitly set it. Should
the engine make a random decision?

2. Assume the scope of the hint only applies to the current query block
and does not extend to the next operator. In this case, the first way of
setting the hint will not work, because it cannot be carried to the join
operator. Users must choose the second way to configure it. Are users
willing to remember this strange constraint on SQL writing style? Does this
indicate a new learning cost?

The example above illustrates that while this approach may seem simple and
direct, it actually has many limitations and may produce unexpected
behavior. Will users still find it attractive? IMO *hints only work for a
very limited situation where the query is very simple, and their scope is
coarser than operator level*. Maybe it deserves another FLIP to discuss
whether we need a multiple-level state TTL configuration mechanism and how
to properly implement it.

@Shammon
> Generally, Flink jobs support two types of submission: SQL and jar. If
> users want to use `TTL on Operator` for SQL jobs, they need to edit the
> json file which is not supported by general job submission systems such
> as flink sql-client, apache kyuubi, apache streampark, etc. Users need to
> download the file and edit it manually, but they may not have the
> permissions to the storage system such as HDFS in a real production
> environment. From this perspective, I think it is necessary to provide a
> way similar to hints that users can configure the `TTL on Operator` in
> their sqls which help users to
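The two scoping assumptions discussed above can be compared on a toy model.
This is a sketch only: one input of the join is modelled as a linear chain
of (operator name, is stateful, hint declared here) tuples, and the operator
names, the "default" value and both functions are hypothetical. It does not
reflect how Flink's planner treats hints.

```python
from typing import Dict, List, Optional, Tuple

# (operator name, is the operator stateful?, TTL hint declared here or None)
Chain = List[Tuple[str, bool, Optional[str]]]


def ttl_propagating(chain: Chain, default: str) -> Dict[str, str]:
    """Assumption 1: a hint applies to later stateful operators until overridden."""
    current = default
    result = {}
    for name, stateful, hint in chain:
        if hint is not None:
            current = hint
        if stateful:
            result[name] = current
    return result


def ttl_block_local(chain: Chain, default: str) -> Dict[str, str]:
    """Assumption 2: a hint applies only to the operator where it is declared."""
    return {
        name: (hint if hint is not None else default)
        for name, stateful, hint in chain
        if stateful
    }


# Option 1: the hint 'foo' is written at the table scan.
option1: Chain = [
    ("scan", False, "foo"),
    ("changelog_normalize", True, None),
    ("join_left_input", True, None),
]
# Option 2: the hint 'foo' is written at the join.
option2: Chain = [
    ("scan", False, None),
    ("changelog_normalize", True, None),
    ("join_left_input", True, "foo"),
]

for label, chain in (("option 1", option1), ("option 2", option2)):
    print(label, ttl_propagating(chain, "default"), ttl_block_local(chain, "default"))
```

Under assumption 1 the two options differ only at the ChangelogNormalize
node; under assumption 2 the hint written at the scan never reaches the
join state, which is the constraint users would have to remember.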
Re: [DISCUSS] FLIP-292: Support configuring state TTL at operator level for Table API & SQL programs
Hi Jane,

Thanks for initiating this discussion. Configuring TTL per operator can help users manage state more effectively.

I think the `compiled json plan` proposal may need to consider the impact on the user's submission workflow. Generally, Flink jobs support two types of submission: SQL and jar. If users want to use `TTL on Operator` for SQL jobs, they need to edit the JSON file, which is not supported by general job submission systems such as Flink SQL Client, Apache Kyuubi, Apache StreamPark, etc. Users need to download the file and edit it manually, but they may not have permissions on the storage system, such as HDFS, in a real production environment.

From this perspective, I think it is necessary to provide a way, similar to hints, for users to configure `TTL on Operator` in their SQL, which helps them use it conveniently. At the same time, I agree with @Shuo's idea that for complex cases, users can combine hints and the `json plan` to configure `TTL on Operator` better.

What do you think? Thanks

Best,
Shammon FY
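For readers less familiar with the workflow under discussion, the compile, edit, and execute cycle might look roughly like the sketch below. The table names, columns, and file path are hypothetical; the `COMPILE PLAN` and `EXECUTE PLAN` statements and the `datagen` and `blackhole` connectors are existing Flink features. How a per-operator TTL would be represented inside the JSON file is exactly what FLIP-292 proposes, so it is deliberately not shown.

```sql
-- Hypothetical source and sink tables, for illustration only.
CREATE TABLE my_table_1 (join_key INT, val STRING) WITH ('connector' = 'datagen');
CREATE TABLE my_table_2 (join_key INT, val STRING) WITH ('connector' = 'datagen');
CREATE TABLE joined_sink (join_key INT, left_val STRING, right_val STRING)
  WITH ('connector' = 'blackhole');

-- Step 1: compile the INSERT statement into a JSON plan file
-- (the path is an assumption; use any location the client can write to).
COMPILE PLAN 'file:///tmp/flink/joined_plan.json' FOR
INSERT INTO joined_sink
SELECT a.join_key, a.val, b.val
FROM my_table_1 a
JOIN my_table_2 b ON a.join_key = b.join_key;

-- Step 2 happens outside SQL: the plan file is opened and edited by hand.

-- Step 3: submit the (possibly edited) plan.
EXECUTE PLAN 'file:///tmp/flink/joined_plan.json';
```

Step 2 is the manual part that the submission-workflow concern above is about: the user needs read and write access to wherever the plan file is stored.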
Re: [DISCUSS] FLIP-292: Support configuring state TTL at operator level for Table API & SQL programs
Correction: “users can set 'scan.startup.mode' for kafka connector” -> “users can set 'scan.startup.mode' for kafka connector by dynamic table option”
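As a concrete illustration of the dynamic table option referred to in the correction, a minimal sketch follows. The table name, schema, topic, broker address, and group id are placeholders; the `OPTIONS` hint syntax and the `scan.startup.mode` values are those of Flink's Kafka SQL connector.

```sql
-- Hypothetical Kafka-backed table; topic, servers, and group id are placeholders.
CREATE TABLE kafka_orders (
  order_id BIGINT,
  amount DOUBLE
) WITH (
  'connector' = 'kafka',
  'topic' = 'orders',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'demo-group',
  'scan.startup.mode' = 'group-offsets',
  'format' = 'json'
);

-- The OPTIONS hint overrides 'scan.startup.mode' for this query only.
SELECT order_id, amount
FROM kafka_orders /*+ OPTIONS('scan.startup.mode' = 'earliest-offset') */;
```

Because the starting offset decides which records are read, the same query can return different rows with and without the hint, which is the sense in which such a hint affects the computed result.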
Re: [DISCUSS] FLIP-292: Support configuring state TTL at operator level for Table API & SQL programs
Hi Jane,

Thanks for driving this. Operator-level state TTL is absolutely a desired feature. I would like to share my opinion as follows:

If the scope of this proposal is limited to an enhancement of the compiled JSON plan, it makes sense. I think it does not conflict with configuring state TTL in other ways, e.g., SQL hints or something else, because they work at different levels: a SQL hint works at the very entrance of the SQL API, while the compiled JSON plan is an intermediate result of SQL.

I think the final shape of state TTL configuration may look like this: users can define operator state TTL using a SQL hint (an assumption...), but it may affect more than one stateful operator inside the same query block; users can then further configure a specific one by modifying the compiled JSON plan...

In a word, this proposal is in good shape as an enhancement of the compiled JSON plan, and it is orthogonal to other ways such as SQL hints, which work at a higher level.

Nits:

> "From the SQL semantic perspective, hints cannot intervene in the calculation of data results."

I think it is more accurate to say that a hint does not affect the equivalence of execution plans (hash agg vs sort agg), rather than the equivalence of execution results; e.g., users can set 'scan.startup.mode' for the Kafka connector, which also "intervene in the calculation of data results".

Sincerely,
Shuo

On Tue, Mar 21, 2023 at 7:52 PM Jane Chan wrote:

> Hi devs,
>
> I'd like to start a discussion on FLIP-292: Support configuring state TTL at operator level for Table API & SQL programs [1].
>
> Currently, we only support job-level state TTL configuration via 'table.exec.state.ttl'. However, users may expect fine-grained state TTL control to optimize state usage. Hence we propose to serialize/deserialize the state TTL as metadata of the operator's state to/from the compiled JSON plan, so that different state TTLs can be specified when transforming the exec node into stateful operators.
>
> Looking forward to your opinions!
>
> [1] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=240883951
>
> Best Regards,
> Jane Chan
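To make the job-level limitation in the quoted proposal concrete, here is a minimal sketch. The tables and the one-day value are hypothetical; `table.exec.state.ttl` is the existing option named in the proposal.

```sql
-- Hypothetical tables, for illustration only.
CREATE TABLE orders (user_id INT, amount DOUBLE) WITH ('connector' = 'datagen');
CREATE TABLE users (user_id INT, name STRING) WITH ('connector' = 'datagen');

-- A single job-level value: it applies to every stateful operator
-- of the queries planned after this statement.
SET 'table.exec.state.ttl' = '1 d';

-- Both the join and the aggregation below keep state, and with the
-- job-level option they share the same one-day TTL.
SELECT u.name, SUM(o.amount) AS total
FROM orders o
JOIN users u ON o.user_id = u.user_id
GROUP BY u.name;
```

With only this option there is no way to give the join a shorter TTL than the aggregation, or the reverse, which is the gap the proposal aims to close.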