[DISCUSS] FLIP-406: Reorganize State & Checkpointing & Recovery Configuration

2023-12-25 Thread Zakelly Lan
Hi devs,

I'd like to start a discussion on FLIP-406: Reorganize State &
Checkpointing & Recovery Configuration[1].

Currently, the configuration options pertaining to checkpointing, recovery,
and state management are primarily grouped under the following prefixes:

   - state.backend.* : configurations related to state access and
   checkpointing, as well as specific options for individual state backends
   - execution.checkpointing.* : configurations associated with checkpoint
   execution and recovery
   - execution.savepoint.* : configurations for recovery from a savepoint

In addition, there are several individual options such as '
*state.checkpoint-storage*' and '*state.checkpoints.dir*' that fall outside
of these prefixes. The current arrangement of these options, which span
multiple modules, is somewhat haphazard and lacks a systematic structure.
For example, the options under the '*CheckpointingOptions*' and '
*ExecutionCheckpointingOptions*' are related and have no clear boundaries
from the user's perspective, but there is no unified prefix for them. With
the upcoming release of Flink 2.0, we have an excellent opportunity to
overhaul and restructure the configurations related to checkpointing,
recovery, and state management. This FLIP proposes to reorganize these
settings, making them more coherent by module, which would significantly
lower the barrier to understanding and reduce development costs going
forward.
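
To make the current scatter concrete, here is a minimal sketch that sets a few
of today's options programmatically (the keys are the existing ones mentioned
above; the values and the plain string-key style are only for illustration):

    import org.apache.flink.configuration.Configuration;

    public class CurrentStateConfigSketch {
        public static void main(String[] args) {
            Configuration conf = new Configuration();
            // state.backend.* : backend choice and backend-specific options
            conf.setString("state.backend.type", "rocksdb");
            // individual keys that fall outside the prefixes above
            conf.setString("state.checkpoint-storage", "filesystem");
            conf.setString("state.checkpoints.dir", "hdfs:///flink/checkpoints");
            conf.setString("state.savepoints.dir", "hdfs:///flink/savepoints");
            // execution.checkpointing.* : checkpoint execution
            conf.setString("execution.checkpointing.interval", "1 min");
            // execution.savepoint.* : recovery from a savepoint
            conf.setString("execution.savepoint.path", "hdfs:///flink/savepoints/savepoint-xxxx");
            System.out.println(conf);
        }
    }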

Looking forward to hearing from you!

[1]
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=284789560

Best,
Zakelly


Re: [DISCUSS] FLIP-406: Reorganize State & Checkpointing & Recovery Configuration

2023-12-25 Thread Rui Fan
Hi Zakelly,

Thank you for driving this proposal!

Overall good for me. I have some questions about these names.

1. How about execution.checkpointing.storage.type instead of
execution.checkpointing.storage?

It's similar to state.backend.type.

2. How about execution.checkpointing.local-copy.enabled instead of
execution.checkpointing.local-copy?

You added a new option: execution.checkpointing.local-copy.dir.
IIUC, one option name shouldn't be the prefix of another option.
If you add a new option named execution.checkpointing.local-copy,
the Flink CI will fail directly.

3. execution.checkpointing.savepoint.dir is a little weird.

For the old options state.savepoints.dir and state.checkpoints.dir,
savepoint and checkpoint are at the same level, i.e. a directory is
either a checkpoint dir or a savepoint dir.

The new option execution.checkpointing.dir is fine for me.
However, execution.checkpointing.savepoint.dir is a little weird.
I don't know which name is better now. Let us think about it more.

4. How about execution.recovery.claim-mode instead of
execution.recovery.mode?

The meaning of mode is too broad. The claim-mode may
be more accurate for users.

WDYT?

Best,
Rui

On Mon, Dec 25, 2023 at 5:14 PM Zakelly Lan  wrote:

> Hi devs,
>
> I'd like to start a discussion on FLIP-406: Reorganize State &
> Checkpointing & Recovery Configuration[1].
>
> Currently, the configuration options pertaining to checkpointing, recovery,
> and state management are primarily grouped under the following prefixes:
>
>- state.backend.* : configurations related to state accessing and
>checkpointing, as well as specific options for individual state backends
>- execution.checkpointing.* : configurations associated with checkpoint
>execution and recovery
>- execution.savepoint.*: configurations for recovery from savepoint
>
> In addition, there are several individual options such as '
> *state.checkpoint-storage*' and '*state.checkpoints.dir*' that fall outside
> of these prefixes. The current arrangement of these options, which span
> multiple modules, is somewhat haphazard and lacks a systematic structure.
> For example, the options under the '*CheckpointingOptions*' and '
> *ExecutionCheckpointingOptions*' are related and have no clear boundaries
> from the user's perspective, but there is no unified prefix for them. With
> the upcoming release of Flink 2.0, we have an excellent opportunity to
> overhaul and restructure the configurations related to checkpointing,
> recovery, and state management. This FLIP proposes to reorganize these
> settings, making it more coherent by module, which would significantly
> lower the barriers for understanding and reduce the development costs
> moving forward.
>
> Looking forward to hearing from you!
>
> [1]
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=284789560
>
> Best,
> Zakelly
>


Re: [DISCUSS] FLIP-406: Reorganize State & Checkpointing & Recovery Configuration

2023-12-25 Thread Junrui Lee
Hi Zakelly,

Thanks for driving this. I agree that the proposed restructuring of the
configuration options is largely positive. It will make understanding and
working with Flink configurations more intuitive.

Most of the proposed changes look great. Just a heads-up, as Rui Fan
mentioned, Flink currently requires that no configOption's key be the
prefix of another to avoid issues when we eventually adopt a standard YAML
parser, as detailed in FLINK-29372 (
https://issues.apache.org/jira/browse/FLINK-29372). Therefore, it's better
to change the key 'execution.checkpointing.local-copy' because it serves as
a prefix to the key 'execution.checkpointing.local-copy.dir'.
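
To illustrate why this matters with a standard YAML parser, here is a small
sketch (using SnakeYAML purely for illustration; this is not code from Flink).
In a nested YAML document a key is either a scalar leaf or a mapping node, so
'local-copy' and 'local-copy.dir' cannot coexist as independent options:

    import org.yaml.snakeyaml.Yaml;
    import java.util.Map;

    public class PrefixConflictSketch {
        public static void main(String[] args) {
            Yaml yaml = new Yaml();
            // 'local-copy' as a scalar leaf (a boolean flag)
            Map<String, Object> asLeaf = yaml.load(
                    "execution:\n"
                  + "  checkpointing:\n"
                  + "    local-copy: true\n");
            // 'local-copy' as a mapping node with a 'dir' child
            Map<String, Object> asMapping = yaml.load(
                    "execution:\n"
                  + "  checkpointing:\n"
                  + "    local-copy:\n"
                  + "      dir: /tmp/local-copy\n");
            System.out.println(asLeaf);    // {execution={checkpointing={local-copy=true}}}
            System.out.println(asMapping); // {execution={checkpointing={local-copy={dir=/tmp/local-copy}}}}
            // The same key cannot take both shapes in one document, hence the
            // requirement that no option key be a prefix of another.
        }
    }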

Best regards,
Junrui

Rui Fan <1996fan...@gmail.com> wrote on Mon, Dec 25, 2023 at 19:11:

> Hi Zakelly,
>
> Thank you for driving this proposal!
>
> Overall good for me. I have some questions about these names.
>
> 1. How about execution.checkpointing.storage.type instead of
> execution.checkpointing.storage?
>
> It's similar to state.backend.type.
>
> 2. How about execution.checkpointing.local-copy.enabled instead of
> execution.checkpointing.local-copy?
>
> You added a new option: execution.checkpointing.local-copy.dir.
> IIUC, one option name shouldn't be the prefix of other options.
> If you add a new option execution.checkpointing.local-copy,
> flink CI will fail directly.
>
> 3. execution.checkpointing.savepoint.dir is a little weird.
>
> For old options: state.savepoints.dir and state.checkpoints.dir,
> the savepoint and checkpoint are the same level. It means
> it's a checkpoint or savepoint.
>
> The new option execution.checkpointing.dir is fine for me.
> However, execution.checkpointing.savepoint.dir is a little weird.
> I don't know which name is better now. Let us think about it more.
>
> 4. How about execution.recovery.claim-mode instead of
> execution.recovery.mode?
>
> The meaning of mode is too broad. The claim-mode may
> be more accurate for users.
>
> WDYT?
>
> Best,
> Rui
>
> On Mon, Dec 25, 2023 at 5:14 PM Zakelly Lan  wrote:
>
> > Hi devs,
> >
> > I'd like to start a discussion on FLIP-406: Reorganize State &
> > Checkpointing & Recovery Configuration[1].
> >
> > Currently, the configuration options pertaining to checkpointing,
> recovery,
> > and state management are primarily grouped under the following prefixes:
> >
> >- state.backend.* : configurations related to state accessing and
> >checkpointing, as well as specific options for individual state
> backends
> >- execution.checkpointing.* : configurations associated with
> checkpoint
> >execution and recovery
> >- execution.savepoint.*: configurations for recovery from savepoint
> >
> > In addition, there are several individual options such as '
> > *state.checkpoint-storage*' and '*state.checkpoints.dir*' that fall
> outside
> > of these prefixes. The current arrangement of these options, which span
> > multiple modules, is somewhat haphazard and lacks a systematic structure.
> > For example, the options under the '*CheckpointingOptions*' and '
> > *ExecutionCheckpointingOptions*' are related and have no clear boundaries
> > from the user's perspective, but there is no unified prefix for them.
> With
> > the upcoming release of Flink 2.0, we have an excellent opportunity to
> > overhaul and restructure the configurations related to checkpointing,
> > recovery, and state management. This FLIP proposes to reorganize these
> > settings, making it more coherent by module, which would significantly
> > lower the barriers for understanding and reduce the development costs
> > moving forward.
> >
> > Looking forward to hearing from you!
> >
> > [1]
> >
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=284789560
> >
> > Best,
> > Zakelly
> >
>


Re: [DISCUSS] FLIP-406: Reorganize State & Checkpointing & Recovery Configuration

2023-12-25 Thread Zakelly Lan
Hi Rui Fan and Junrui,

Thanks for the reminder! I agree to change the
'execution.checkpointing.local-copy' to
'execution.checkpointing.local-copy.enabled'.

And for other suggestions Rui proposed:

1. How about execution.checkpointing.storage.type instead
> of execution.checkpointing.storage?


Ah, I missed something here. Actually I suggest we could merge the current
'state.checkpoints.dir' and 'state.checkpoint-storage' into one URI
configuration named 'execution.checkpointing.dir'. WDYT?

3. execution.checkpointing.savepoint.dir is a little weird.
>

Yes, I think it is better to make 'savepoint' and 'checkpoint' the same
level. But I'm not so sure since there is only one savepoint-related
option. Maybe someone else could share some thoughts here.

4. How about execution.recovery.claim-mode instead of
> execution.recovery.mode?
>

 Agreed. That's more accurate.


Many thanks for your suggestions!

Best,
Zakelly

On Mon, Dec 25, 2023 at 8:18 PM Junrui Lee  wrote:

> Hi Zakelly,
>
> Thanks for driving this. I agree that the proposed restructuring of the
> configuration options is largely positive. It will make understanding and
> working with Flink configurations more intuitive.
>
> Most of the proposed changes look great. Just a heads-up, as Rui Fan
> mentioned, Flink currently requires that no configOption's key be the
> prefix of another to avoid issues when we eventually adopt a standard YAML
> parser, as detailed in FLINK-29372 (
> https://issues.apache.org/jira/browse/FLINK-29372). Therefore, it's better
> to change the key 'execution.checkpointing.local-copy' because it serves as
> a prefix to the key 'execution.checkpointing.local-copy.dir'.
>
> Best regards,
> Junrui
>
> Rui Fan <1996fan...@gmail.com> wrote on Mon, Dec 25, 2023 at 19:11:
>
> > Hi Zakelly,
> >
> > Thank you for driving this proposal!
> >
> > Overall good for me. I have some questions about these names.
> >
> > 1. How about execution.checkpointing.storage.type instead of
> > execution.checkpointing.storage?
> >
> > It's similar to state.backend.type.
> >
> > 2. How about execution.checkpointing.local-copy.enabled instead of
> > execution.checkpointing.local-copy?
> >
> > You added a new option: execution.checkpointing.local-copy.dir.
> > IIUC, one option name shouldn't be the prefix of other options.
> > If you add a new option execution.checkpointing.local-copy,
> > flink CI will fail directly.
> >
> > 3. execution.checkpointing.savepoint.dir is a little weird.
> >
> > For old options: state.savepoints.dir and state.checkpoints.dir,
> > the savepoint and checkpoint are the same level. It means
> > it's a checkpoint or savepoint.
> >
> > The new option execution.checkpointing.dir is fine for me.
> > However, execution.checkpointing.savepoint.dir is a little weird.
> > I don't know which name is better now. Let us think about it more.
> >
> > 4. How about execution.recovery.claim-mode instead of
> > execution.recovery.mode?
> >
> > The meaning of mode is too broad. The claim-mode may
> > be more accurate for users.
> >
> > WDYT?
> >
> > Best,
> > Rui
> >
> > On Mon, Dec 25, 2023 at 5:14 PM Zakelly Lan 
> wrote:
> >
> > > Hi devs,
> > >
> > > I'd like to start a discussion on FLIP-406: Reorganize State &
> > > Checkpointing & Recovery Configuration[1].
> > >
> > > Currently, the configuration options pertaining to checkpointing,
> > recovery,
> > > and state management are primarily grouped under the following
> prefixes:
> > >
> > >- state.backend.* : configurations related to state accessing and
> > >checkpointing, as well as specific options for individual state
> > backends
> > >- execution.checkpointing.* : configurations associated with
> > checkpoint
> > >execution and recovery
> > >- execution.savepoint.*: configurations for recovery from savepoint
> > >
> > > In addition, there are several individual options such as '
> > > *state.checkpoint-storage*' and '*state.checkpoints.dir*' that fall
> > outside
> > > of these prefixes. The current arrangement of these options, which span
> > > multiple modules, is somewhat haphazard and lacks a systematic
> structure.
> > > For example, the options under the '*CheckpointingOptions*' and '
> > > *ExecutionCheckpointingOptions*' are related and have no clear
> boundaries
> > > from the user's perspective, but there is no unified prefix for them.
> > With
> > > the upcoming release of Flink 2.0, we have an excellent opportunity to
> > > overhaul and restructure the configurations related to checkpointing,
> > > recovery, and state management. This FLIP proposes to reorganize these
> > > settings, making it more coherent by module, which would significantly
> > > lower the barriers for understanding and reduce the development costs
> > > moving forward.
> > >
> > > Looking forward to hearing from you!
> > >
> > > [1]
> > >
> >
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=284789560
> > >
> > > Best,
> > > Zakelly
> > >
> >
>


Re: [VOTE] FLIP-364: Update the default value of backoff-multiplier from 1.2 to 1.5

2023-12-25 Thread Rui Fan
+1(binding)

Closing this vote thread, results will be announced in a separate email.

Best,
Rui

On Thu, Dec 21, 2023 at 4:46 PM Zakelly Lan  wrote:

> This makes sense. +1 (non-binding)
>
>
> Best,
> Zakelly
>
> On Thu, Dec 21, 2023 at 1:50 PM Jiabao Sun
> wrote:
>
> > +1 (non-binding)
> >
> > Best,
> > Jiabao
> >
> >
> > > On Dec 21, 2023, at 13:25, weijie guo wrote:
> > >
> > > +1(binding)
> > >
> > > Best regards,
> > >
> > > Weijie
> > >
> > >
> > > Xin Jiang wrote on Thu, Dec 21, 2023 at 11:21:
> > >
> > >> 1.5 multiplier is indeed more reasonable.
> > >>
> > >> +1 (non-binding)
> > >>
> > >>> On Dec 19, 2023, at 5:17 PM, Rui Fan <1996fan...@gmail.com> wrote:
> > >>>
> > >>> Hi everyone,
> > >>>
> > >>> Thank you to everyone for the feedback on FLIP-364:
> > >>> Improve the restart-strategy[1] which has been voted in this
> thread[2].
> > >>>
> > >>> After the vote on FLIP-364, there was some feedback on the user mail
> > >>> list[3]
> > >>> suggesting changing the default value of backoff-multiplier from 1.2
> to
> > >> 1.5.
> > >>>
> > >>> I would like to start a vote for it. The vote will be open for at
> least
> > >> 72
> > >>> hours
> > >>> unless there is an objection or not enough votes.
> > >>>
> > >>> [1] https://cwiki.apache.org/confluence/x/uJqzDw
> > >>> [2] https://lists.apache.org/thread/xo03tzw6d02w1vbcj5y9ccpqyc7bqrh9
> > >>> [3] https://lists.apache.org/thread/6glz0d57r8gtpzq4f71vf9066c5x6nyw
> > >>>
> > >>> Best,
> > >>> Rui
> > >>
> > >>
> >
> >
>


[RESULT][VOTE] FLIP-364: Update the default value of backoff-multiplier from 1.2 to 1.5

2023-12-25 Thread Rui Fan
Dear developers,

FLIP-364: Improve the exponential-delay restart-strategy[1] was
accepted[2] earlier. This vote updates the default value of
backoff-multiplier from 1.2 to 1.5 and was carried out in this thread [3].

The vote received 6 approving votes, 3 of which are binding, and
there was no disapproval.

- Zhu Zhu (binding)
- Xin Jiang
- Weijie Guo (binding)
- Jiabao Sun
- Zakelly Lan
- Rui Fan (binding)

Thanks to all participants for the discussion, voting, and providing
valuable feedback.

[1] https://cwiki.apache.org/confluence/x/uJqzDw
[2] https://lists.apache.org/thread/xo03tzw6d02w1vbcj5y9ccpqyc7bqrh9
[3] https://lists.apache.org/thread/0b1dcwb49owpm6v1j8rhrg9h0fvs5nkt

Best,
Rui


[jira] [Created] (FLINK-33936) Mini-batch should output the result when the result is the same as the last one if TTL is set.

2023-12-25 Thread Feng Jin (Jira)
Feng Jin created FLINK-33936:


 Summary: Mini-batch should output the result when the result is the 
same as the last one if TTL is set.
 Key: FLINK-33936
 URL: https://issues.apache.org/jira/browse/FLINK-33936
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Runtime
Affects Versions: 1.18.0
Reporter: Feng Jin


Currently, if mini-batch is enabled and the aggregated result is the same as 
the previous output, the new aggregation result will not be sent downstream. 
The specific logic is as follows. This causes downstream nodes to not receive 
updated data, and if a TTL is set for their states, the TTL of the downstream 
state will not be refreshed either.

https://github.com/hackergin/flink/blob/a18c0cd3f0cdfd7e0acb53283f40cd2033a86472/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/aggregate/MiniBatchGroupAggFunction.java#L224

{code:java}
if (!equaliser.equals(prevAggValue, newAggValue)) {
    // new row is not same with prev row
    if (generateUpdateBefore) {
        // prepare UPDATE_BEFORE message for previous row
        resultRow
                .replace(currentKey, prevAggValue)
                .setRowKind(RowKind.UPDATE_BEFORE);
        out.collect(resultRow);
    }
    // prepare UPDATE_AFTER message for new row
    resultRow.replace(currentKey, newAggValue).setRowKind(RowKind.UPDATE_AFTER);
    out.collect(resultRow);
}
// new row is same with prev row, no need to output
{code}



When mini-batch is not enabled, new results are still sent if a TTL is set, 
even if this aggregation result is the same as the last one.

https://github.com/hackergin/flink/blob/e9353319ad625baa5b2c20fa709ab5b23f83c0f4/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/aggregate/GroupAggFunction.java#L170

{code:java}

if (stateRetentionTime <= 0 && equaliser.equals(prevAggValue, newAggValue)) {
    // newRow is the same as before and state cleaning is not enabled.
    // We do not emit retraction and acc message.
    // If state cleaning is enabled, we have to emit messages to prevent too early
    // state eviction of downstream operators.
    return;
} else {
    // retract previous result
    if (generateUpdateBefore) {
        // prepare UPDATE_BEFORE message for previous row
        resultRow
                .replace(currentKey, prevAggValue)
                .setRowKind(RowKind.UPDATE_BEFORE);
        out.collect(resultRow);
    }
    // prepare UPDATE_AFTER message for new row
    resultRow.replace(currentKey, newAggValue).setRowKind(RowKind.UPDATE_AFTER);
}
{code}


Therefore, considering the TTL scenario, I believe that when mini-batch 
aggregation is enabled and a TTL is set, new results should also be emitted 
when the aggregated result is the same as the previous one.
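
A minimal sketch of the behavior this issue asks for (not a patch against the real 
MiniBatchGroupAggFunction; it simply mirrors the stateRetentionTime guard of the 
GroupAggFunction snippet above, so that field name is an assumption here):

{code:java}
if (stateRetentionTime <= 0 && equaliser.equals(prevAggValue, newAggValue)) {
    // unchanged result and no TTL configured: safe to emit nothing
} else {
    // TTL configured or result changed: emit, so the downstream state TTL is refreshed
    if (generateUpdateBefore) {
        // prepare UPDATE_BEFORE message for previous row
        resultRow.replace(currentKey, prevAggValue).setRowKind(RowKind.UPDATE_BEFORE);
        out.collect(resultRow);
    }
    // prepare UPDATE_AFTER message for new row
    resultRow.replace(currentKey, newAggValue).setRowKind(RowKind.UPDATE_AFTER);
    out.collect(resultRow);
}
{code}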



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-33937) Translate "Stateful Stream Processing" page into Chinese

2023-12-25 Thread Yongping Li (Jira)
Yongping Li created FLINK-33937:
---

 Summary: Translate "Stateful Stream Processing" page into Chinese
 Key: FLINK-33937
 URL: https://issues.apache.org/jira/browse/FLINK-33937
 Project: Flink
  Issue Type: Improvement
  Components: chinese-translation
Affects Versions: 1.8.0
Reporter: Yongping Li
 Fix For: 1.8.4
 Attachments: image-2023-12-26-08-54-14-041.png

The page is located at 
_"docs/content.zh/docs/concepts/stateful-stream-processing.md"_

_!image-2023-12-26-08-54-14-041.png!_



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-33938) Update Web UI to adopt typescript 5.x

2023-12-25 Thread Ao Yuchen (Jira)
Ao Yuchen created FLINK-33938:
-

 Summary: Update Web UI to adopt typescript 5.x 
 Key: FLINK-33938
 URL: https://issues.apache.org/jira/browse/FLINK-33938
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Web Frontend
Affects Versions: 1.19.0
Reporter: Ao Yuchen
 Fix For: 1.19.0, 1.17.3, 1.18.2


Since TypeScript 5.x, implicit coercions in relational operators are forbidden 
([https://devblogs.microsoft.com/typescript/announcing-typescript-5-0/#forbidden-implicit-coercions-in-relational-operators]).

As a result, the following code in 
flink-runtime-web/web-dashboard/src/app/components/humanize-date.pipe.ts 
produces an error:
{code:java}
public transform(
  value: number | string | Date,
  ...
): string | null | undefined {
  if (value == null || value === '' || value !== value || value < 0) {
return '-';
  } 
  ...
}{code}
The correctness improvement is available in 
[https://github.com/microsoft/TypeScript/pull/52048].

I think we should optimize this type of code for better compatibility.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: Re: [DISCUSS][FLINK-31830] Align the Nullability Handling of ROW between SQL and TableAPI

2023-12-25 Thread Jane Chan
Thanks Shengkai and Xuyang.

@Shengkai

I have one question: Is the influence only limited to the RowType? Does the
> Map or Array type have the same problems?
>

I think the issue is exclusive to RowType. You may want to review
CALCITE-2464[1] for more details.

[1] https://issues.apache.org/jira/browse/CALCITE-2464
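
For readers skimming the thread, a minimal sketch (my own, based on the issue
description, not a test from the codebase) of the kind of type where the two
APIs currently diverge:

    import org.apache.flink.table.api.DataTypes;
    import org.apache.flink.table.types.DataType;

    public class RowNullabilitySketch {
        public static void main(String[] args) {
            // A nullable ROW whose field f0 is declared NOT NULL. The Table API
            // accepts this today, while SQL (following CALCITE-2464) implicitly
            // rewrites the field to be nullable.
            DataType rowType =
                    DataTypes.ROW(DataTypes.FIELD("f0", DataTypes.INT().notNull()));
            System.out.println(rowType);
        }
    }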

@Xuyang

Is it possible to consider introducing a deprecated option to allow users
> to fall back to the previous version (default fallback), and then
> officially deprecate it in Flink 2.0?
>

If I understand correctly, 2.0 allows breaking changes to remove historical
baggage in this release. Therefore, if we want to fix this issue before
2.0, we could introduce a fallback option in the two most recent versions
(1.19 and 1.20). However, from version 2.0 onwards, since we no longer
promise backward compatibility, introducing a fallback option might be
unnecessary. What do you think?

BTW, this jira FLINK-33217[1] is caused by that Flink SQL does not handle
> the nullable attribute of the Row type in the way Calcite expected.
> However, fixing them will also cause a relatively large impact. We may also
> need to check the code part in SQL.
>

Yes, this is another issue caused by the row type nullability handling.
I've mentioned this JIRA ticket in the reference link to the previous
reply.

Best,
Jane

On Mon, Dec 25, 2023 at 1:42 PM Xuyang  wrote:

> Hi, Jane, thanks for driving this.
>
>
> IMO, it is important to keep same consistent semantics between table api
> and sql, not only for maintenance, but also for user experience. But for
> users, the impact of this modification is a bit large. Is it possible to
> consider introducing a deprecated option to allow users to fall back to the
> previous version (default fallback), and then officially deprecate it in
> Flink 2.0?
>
>
> BTW, this jira FLINK-33217[1] is caused by that Flink SQL does not handle
> the nullable attribute of the Row type in the way Calcite expected.
> However, fixing them will also cause a relatively large impact. We may also
> need to check the code part in SQL.
>
>
> [1] https://issues.apache.org/jira/browse/FLINK-33217
>
>
>
>
> --
>
> Best!
> Xuyang
>
>
>
>
>
> At 2023-12-25 10:16:28, "Shengkai Fang" wrote:
> >Thanks for Jane and Sergey's proposal!
> >
> >+1 to correct the Table API behavior.
> >
> >I have one question: Is the influence only limited to the RowType? Does
> the
> >Map or Array type have the same problems?
> >
> >Best,
> >Shengkai
> >[DISCUSS][FLINK-31830] Align the Nullability Handling of ROW between SQL
> >and TableA
> >
> >Jane Chan wrote on Fri, Dec 22, 2023 at 17:40:
> >
> >> Dear devs,
> >>
> >> Several issues [1][2][3] have been identified regarding the inconsistent
> >> treatment of ROW type nullability between SQL and TableAPI. However,
> >> addressing these discrepancies might necessitate updates to the public
> API.
> >> Therefore, I'm initiating this discussion to engage the community in
> >> forging a unified approach to resolve these challenges.
> >>
> >> To summarize, SQL prohibits ROW types such as ROW >> STRING>, which is implicitly rewritten to ROW by
> >> Calcite[4]. In contrast, TableAPI permits such types, resulting in
> >> inconsistency.
> >> [image: image.png]
> >> For a comprehensive issue breakdown, please refer to the comment of [1].
> >>
> >> According to CALCITE-2464[4], ROW is not a valid type.
> As
> >> a result, the behavior of TableAPI is incorrect and needs to be
> consistent
> >> with SQL, which means the implantation for the following public API
> needs
> >> to be changed.
> >>
> >>- RowType#copy(boolean nullable) should also set the inner fields to
> >>null if nullable is true.
> >>- RowType's constructor should also check nullability.
> >>- FieldsDataType#nullable() should also set the inner fields to null.
> >>
> >> In addition to the necessary changes in the implementation of the
> >> aforementioned API, the semantics of the following API have also been
> >> impacted.
> >>
> >>- `DataTypes.ROW(DataTypes.FIELD("f0",
> >>DataTypes.INT().notNull())).notNull()` cannot create a type like
> `ROW >>NOT NULL>NOT NULL`.
> >>- Idempotence for chained calls `notNull().nullable().notNull()` for
> >>`Row` cannot be maintained.
> >>
> >> Sergey and I have engaged in a discussion regarding the solution [1].
> I'm
> >> interested in gathering additional perspectives on the fix.
> >>
> >> Look forward to your ideas!
> >>
> >> Best,
> >> Jane
> >>
> >>
> >> [1] https://issues.apache.org/jira/browse/FLINK-31830
> >> [2] https://issues.apache.org/jira/browse/FLINK-31829
> >> [3] https://issues.apache.org/jira/browse/FLINK-33217
> >> [4] https://issues.apache.org/jira/browse/CALCITE-2464
> >>
>


Re: [DISCUSS] FLIP-406: Reorganize State & Checkpointing & Recovery Configuration

2023-12-25 Thread Yanfei Lei
Hi Zakelly,

Thank you for creating the FLIP and starting the discussion.

The current arrangement of these options is indeed somewhat haphazard,
and the new arrangement looks much better. I have some questions about
the arrangement of some new configuration options:

1. For some state backends that do not support incremental checkpoints,
how does the execution.checkpointing.incremental option take effect? Or
is it better to put incremental under state.backend.xxx.incremental?

2. I'm a little worried that putting all configurations into
`ExecutionCheckpointingOptions` will introduce some dependency
problems. Some options would be used by the flink-runtime module, but
flink-runtime should not depend on flink-streaming-java, e.g.
FLINK-28286[1].
So, I'd prefer to move the configurations to `CheckpointingOptions`, WDYT?

[1] https://issues.apache.org/jira/browse/FLINK-28286

--
Best,
Yanfei

Zakelly Lan wrote on Mon, Dec 25, 2023 at 21:14:
>
> Hi Rui Fan and Junrui,
>
> Thanks for the reminder! I agree to change the
> 'execution.checkpointing.local-copy' to
> 'execution.checkpointing.local-copy.enabled'.
>
> And for other suggestions Rui proposed:
>
> 1. How about execution.checkpointing.storage.type instead
> > of execution.checkpointing.storage?
>
>
> Ah, I missed something here. Actually I suggest we could merge the current
> 'state.checkpoints.dir' and 'state.checkpoint-storage' into one URI
> configuration named 'execution.checkpointing.dir'. WDYT?
>
> 3. execution.checkpointing.savepoint.dir is a little weird.
> >
>
> Yes, I think it is better to make 'savepoint' and 'checkpoint' the same
> level. But I'm not so sure since there is only one savepoint-related
> option. Maybe someone else could share some thoughts here.
>
> 4. How about execution.recovery.claim-mode instead of
> > execution.recovery.mode?
> >
>
>  Agreed. That's more accurate.
>
>
> Many thanks for your suggestions!
>
> Best,
> Zakelly
>
> On Mon, Dec 25, 2023 at 8:18 PM Junrui Lee  wrote:
>
> > Hi Zakelly,
> >
> > Thanks for driving this. I agree that the proposed restructuring of the
> > configuration options is largely positive. It will make understanding and
> > working with Flink configurations more intuitive.
> >
> > Most of the proposed changes look great. Just a heads-up, as Rui Fan
> > mentioned, Flink currently requires that no configOption's key be the
> > prefix of another to avoid issues when we eventually adopt a standard YAML
> > parser, as detailed in FLINK-29372 (
> > https://issues.apache.org/jira/browse/FLINK-29372). Therefore, it's better
> > to change the key 'execution.checkpointing.local-copy' because it serves as
> > a prefix to the key 'execution.checkpointing.local-copy.dir'.
> >
> > Best regards,
> > Junrui
> >
> > Rui Fan <1996fan...@gmail.com> wrote on Mon, Dec 25, 2023 at 19:11:
> >
> > > Hi Zakelly,
> > >
> > > Thank you for driving this proposal!
> > >
> > > Overall good for me. I have some questions about these names.
> > >
> > > 1. How about execution.checkpointing.storage.type instead of
> > > execution.checkpointing.storage?
> > >
> > > It's similar to state.backend.type.
> > >
> > > 2. How about execution.checkpointing.local-copy.enabled instead of
> > > execution.checkpointing.local-copy?
> > >
> > > You added a new option: execution.checkpointing.local-copy.dir.
> > > IIUC, one option name shouldn't be the prefix of other options.
> > > If you add a new option execution.checkpointing.local-copy,
> > > flink CI will fail directly.
> > >
> > > 3. execution.checkpointing.savepoint.dir is a little weird.
> > >
> > > For old options: state.savepoints.dir and state.checkpoints.dir,
> > > the savepoint and checkpoint are the same level. It means
> > > it's a checkpoint or savepoint.
> > >
> > > The new option execution.checkpointing.dir is fine for me.
> > > However, execution.checkpointing.savepoint.dir is a little weird.
> > > I don't know which name is better now. Let us think about it more.
> > >
> > > 4. How about execution.recovery.claim-mode instead of
> > > execution.recovery.mode?
> > >
> > > The meaning of mode is too broad. The claim-mode may
> > > be more accurate for users.
> > >
> > > WDYT?
> > >
> > > Best,
> > > Rui
> > >
> > > On Mon, Dec 25, 2023 at 5:14 PM Zakelly Lan 
> > wrote:
> > >
> > > > Hi devs,
> > > >
> > > > I'd like to start a discussion on FLIP-406: Reorganize State &
> > > > Checkpointing & Recovery Configuration[1].
> > > >
> > > > Currently, the configuration options pertaining to checkpointing,
> > > recovery,
> > > > and state management are primarily grouped under the following
> > prefixes:
> > > >
> > > >- state.backend.* : configurations related to state accessing and
> > > >checkpointing, as well as specific options for individual state
> > > backends
> > > >- execution.checkpointing.* : configurations associated with
> > > checkpoint
> > > >execution and recovery
> > > >- execution.savepoint.*: configurations for recovery from savepoint
> > > >
> > > > In addition, there are severa

[jira] [Created] (FLINK-33939) Make husky in runtime-web no longer affect git global hooks

2023-12-25 Thread Jason TANG (Jira)
Jason TANG created FLINK-33939:
--

 Summary: Make husky in runtime-web no longer affect git global 
hooks
 Key: FLINK-33939
 URL: https://issues.apache.org/jira/browse/FLINK-33939
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Web Frontend
Reporter: Jason TANG


Since runtime-web relies on husky to ensure that front-end code changes are 
checked before `git commit`, husky modifies the git hooks setting 
(core.hooksPath), so core.hooksPath won't take effect if it's configured 
globally. I think it would be a good idea to make the front-end code 
checks an optional command execution, which would ensure that globally 
configured hooks are executed correctly.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-33940) Update the auto-derivation rule of max parallelism for enlarged upscaling space

2023-12-25 Thread Zhanghao Chen (Jira)
Zhanghao Chen created FLINK-33940:
-

 Summary: Update the auto-derivation rule of max parallelism 
for enlarged upscaling space
 Key: FLINK-33940
 URL: https://issues.apache.org/jira/browse/FLINK-33940
 Project: Flink
  Issue Type: Improvement
  Components: API / Core
Reporter: Zhanghao Chen


*Background*

The choice of the max parallelism of a stateful operator is important, as it 
limits the upper bound of the operator's parallelism, while setting it too large 
can also add extra overhead. Currently, the max parallelism of an 
operator is either fixed to a value specified by the API / pipeline option or 
auto-derived with the following rule:

`min(max(roundUpToPowerOfTwo(operatorParallelism * 1.5), 128), 32767)`

*Problem*

Recently, the elasticity of Flink jobs has become more and more valued by 
users. The current auto-derivation rule was introduced a long time ago 
and only allows the operator parallelism to be roughly doubled, which is not 
desirable for elasticity. Setting a max parallelism manually may not be desirable 
either: users may not have sufficient expertise to select a good 
max-parallelism value.

*Proposal*

Update the auto-derivation rule of max parallelism to derive a larger max 
parallelism for a better elasticity experience out of the box. A candidate is as 
follows:

`min(max(roundUpToPowerOfTwo(operatorParallelism * 5), 1024), 32767)`
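
For comparison, a small sketch of the two rules above (my own reading of the 
formulas, not code copied from Flink), together with one worked value:

{code:java}
public class MaxParallelismSketch {

    // smallest power of two >= x (for x >= 1)
    static int roundUpToPowerOfTwo(int x) {
        int p = 1;
        while (p < x) {
            p <<= 1;
        }
        return p;
    }

    static int currentRule(int operatorParallelism) {
        return Math.min(Math.max(roundUpToPowerOfTwo((int) (operatorParallelism * 1.5)), 128), 32767);
    }

    static int proposedRule(int operatorParallelism) {
        return Math.min(Math.max(roundUpToPowerOfTwo(operatorParallelism * 5), 1024), 32767);
    }

    public static void main(String[] args) {
        // e.g. operatorParallelism = 200:
        System.out.println(currentRule(200));  // roundUpToPowerOfTwo(300)  = 512  -> 512
        System.out.println(proposedRule(200)); // roundUpToPowerOfTwo(1000) = 1024 -> 1024
    }
}
{code}

With the current rule an operator running at parallelism 200 could only scale up 
to 512, while the candidate rule leaves room up to 1024.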

Looking forward to your opinions on this.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-33941) Use field index instead of field name about window time column

2023-12-25 Thread xuyang (Jira)
xuyang created FLINK-33941:
--

 Summary: Use field index instead of field name about window time 
column
 Key: FLINK-33941
 URL: https://issues.apache.org/jira/browse/FLINK-33941
 Project: Flink
  Issue Type: Technical Debt
  Components: Table SQL / Planner
Reporter: xuyang


In some exec nodes like StreamExecGroupWindowAggregate and some rules like 
BatchPhysicalWindowAggregateRule, the planner uses "AggregateUtil#timeFieldIndex" 
to look up the actual time field index, instead of using the time field index in 
LogicalWindow#timeAttribute directly. However, it would be more formal to use 
the field index instead of the column name.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[DISCUSS] FLIP-408: [Umbrella] Introduce DataStream API V2

2023-12-25 Thread weijie guo
Hi devs,


I'd like to start a discussion about FLIP-408: [Umbrella] Introduce
DataStream API V2 [1].


The DataStream API is one of the two main APIs that Flink provides for
writing data processing programs. It has existed practically since
day 1 of the project and has evolved for nearly a decade, and we are
observing more and more problems with it. Addressing these problems
requires significant breaking changes, which makes an in-place refactor
impractical. Therefore, we propose to introduce a
new set of APIs, the DataStream API V2, to gradually replace the
original DataStream API.


The proposal to introduce a whole new set of APIs is complex and includes
massive changes. We are planning to break it down into multiple
sub-FLIPs for incremental discussion. This FLIP only serves as an
umbrella, mainly focusing on motivation, goals, and overall planning.
That is to say, more design and implementation details will be
discussed in other FLIPs.


It's hard to imagine the detailed design of the new API from this
umbrella FLIP alone, and it would be difficult to form an opinion on it.
Therefore, I have prepared two sub-FLIPs [2][3] at the same time, and
the discussions of them will be posted later in separate threads.


Looking forward to hearing from you, thanks!


Best regards,

Weijie



[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-408%3A+%5BUmbrella%5D+Introduce+DataStream+API+V2

[2]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-409%3A+DataStream+V2+Building+Blocks%3A+DataStream%2C+Partitioning+and+ProcessFunction


[3]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-410%3A++Config%2C+Context+and+Processing+Timer+Service+of+DataStream+API+V2


[DISCUSS] FLIP-409: DataStream V2 Building Blocks: DataStream, Partitioning and ProcessFunction

2023-12-25 Thread weijie guo
Hi devs,


I'd like to start a discussion about FLIP-409: DataStream V2 Building
Blocks: DataStream, Partitioning and ProcessFunction [1].


As the first sub-FLIP for DataStream API V2, we'd like to discuss and
try to answer some of the most fundamental questions in stream
processing:

   1. What kinds of data streams do we have?
   2. How to partition data over the streams?
   3. How to define a processing on the data stream?

The answers to these questions involve three core concepts: DataStream,
Partitioning and ProcessFunction. In this FLIP, we will discuss the
definitions and related API primitives of these concepts in detail.


You can find more details in FLIP-409 [1]. This sub-FLIP is at the
heart of the entire DataStream API V2, and its relationship with other
sub-FLIPs can be found in the umbrella FLIP [2].


Looking forward to hearing from you, thanks!


Best regards,

Weijie



[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-409%3A+DataStream+V2+Building+Blocks%3A+DataStream%2C+Partitioning+and+ProcessFunction

[2]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-408%3A+%5BUmbrella%5D+Introduce+DataStream+API+V2


[DISCUSS] FLIP-410: Config, Context and Processing Timer Service of DataStream API V2

2023-12-25 Thread weijie guo
Hi devs,


I'd like to start a discussion about FLIP-410: Config, Context and
Processing Timer Service of DataStream API V2 [1]. This is the second
sub-FLIP of DataStream API V2.


In FLIP-409 [2], we have defined the most basic primitive of
DataStream V2. On this basis, this FLIP will further answer several
important questions closely related to it:

   1. How to configure the processing over the datastreams, such as
setting the parallelism.
   2. How to get access to the runtime contextual information and
services from inside the process functions.
   3. How to work with processing-time timers.

You can find more details in this FLIP. Its relationship with other
sub-FLIPs can be found in the umbrella FLIP
[3].


Looking forward to hearing from you, thanks!


Best regards,

Weijie


[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-410%3A++Config%2C+Context+and+Processing+Timer+Service+of+DataStream+API+V2

[2]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-409%3A+DataStream+V2+Building+Blocks%3A+DataStream%2C+Partitioning+and+ProcessFunction

[3]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-408%3A+%5BUmbrella%5D+Introduce+DataStream+API+V2