Hi everyone, I started a vote for this FLIP [1], and the voting thread can be found at [2]. If you have any questions, please don't hesitate to join this discussion.
[1] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=240883951
[2] https://lists.apache.org/thread/jc5ngs6kxdn179xmj6oqchkl5frdkgr2

Best,
Jane

On Fri, Apr 7, 2023 at 5:25 PM Jane Chan <[email protected]> wrote:

> Hi, devs,
>
> Thanks for all the feedback.
>
> Based on the discussion [1], we seem to have a consensus so far, so I
> would like to start a vote on FLIP-292 [2], which begins on the following
> Monday (Apr. 10th at 10:00 AM GMT).
>
> If you have any questions or concerns, please don't hesitate to follow up
> on this discussion.
>
> [1] https://lists.apache.org/thread/ffmc96gv8ofoskbxlhtm7w8oxv8nqzct
> [2] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=240883951&show-miniview
>
> Best regards,
> Jane
>
> On Thu, Apr 6, 2023 at 10:33 AM Jane Chan <[email protected]> wrote:
>
>> Hi, devs,
>>
>> Thanks for your valuable feedback. I've changed the title of the FLIP to
>> "Enhance COMPILED PLAN to support operator-level state TTL configuration"
>> and added an explanation for StateMetadata. If you have any concerns,
>> please let me know.
>>
>> Best regards,
>> Jane
>>
>> On Mon, Apr 3, 2023 at 11:42 PM Jane Chan <[email protected]> wrote:
>>
>>> Hi Timo,
>>>
>>> Thanks for your valuable feedback. Let me explain the design details.
>>>
>>> > However, actually fine-grained state TTL should already be possible
>>> > today. I don't fully understand where your proposed StateMetadata is
>>> > located? Would this be a value of @ExecNodeMetadata, StreamExecNode, or
>>> > TwoInputStreamOperator?
>>>
>>> Currently, all ExecNodes that support JSON SerDe are annotated with
>>> @ExecNodeMetadata. This annotation interface has a key called
>>> consumedOptions, which persists all configuration that affects the
>>> topology. For ExecNodes that translate to OneInputStreamOperator, adding
>>> "table.exec.state.ttl" to consumedOptions is enough to achieve the goal of
>>> configuring TTL with fine granularity. However, this is not a generalized
>>> solution for ExecNodes that translate to TwoInputStreamOperator or
>>> MultipleInputStreamOperator, because we may need to set different TTLs for
>>> the left / right (or k-th) input stream, and we do not want to introduce
>>> configurations like "table.exec.left-state.ttl",
>>> "table.exec.right-state.ttl", or "table.exec.kth-input-state.ttl".
>>>
>>> The proposed StateMetadata will be a member variable of ExecNodes that
>>> translate to stateful operators, similar to inputProperties (which is
>>> shared by all ExecNodes, though).
>>> I'd like to illustrate this in the following snippet of code for
>>> StreamExecJoin.
>>>
>>> @ExecNodeMetadata(
>>>     name = "stream-exec-join",
>>>     version = 1,
>>>     producedTransformations = StreamExecJoin.JOIN_TRANSFORMATION,
>>>     minPlanVersion = FlinkVersion.v1_15,
>>>     minStateVersion = FlinkVersion.v1_15)
>>>
>>> +@ExecNodeMetadata(
>>> +    name = "stream-exec-join",
>>> +    version = 2,
>>> +    producedTransformations = StreamExecJoin.JOIN_TRANSFORMATION,
>>> +    minPlanVersion = FlinkVersion.v1_18,
>>> +    minStateVersion = FlinkVersion.v1_15)
>>> public class StreamExecJoin extends ExecNodeBase<RowData>
>>>         implements StreamExecNode<RowData>, SingleTransformationTranslator<RowData> {
>>>
>>> +    @Nullable
>>> +    @JsonProperty(FIELD_NAME_STATE)
>>> +    private final List<StateMetadata> stateMetadataList;
>>>
>>>     public StreamExecJoin(
>>>             ReadableConfig tableConfig,
>>>             JoinSpec joinSpec,
>>>             List<int[]> leftUpsertKeys,
>>>             List<int[]> rightUpsertKeys,
>>>             InputProperty leftInputProperty,
>>>             InputProperty rightInputProperty,
>>>             RowType outputType,
>>>             String description) {
>>>         this(
>>>                 ExecNodeContext.newNodeId(),
>>>                 ExecNodeContext.newContext(StreamExecJoin.class),
>>>                 ExecNodeContext.newPersistedConfig(StreamExecJoin.class, tableConfig),
>>>                 joinSpec,
>>>                 leftUpsertKeys,
>>>                 rightUpsertKeys,
>>>                 Lists.newArrayList(leftInputProperty, rightInputProperty),
>>>                 outputType,
>>>                 description,
>>> +                StateMetadata.multiInputDefaultMeta(tableConfig, LEFT_STATE_NAME, RIGHT_STATE_NAME));
>>>     }
>>>
>>>     @JsonCreator
>>>     public StreamExecJoin(
>>>             @JsonProperty(FIELD_NAME_ID) int id,
>>>             @JsonProperty(FIELD_NAME_TYPE) ExecNodeContext context,
>>>             @JsonProperty(FIELD_NAME_CONFIGURATION) ReadableConfig persistedConfig,
>>>             @JsonProperty(FIELD_NAME_JOIN_SPEC) JoinSpec joinSpec,
>>>             @JsonProperty(FIELD_NAME_LEFT_UPSERT_KEYS) List<int[]> leftUpsertKeys,
>>>             @JsonProperty(FIELD_NAME_RIGHT_UPSERT_KEYS) List<int[]> rightUpsertKeys,
>>>             @JsonProperty(FIELD_NAME_INPUT_PROPERTIES) List<InputProperty> inputProperties,
>>>             @JsonProperty(FIELD_NAME_OUTPUT_TYPE) RowType outputType,
>>>             @JsonProperty(FIELD_NAME_DESCRIPTION) String description,
>>> +            @Nullable
>>> +            @JsonProperty(FIELD_NAME_STATE) List<StateMetadata> stateMetadataList) {
>>>         super(id, context, persistedConfig, inputProperties, outputType, description);
>>>         checkArgument(inputProperties.size() == 2);
>>>         this.joinSpec = checkNotNull(joinSpec);
>>>         this.leftUpsertKeys = leftUpsertKeys;
>>>         this.rightUpsertKeys = rightUpsertKeys;
>>> +        this.stateMetadataList = stateMetadataList;
>>>     }
>>>
>>>     @Override
>>>     @SuppressWarnings("unchecked")
>>>     protected Transformation<RowData> translateToPlanInternal(
>>>             PlannerBase planner, ExecNodeConfig config) {
>>>         final ExecEdge leftInputEdge = …;
>>>         final ExecEdge rightInputEdge = …;
>>>         . . .
>>>
>>>         // for backward compatibility
>>>         long leftStateRetentionTime =
>>>                 isNullOrEmpty(stateMetadataList)
>>>                         ? config.getStateRetentionTime()
>>>                         : stateMetadataList.get(0).getStateTtl();
>>>         long rightStateRetentionTime =
>>>                 isNullOrEmpty(stateMetadataList)
>>>                         ? leftStateRetentionTime
>>>                         : stateMetadataList.get(1).getStateTtl();
>>>
>>>         AbstractStreamingJoinOperator operator;
>>>         FlinkJoinType joinType = joinSpec.getJoinType();
>>>         if (joinType == FlinkJoinType.ANTI || joinType == FlinkJoinType.SEMI) {
>>>             operator =
>>>                     new StreamingSemiAntiJoinOperator(
>>>                             joinType == FlinkJoinType.ANTI,
>>>                             leftTypeInfo,
>>>                             rightTypeInfo,
>>>                             generatedCondition,
>>>                             leftInputSpec,
>>>                             rightInputSpec,
>>>                             joinSpec.getFilterNulls(),
>>>                             leftStateRetentionTime,
>>> +                            rightStateRetentionTime);
>>>         } else {
>>>             operator =
>>>                     new StreamingJoinOperator(
>>>                             leftTypeInfo,
>>>                             rightTypeInfo,
>>>                             generatedCondition,
>>>                             leftInputSpec,
>>>                             rightInputSpec,
>>>                             leftIsOuter,
>>>                             rightIsOuter,
>>>                             joinSpec.getFilterNulls(),
>>>                             leftStateRetentionTime,
>>> +                            rightStateRetentionTime);
>>>         }
>>>
>>>         . . .
>>>         return transform;
>>>     }
>>> }
>>>
>>> Best regards,
>>> Jane
>>>
>>> On Mon, Apr 3, 2023 at 10:01 PM Timo Walther <[email protected]> wrote:
>>>
>>>> Hi Jane,
>>>>
>>>> thanks for proposing this FLIP. More state insights and fine-grained
>>>> state TTL are a frequently requested feature for Flink SQL. Eventually,
>>>> we need to address this.
>>>>
>>>> I agree with the previous responses that doing this with a hint might
>>>> cause more confusion than it actually helps. We should use hints only if
>>>> they can be placed close to an operation (e.g. JOIN or table), and only
>>>> where a global flag for the entire query, set using SET, is not
>>>> sufficient.
>>>>
>>>> In general, I support the current direction of the FLIP and continuing
>>>> the vision of FLIP-190. However, actually fine-grained state TTL should
>>>> already be possible today. Maybe this is untested yet, but we largely
>>>> reworked how configuration works within the planner in Flink 1.15.
>>>>
>>>> As you quickly mentioned in the FLIP, ExecNodeConfig[1] already combines
>>>> configuration coming from TableConfig with per-ExecNode config.
>>>> Actually, state TTL from the JSON plan should already have higher
>>>> precedence than TableConfig.
>>>>
>>>> It would be great to extend the meta-information of ExecNodes with state
>>>> insights. I don't fully understand where your proposed StateMetadata is
>>>> located? Would this be a value of @ExecNodeMetadata, StreamExecNode, or
>>>> TwoInputStreamOperator?
>>>>
>>>> I think it should be a combination of ExecNodeMetadata with rough
>>>> estimates (declaration) or StreamExecNode. But it should not bubble into
>>>> TwoInputStreamOperator.
>>>>
>>>> Regards,
>>>> Timo
>>>>
>>>> [1] https://github.com/apache/flink/blob/master/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNodeConfig.java
>>>>
>>>> On 03.04.23 09:15, godfrey he wrote:
>>>> > Hi Jane,
>>>> >
>>>> > Thanks for driving this FLIP.
>>>> >
>>>> > I think the compiled plan solution and the hint solution do not
>>>> > conflict; the two can exist at the same time.
>>>> > The compiled plan solution can address the needs of advanced users and
>>>> > platform users, for whom the state TTL of every stateful operator can
>>>> > be defined by the user. The hint solution, in contrast, can address
>>>> > some specific simple scenarios, and is very user-friendly, convenient,
>>>> > and unambiguous to use.
>>>> >
>>>> > Some stateful operators are not compiled from SQL directly, such as
>>>> > the ChangelogNormalize and SinkUpsertMaterializer mentioned above. I
>>>> > notice that the example given by Yisha has a hint propagation problem,
>>>> > which does not conform to the current design.
>>>> > The rough idea of the hint solution should be simple (only the
>>>> > common operators are supported) and easy to understand (no hint
>>>> > propagation).
>>>> >
>>>> > If the hint solution is supported, a compiled plan that comes from a
>>>> > query with state TTL hints can also be further modified for the state
>>>> > TTL parts.
>>>> >
>>>> > So, I prefer the hint solution to be discussed in a separate FLIP. I
>>>> > think that FLIP may need a lot of discussion.
>>>> >
>>>> > Best,
>>>> > Godfrey
>>>> >
>>>> > On Thu, Mar 30, 2023 at 22:04, 周伊莎 <[email protected]> wrote:
>>>> >>
>>>> >> Hi Jane,
>>>> >>
>>>> >> Thanks for your detailed response.
>>>> >>
>>>> >>> You mentioned that there are 10k+ SQL jobs in your production
>>>> >>> environment, but only ~100 jobs' migration involves plan editing.
>>>> >>> Is 10k+ the number of total jobs, or the number of jobs that use
>>>> >>> stateful computation and need state migration?
>>>> >>
>>>> >> 10k is the number of SQL jobs that enable periodic checkpointing. And
>>>> >> surely, if users change their SQL in a way that changes the plan,
>>>> >> they need to do state migration.
>>>> >>
>>>> >>> - You mentioned that "A truth that can not be ignored is that users
>>>> >>> usually tend to give up editing TTL(or operator ID in our case)
>>>> >>> instead of migrating this configuration between their versions of
>>>> >>> one given job." So what would users prefer to do if they're
>>>> >>> reluctant to edit the operator ID? Would they submit the same SQL
>>>> >>> as a new job with a higher version to re-accumulate the state from
>>>> >>> the earliest offset?
>>>> >>
>>>> >> You're exactly right. People will tend to re-accumulate the state
>>>> >> from a given offset by changing the namespace of their checkpoint.
>>>> >> Namespace is an internal concept, and restarting the SQL job in a new
>>>> >> namespace can be simply understood as submitting a new job.
>>>> >>
>>>> >>> Back to your suggestions, I noticed that FLIP-190 [3] proposed the
>>>> >>> following syntax to perform plan migration
>>>> >>
>>>> >> The 'plan migration' I mentioned in my last reply may be inaccurate.
>>>> >> It's more like 'query evolution'. In other words, if a user submits a
>>>> >> SQL job with a configured compiled plan and then changes the SQL, the
>>>> >> compiled plan changes too; the question is how to move the
>>>> >> configuration from the old plan to the new plan.
>>>> >> IIUC, FLIP-190 aims to solve issues in Flink version upgrades and
>>>> >> leaves out 'query evolution', which is a fundamental change to the
>>>> >> query, e.g. adding a filter condition or a different aggregation.
>>>> >> And I'm really looking forward to a solution for query evolution.
>>>> >>
>>>> >>> And I'm also curious about how to use the hint
>>>> >>> approach to cover cases like
>>>> >>>
>>>> >>> - configuring TTL for operators like ChangelogNormalize,
>>>> >>> SinkUpsertMaterializer, etc., these operators are derived by the
>>>> >>> planner implicitly
>>>> >>> - cope with two/multiple input stream operator's state TTL, like
>>>> >>> join, and other operations like row_number, rank, correlate, etc.
>>>> >>
>>>> >> Actually, in our company, all operators in the query block where the
>>>> >> hint is located are affected by that hint. For example,
>>>> >>
>>>> >>> INSERT INTO sink
>>>> >>> SELECT /*+ STATE_TTL('1D') */
>>>> >>>     id,
>>>> >>>     name,
>>>> >>>     num
>>>> >>> FROM (
>>>> >>>     SELECT
>>>> >>>         *,
>>>> >>>         ROW_NUMBER() OVER (PARTITION BY id ORDER BY num DESC) as row_num
>>>> >>>     FROM (
>>>> >>>         SELECT
>>>> >>>             *
>>>> >>>         FROM (
>>>> >>>             SELECT
>>>> >>>                 id,
>>>> >>>                 name,
>>>> >>>                 max(num) as num
>>>> >>>             FROM source1
>>>> >>>             GROUP BY
>>>> >>>                 id, name, TUMBLE(proc, INTERVAL '1' MINUTE)
>>>> >>>         )
>>>> >>>         GROUP BY
>>>> >>>             id, name, num
>>>> >>>     )
>>>> >>> )
>>>> >>> WHERE row_num = 1
>>>> >>
>>>> >> In the SQL above, the state TTL of Rank and Agg will both be
>>>> >> configured as 1 day. If users want to set different TTLs for Rank and
>>>> >> Agg, they can just place these two queries in two different query
>>>> >> blocks.
>>>> >> It looks quite rough but straightforward enough. For each side of the
>>>> >> join operator, one of my users proposed a syntax like the one below:
>>>> >>
>>>> >>> /*+ JOIN_TTL('tables'='left_table,right_table','left_ttl'='100000','right_ttl'='10000') */
>>>> >>
>>>> >> We haven't accepted this proposal yet; maybe we could find a better
>>>> >> design for this kind of case. Just for your information.
>>>> >>
>>>> >> I think if we want to utilize hints to support fine-grained
>>>> >> configuration, we can open a new FLIP to discuss it.
>>>> >> BTW, personally, I'm interested in how to design a graphical
>>>> >> interface to help users maintain their custom fine-grained
>>>> >> configuration between their job versions.
>>>> >>
>>>> >> Best regards,
>>>> >> Yisha
>>>> >
>>>>
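
For readers skimming the thread, the backward-compatibility rule in the quoted StreamExecJoin snippet can be tried in isolation. What follows is only a minimal sketch, not Flink code: the `StateMetadata` class here is a hypothetical stand-in holding just an index, a name, and a TTL in milliseconds, and `TtlFallbackSketch`, `resolveTtls`, and the state names are made up for illustration. It assumes, as in the snippet, that a plan without state metadata falls back to the single job-level TTL for both inputs. The explicit size check is an extra guard that the quoted snippet does not have.

```java
import java.util.Arrays;
import java.util.List;

/** Sketch only: per-input state TTL lookup with a fallback to the job-level TTL. */
class TtlFallbackSketch {

    /** Hypothetical stand-in for the StateMetadata discussed in the thread (not Flink's class). */
    static final class StateMetadata {
        final int stateIndex;
        final String stateName;
        final long stateTtlMillis;

        StateMetadata(int stateIndex, String stateName, long stateTtlMillis) {
            this.stateIndex = stateIndex;
            this.stateName = stateName;
            this.stateTtlMillis = stateTtlMillis;
        }
    }

    /**
     * Returns {leftTtl, rightTtl} in milliseconds for a two-input operator.
     * If the plan carries no state metadata (an older plan), both inputs use the job-level TTL.
     */
    static long[] resolveTtls(List<StateMetadata> metadata, long jobLevelTtlMillis) {
        if (metadata == null || metadata.isEmpty()) {
            return new long[] {jobLevelTtlMillis, jobLevelTtlMillis};
        }
        // Added guard (assumption): a two-input operator needs exactly two entries.
        if (metadata.size() != 2) {
            throw new IllegalArgumentException(
                    "Expected 2 state metadata entries, got " + metadata.size());
        }
        return new long[] {metadata.get(0).stateTtlMillis, metadata.get(1).stateTtlMillis};
    }

    public static void main(String[] args) {
        long jobLevelTtl = 3_600_000L; // one hour, standing in for table.exec.state.ttl

        // Older plan without state metadata: both sides inherit the job-level TTL.
        System.out.println(Arrays.toString(resolveTtls(null, jobLevelTtl)));

        // Plan with per-input metadata: each side gets its own TTL.
        List<StateMetadata> perInput =
                Arrays.asList(
                        new StateMetadata(0, "leftState", 86_400_000L),
                        new StateMetadata(1, "rightState", 600_000L));
        System.out.println(Arrays.toString(resolveTtls(perInput, jobLevelTtl)));
    }
}
```

Keeping the TTL per input in a list, rather than in new options such as "table.exec.left-state.ttl", is what lets the same shape extend to operators with more than two inputs.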
