Hi everyone,

I started a vote for this FLIP [1], and the voting thread can be found at
[2]. If you have any questions, please don't hesitate to join this
discussion.

[1]
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=240883951
[2] https://lists.apache.org/thread/jc5ngs6kxdn179xmj6oqchkl5frdkgr2


Best,
Jane

On Fri, Apr 7, 2023 at 5:25 PM Jane Chan <qingyue....@gmail.com> wrote:

> Hi, devs,
>
> Thanks for all the feedback.
>
> Based on the discussion [1], we seem to have a consensus so far, so I
> would like to start a vote on FLIP-292 [2], which begins on the following
> Monday (Apr. 10th at 10:00 AM GMT).
>
> If you have any questions or concerns, please don't hesitate to follow up
> on this discussion.
>
> [1] https://lists.apache.org/thread/ffmc96gv8ofoskbxlhtm7w8oxv8nqzct
> [2]
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=240883951&show-miniview
>
> Best regards,
> Jane
>
> On Thu, Apr 6, 2023 at 10:33 AM Jane Chan <qingyue....@gmail.com> wrote:
>
>> Hi, devs,
>>
>> Thanks for your valuable feedback. I've changed the title of the FLIP to
>> "Enhance COMPILED PLAN to support operator-level state TTL configuration"
>> and added an explanation for StateMetadata. If you have any concerns,
>> please let me know.
>>
>> Best regards,
>> Jane
>>
>> On Mon, Apr 3, 2023 at 11:42 PM Jane Chan <qingyue....@gmail.com> wrote:
>>
>>> Hi Timo,
>>>
>>> Thanks for your valuable feedback. Let me explain the design details.
>>>
>>> > However, actually fine-grained state TTL should already be possible
>>> today. I don't fully understand where your proposed StateMetadata is
>>> located? Would this be a value of @ExecNodeMetadata, StreamExecNode, or
>>> TwoInputStreamOperator?
>>>
>>> Currently, all ExecNodes that support JSON SerDe are annotated with
>>> @ExecNodeMetadata. This annotation interface has a key called
>>> consumedOptions, which persists all configuration that affects the
>>> topology. For ExecNodes that translate to OneInputStreamOperator, adding
>>> "table.exec.state.ttl" to consumedOptions is enough to achieve the goal of
>>> configuring TTL with fine granularity. However, this is not a generalized
>>> solution for ExecNodes that translate to TwoInputStreamOperator or
>>> MultipleInputStreamOperator, because we may need to set different TTLs for
>>> the left / right (or k-th) input stream, and we do not want to introduce
>>> configurations like "table.exec.left-state.ttl",
>>> "table.exec.right-state.ttl", or "table.exec.kth-input-state.ttl".
>>>
>>> The proposed StateMetadata will be a member variable of ExecNodes that
>>> translate to stateful operators, similar to inputProperties (which is
>>> shared by all ExecNodes, though).
>>> I'd like to illustrate this in the following snippet of code for
>>> StreamExecJoin.
>>>
>>> @ExecNodeMetadata(
>>>       name = "stream-exec-join",
>>>       version = 1,
>>>       producedTransformations = StreamExecJoin.JOIN_TRANSFORMATION,
>>>       minPlanVersion = FlinkVersion.v1_15,
>>>       minStateVersion = FlinkVersion.v1_15)
>>>
>>> +@ExecNodeMetadata(
>>> +       name = "stream-exec-join",
>>> +       version = 2,
>>> +       producedTransformations = StreamExecJoin.JOIN_TRANSFORMATION,
>>> +       minPlanVersion = FlinkVersion.v1_18,
>>> +       minStateVersion = FlinkVersion.v1_15)
>>> public class StreamExecJoin extends ExecNodeBase<RowData>
>>>       implements StreamExecNode<RowData>,
>>>               SingleTransformationTranslator<RowData> {
>>>
>>>
>>> +   @Nullable
>>> +   @JsonProperty(FIELD_NAME_STATE)
>>> +   private final List<StateMetadata> stateMetadataList;
>>>
>>>   public StreamExecJoin(
>>>           ReadableConfig tableConfig,
>>>           JoinSpec joinSpec,
>>>           List<int[]> leftUpsertKeys,
>>>           List<int[]> rightUpsertKeys,
>>>           InputProperty leftInputProperty,
>>>           InputProperty rightInputProperty,
>>>           RowType outputType,
>>>           String description) {
>>>       this(
>>>               ExecNodeContext.newNodeId(),
>>>               ExecNodeContext.newContext(StreamExecJoin.class),
>>>               ExecNodeContext.newPersistedConfig(
>>>                       StreamExecJoin.class, tableConfig),
>>>               joinSpec,
>>>               leftUpsertKeys,
>>>               rightUpsertKeys,
>>>               Lists.newArrayList(leftInputProperty, rightInputProperty),
>>>               outputType,
>>>               description,
>>> +              StateMetadata.multiInputDefaultMeta(
>>> +                      tableConfig, LEFT_STATE_NAME, RIGHT_STATE_NAME));
>>>   }
>>>
>>>   @JsonCreator
>>>   public StreamExecJoin(
>>>           @JsonProperty(FIELD_NAME_ID) int id,
>>>           @JsonProperty(FIELD_NAME_TYPE) ExecNodeContext context,
>>>           @JsonProperty(FIELD_NAME_CONFIGURATION)
>>>                   ReadableConfig persistedConfig,
>>>           @JsonProperty(FIELD_NAME_JOIN_SPEC) JoinSpec joinSpec,
>>>           @JsonProperty(FIELD_NAME_LEFT_UPSERT_KEYS)
>>>                   List<int[]> leftUpsertKeys,
>>>           @JsonProperty(FIELD_NAME_RIGHT_UPSERT_KEYS)
>>>                   List<int[]> rightUpsertKeys,
>>>           @JsonProperty(FIELD_NAME_INPUT_PROPERTIES)
>>>                   List<InputProperty> inputProperties,
>>>           @JsonProperty(FIELD_NAME_OUTPUT_TYPE) RowType outputType,
>>>           @JsonProperty(FIELD_NAME_DESCRIPTION) String description,
>>> +          @Nullable
>>> +          @JsonProperty(FIELD_NAME_STATE)
>>> +                  List<StateMetadata> stateMetadataList) {
>>>       super(id, context, persistedConfig, inputProperties, outputType,
>>>               description);
>>>       checkArgument(inputProperties.size() == 2);
>>>       this.joinSpec = checkNotNull(joinSpec);
>>>       this.leftUpsertKeys = leftUpsertKeys;
>>>       this.rightUpsertKeys = rightUpsertKeys;
>>> +      this.stateMetadataList = stateMetadataList;
>>>   }
>>>
>>>   @Override
>>>   @SuppressWarnings("unchecked")
>>>   protected Transformation<RowData> translateToPlanInternal(
>>>           PlannerBase planner, ExecNodeConfig config) {
>>>       final ExecEdge leftInputEdge = …;
>>>       final ExecEdge rightInputEdge = …;
>>>       . . .
>>>
>>>       // for backward compatibility: without state metadata, fall back to
>>>       // the state retention time from the config
>>>       long leftStateRetentionTime =
>>>               isNullOrEmpty(stateMetadataList)
>>>                       ? config.getStateRetentionTime()
>>>                       : stateMetadataList.get(0).getStateTtl();
>>>       long rightStateRetentionTime =
>>>               isNullOrEmpty(stateMetadataList)
>>>                       ? leftStateRetentionTime
>>>                       : stateMetadataList.get(1).getStateTtl();
>>>
>>>       AbstractStreamingJoinOperator operator;
>>>       FlinkJoinType joinType = joinSpec.getJoinType();
>>>       if (joinType == FlinkJoinType.ANTI
>>>               || joinType == FlinkJoinType.SEMI) {
>>>           operator =
>>>                   new StreamingSemiAntiJoinOperator(
>>>                           joinType == FlinkJoinType.ANTI,
>>>                           leftTypeInfo,
>>>                           rightTypeInfo,
>>>                           generatedCondition,
>>>                           leftInputSpec,
>>>                           rightInputSpec,
>>>                           joinSpec.getFilterNulls(),
>>>                           leftStateRetentionTime,
>>> +                          rightStateRetentionTime);
>>>       } else {
>>>           operator =
>>>                   new StreamingJoinOperator(
>>>                           leftTypeInfo,
>>>                           rightTypeInfo,
>>>                           generatedCondition,
>>>                           leftInputSpec,
>>>                           rightInputSpec,
>>>                           leftIsOuter,
>>>                           rightIsOuter,
>>>                           joinSpec.getFilterNulls(),
>>>                           leftStateRetentionTime,
>>> +                          rightStateRetentionTime);
>>>       }
>>>
>>>       . . .
>>>       return transform;
>>>   }
>>> }
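
To make the backward-compatibility fallback in the snippet above easier to read in isolation, here is a minimal, self-contained sketch. It is not the planner code: StateMeta, resolveTtl, the state names, and the millisecond values are hypothetical stand-ins for the proposed StateMetadata and for config.getStateRetentionTime(), and only the fallback rule is modeled.

```java
import java.util.Arrays;
import java.util.List;

public class StateTtlFallbackSketch {

    /** Hypothetical stand-in for the proposed StateMetadata; only name and TTL are modeled. */
    public static final class StateMeta {
        private final String name;
        private final long ttlMillis;

        public StateMeta(String name, long ttlMillis) {
            this.name = name;
            this.ttlMillis = ttlMillis;
        }

        public String getName() {
            return name;
        }

        public long getTtlMillis() {
            return ttlMillis;
        }
    }

    /**
     * Returns the TTL for the given input. If the plan carries no state metadata
     * (null or empty list), the job-level TTL is used for every input.
     */
    public static long resolveTtl(List<StateMeta> metas, int inputIndex, long jobLevelTtlMillis) {
        if (metas == null || metas.isEmpty()) {
            return jobLevelTtlMillis;
        }
        return metas.get(inputIndex).getTtlMillis();
    }

    public static void main(String[] args) {
        long jobLevelTtl = 3_600_000L; // assumed job-level TTL of one hour

        // No state metadata in the plan: both inputs use the job-level TTL.
        System.out.println("left (no metadata):  " + resolveTtl(null, 0, jobLevelTtl));
        System.out.println("right (no metadata): " + resolveTtl(null, 1, jobLevelTtl));

        // State metadata present: each input uses its own TTL.
        List<StateMeta> fromPlan =
                Arrays.asList(
                        new StateMeta("leftState", 86_400_000L),
                        new StateMeta("rightState", 600_000L));
        System.out.println("left (from plan):    " + resolveTtl(fromPlan, 0, jobLevelTtl));
        System.out.println("right (from plan):   " + resolveTtl(fromPlan, 1, jobLevelTtl));
    }
}
```

The point of the sketch is that the per-input TTL lives in a list indexed by input, so a k-th input needs no new configuration key.
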
>>>
>>>
>>>
>>>
>>> Best regards,
>>> Jane
>>>
>>> On Mon, Apr 3, 2023 at 10:01 PM Timo Walther <twal...@apache.org> wrote:
>>>
>>>> Hi Jane,
>>>>
>>>> Thanks for proposing this FLIP. More state insights and fine-grained
>>>> state TTL are frequently requested features for Flink SQL. Eventually,
>>>> we need to address this.
>>>>
>>>> I agree with the previous responses that doing this with a hint might
>>>> cause more confusion than it resolves. We should use hints only if
>>>> they can be placed close to an operation (e.g. JOIN or table), and only
>>>> where a global flag for the entire query, set using SET, is not
>>>> sufficient.
>>>>
>>>> In general, I support the current direction of the FLIP and continuing
>>>> the vision of FLIP-190. However, actually fine-grained state TTL should
>>>> already be possible today. Maybe this is untested yet, but we largely
>>>> reworked how configuration works within the planner in Flink 1.15.
>>>>
>>>> As you quickly mentioned in the FLIP, ExecNodeConfig [1] already combines
>>>> configuration coming from TableConfig with per-ExecNode config.
>>>> Actually, state TTL from the JSON plan should already have higher
>>>> precedence than TableConfig.
>>>>
>>>> It would be great to extend the meta-information of ExecNodes with state
>>>> insights. I don't fully understand where your proposed StateMetadata is
>>>> located. Would this be a value of @ExecNodeMetadata, StreamExecNode, or
>>>> TwoInputStreamOperator?
>>>>
>>>> I think it should be a combination of ExecNodeMetadata with rough
>>>> estimates (declaration) or StreamExecNode, but it should not bubble into
>>>> TwoInputStreamOperator.
>>>>
>>>> Regards,
>>>> Timo
>>>>
>>>> [1]
>>>>
>>>> https://github.com/apache/flink/blob/master/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNodeConfig.java
>>>>
>>>> On 03.04.23 09:15, godfrey he wrote:
>>>> > Hi Jane,
>>>> >
>>>> > Thanks for driving this FLIP.
>>>> >
>>>> > I think the compiled plan solution and the hint solution do not
>>>> > conflict; the two can exist at the same time.
>>>> > The compiled plan solution addresses the needs of advanced users and
>>>> > platform users, since it lets the user define the state TTL of every
>>>> > stateful operator, while the hint solution addresses some specific,
>>>> > simple scenarios and is user-friendly, convenient, and unambiguous to
>>>> > use.
>>>> >
>>>> > Some stateful operators are not compiled from SQL directly, such as
>>>> > the ChangelogNormalize and SinkUpsertMaterializer mentioned above. I
>>>> > notice that the example given by Yisha has a hint propagation problem,
>>>> > which does not conform to the current design.
>>>> > The hint solution should be simple (only the common operators are
>>>> > supported) and easy to understand (no hint propagation).
>>>> >
>>>> > If the hint solution is supported, a compiled plan generated from a
>>>> > query with state TTL hints can still be further modified in its state
>>>> > TTL parts.
>>>> >
>>>> > So, I would prefer the hint solution to be discussed in a separate
>>>> > FLIP. I think that FLIP may need a lot of discussion.
>>>> >
>>>> > Best,
>>>> > Godfrey
>>>> >
>>>> > On Thu, Mar 30, 2023 at 10:04 PM 周伊莎 <zhouyi...@bytedance.com.invalid> wrote:
>>>> >>
>>>> >> Hi Jane,
>>>> >>
>>>> >> Thanks for your detailed response.
>>>> >>
>>>> >>> You mentioned that there are 10k+ SQL jobs in your production
>>>> >>> environment, but only ~100 jobs' migration involves plan editing.
>>>> >>> Is 10k+ the number of total jobs, or the number of jobs that use
>>>> >>> stateful computation and need state migration?
>>>> >>>
>>>> >>
>>>> >> 10k is the number of SQL jobs that enable periodic checkpointing. And
>>>> >> surely, if users change their SQL in a way that changes the plan, they
>>>> >> need to do state migration.
>>>> >>
>>>> >>> - You mentioned that "A truth that can not be ignored is that users
>>>> >>> usually tend to give up editing TTL(or operator ID in our case)
>>>> >>> instead of migrating this configuration between their versions of
>>>> >>> one given job." So what would users prefer to do if they're reluctant
>>>> >>> to edit the operator ID? Would they submit the same SQL as a new job
>>>> >>> with a higher version to re-accumulate the state from the earliest
>>>> >>> offset?
>>>> >>
>>>> >>
>>>> >> You're exactly right. People tend to re-accumulate the state from a
>>>> >> given offset by changing the namespace of their checkpoint.
>>>> >> Namespace is an internal concept, and restarting the SQL job in a new
>>>> >> namespace can simply be understood as submitting a new job.
>>>> >>
>>>> >>> Back to your suggestions, I noticed that FLIP-190 [3] proposed the
>>>> >>> following syntax to perform plan migration
>>>> >>
>>>> >>
>>>> >> The 'plan migration' I mentioned in my last reply may be inaccurate.
>>>> >> It's more like 'query evolution'. In other words, if a user submits a
>>>> >> SQL job with a configured compiled plan and then changes the SQL, the
>>>> >> compiled plan changes too; the question is how to move the
>>>> >> configuration in the old plan to the new plan.
>>>> >> IIUC, FLIP-190 aims to solve issues in Flink version upgrades and
>>>> >> leaves out 'query evolution', which is a fundamental change to the
>>>> >> query, e.g. adding a filter condition or using a different
>>>> >> aggregation.
>>>> >> And I'm really looking forward to a solution for query evolution.
>>>> >>
>>>> >>> And I'm also curious about how to use the hint
>>>> >>> approach to cover cases like
>>>> >>>
>>>> >>> - configuring TTL for operators like ChangelogNormalize,
>>>> >>> SinkUpsertMaterializer, etc., these operators are derived by the
>>>> >>> planner implicitly
>>>> >>> - cope with two/multiple input stream operator's state TTL, like
>>>> >>> join, and other operations like row_number, rank, correlate, etc.
>>>> >>
>>>> >>
>>>> >> Actually, in our company, all operators in the query block where the
>>>> >> hint is located are affected by that hint. For example,
>>>> >>
>>>> >>> INSERT INTO sink
>>>> >>> SELECT /*+ STATE_TTL('1D') */
>>>> >>>     id,
>>>> >>>     name,
>>>> >>>     num
>>>> >>> FROM (
>>>> >>>     SELECT
>>>> >>>         *,
>>>> >>>         ROW_NUMBER() OVER (PARTITION BY id ORDER BY num DESC) as row_num
>>>> >>>     FROM (
>>>> >>>         SELECT
>>>> >>>             *
>>>> >>>         FROM (
>>>> >>>             SELECT
>>>> >>>                 id,
>>>> >>>                 name,
>>>> >>>                 max(num) as num
>>>> >>>             FROM source1
>>>> >>>             GROUP BY
>>>> >>>                 id, name, TUMBLE(proc, INTERVAL '1' MINUTE)
>>>> >>>         )
>>>> >>>         GROUP BY
>>>> >>>             id, name, num
>>>> >>>     )
>>>> >>> )
>>>> >>> WHERE row_num = 1
>>>> >>>
>>>> >>
>>>> >> In the SQL above, the state TTL of both Rank and Agg will be configured
>>>> >> as 1 day. If users want to set different TTLs for Rank and Agg, they
>>>> >> can simply place these two queries in two different query blocks.
>>>> >> It looks quite rough but is straightforward enough. For each side of
>>>> >> the join operator, one of my users proposed a syntax like the one
>>>> >> below:
>>>> >>
>>>> >>> /*+ JOIN_TTL('tables'='left_table,right_table','left_ttl'='100000','right_ttl'='10000') */
>>>> >>>
>>>> >> We haven't accepted this proposal yet; maybe we could find a better
>>>> >> design for this kind of case. Just for your information.
>>>> >>
>>>> >> I think if we want to utilize hints to support fine-grained
>>>> >> configuration, we can open a new FLIP to discuss it.
>>>> >> BTW, personally, I'm interested in how to design a graphical
>>>> >> interface to help users maintain their custom fine-grained
>>>> >> configuration between their job versions.
>>>> >>
>>>> >> Best regards,
>>>> >> Yisha
>>>> >
>>>>
>>>>
