[jira] [Created] (FLINK-33592) The return type of the function is void, not convenient to use

2023-11-17 Thread ZhangTao (Jira)
ZhangTao created FLINK-33592:


 Summary: The return type of the function is void, not convenient to
use
 Key: FLINK-33592
 URL: https://issues.apache.org/jira/browse/FLINK-33592
 Project: Flink
  Issue Type: Improvement
  Components: API / DataStream
Reporter: ZhangTao


{code:java}
@PublicEvolving
public void setRestartStrategy(
        RestartStrategies.RestartStrategyConfiguration restartStrategyConfiguration) {
    config.setRestartStrategy(restartStrategyConfiguration);
}
{code}
StreamExecutionEnvironment usually has many parameters that need to be set. The
return type is void, which makes the method inconvenient to use.

The other setter methods 'return this;' so that calls can be chained; only this
method has a void return type.
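
For illustration, returning the environment would allow chaining (a sketch of the suggested change, not an actual patch):
{code:java}
@PublicEvolving
public StreamExecutionEnvironment setRestartStrategy(
        RestartStrategies.RestartStrategyConfiguration restartStrategyConfiguration) {
    config.setRestartStrategy(restartStrategyConfiguration);
    return this; // enables e.g. env.setRestartStrategy(...).setParallelism(4)
}
{code}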



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-33591) Cleanup usage of deprecated TableTestBase#addFunction

2023-11-17 Thread Sergey Nuyanzin (Jira)
Sergey Nuyanzin created FLINK-33591:
---

 Summary: Cleanup usage of deprecated TableTestBase#addFunction
 Key: FLINK-33591
 URL: https://issues.apache.org/jira/browse/FLINK-33591
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / API
Reporter: Sergey Nuyanzin
Assignee: Sergey Nuyanzin


I would say it is a subtask of the cleanup of
{{TableEnvironment#registerFunction}}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] FLIP-394: Add Metrics for Connector Agnostic Autoscaling

2023-11-17 Thread Rui Fan
Thanks Mason for your feedback and update!

The sources you listed look good to me, +1 for this proposal!

Best,
Rui

On Sat, Nov 18, 2023 at 3:38 AM Mason Chen  wrote:

> Also, it looks like externalizing the Hive connector is unblocked based on
> the past email thread. https://issues.apache.org/jira/browse/FLINK-30064
> seems to have some progress and perhaps we shouldn't touch it for now.
>
> On Fri, Nov 17, 2023 at 11:00 AM Mason Chen 
> wrote:
>
> > Hi Rui and Max,
> >
> > Thanks for the feedback!
> >
> > If yes, I suggest this FLIP includes registering metric part, otherwise
> >> these metrics still cannot work.
> >
> > Yup, you understood it correctly. I'll add that to the required list of
> > work. Note that I'll include only FLIP-27 sources in the Flink repo:
> > FileSource, HybridSource, NumberSequenceSource, DataGeneratorSource, and
> > HiveSource.
> >
> > I think externalized sources could be implemented outside of this FLIP,
> as
> > other sources would have to wait for a Flink minor release and it
> wouldn't
> > make sense to track something like IcebergSource here since it isn't part
> > of the Flink project. KafkaSource is a special case since there is a lot
> of
> > usage and people use it also to verify Flink releases, so I'm open
> > to tracking that here too.
> >
> > In addition, after some further thought on the `setAssignedSplitsGauge`
> > metric, I think we need to track this in the SourceReaders. The reason is
> > that splits can be "completed" and this is only tracked by readers. There
> > are some poll based sources that only take 1 split and poll for another
> > when completed, but we cannot make that assumption in general (i.e.
> request
> > split when a split is completed). So, this needs to be tracked in the
> > reader.
> >
> > Best,
> > Mason
> >
> > On Fri, Nov 17, 2023 at 2:39 AM Maximilian Michels 
> wrote:
> >
> >> Hi Mason,
> >>
> >> Thank you for the proposal. This is a highly requested feature to make
> >> the source scaling of Flink Autoscaling generic across all sources.
> >> The current implementation handles every source individually, and if
> >> we don't find any backlog metrics, we default to using busy time only.
> >> At this point Kafka is the only supported source. We collect the
> >> backlog size (pending metrics), as well as the number of available
> >> splits / partitions.
> >>
> >> For Kafka, we always read from all splits but I like how for the
> >> generic interface we take note of both assigned and unassigned splits.
> >> This allows for more flexible integration with other sources where we
> >> might have additional splits we read from at a later point in time.
> >>
> >> Considering Rui's point, I agree it makes sense to outline the
> >> integration with existing sources. Other than that, +1 from my side
> >> for the proposal.
> >>
> >> Thanks,
> >> Max
> >>
> >> On Fri, Nov 17, 2023 at 4:06 AM Rui Fan <1996fan...@gmail.com> wrote:
> >> >
> >> > Hi Mason,
> >> >
> >> > Thank you for driving this proposal!
> >> >
> >> > Currently, Autoscaler only supports the maximum source parallelism
> >> > of KafkaSource. Introducing the generic metric to support it is good
> >> > to me, +1 for this proposal.
> >> >
> >> > I have a question:
> >> > You added the metric in the flink repo, and Autoscaler will fetch this
> >> > metric. But I didn't see any connector to register this metric.
> >> Currently,
> >> > only IteratorSourceEnumerator registers setUnassignedSplitsGauge,
> >> > and KafkaSource didn't register it. IIUC, if we don't do it,
> autoscaler
> >> > still cannot fetch this metric, right?
> >> >
> >> > If yes, I suggest this FLIP includes registering metric part,
> otherwise
> >> > these metrics still cannot work.
> >> >
> >> > Please correct me if I misunderstood anything, thanks~
> >> >
> >> > Best,
> >> > Rui
> >> >
> >> > On Fri, Nov 17, 2023 at 6:53 AM Mason Chen 
> >> wrote:
> >> >
> >> > > Hi all,
> >> > >
> >> > > I would like to start a discussion on FLIP-394: Add Metrics for
> >> Connector
> >> > > Agnostic Autoscaling [1].
> >> > >
> >> > > This FLIP recommends adding two metrics to make autoscaling work for
> >> > > bounded split source implementations like IcebergSource. These
> >> metrics are
> >> > > required by the Flink Kubernetes Operator autoscaler algorithm [2]
> to
> >> > > retrieve information for the backlog and the maximum source
> >> parallelism.
> >> > > The changes would affect the `@PublicEvolving`
> >> `SplitEnumeratorMetricGroup`
> >> > > API of the source connector framework.
> >> > >
> >> > > Best,
> >> > > Mason
> >> > >
> >> > > [1]
> >> > >
> >> > >
> >>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-394%3A+Add+Metrics+for+Connector+Agnostic+Autoscaling
> >> > > [2]
> >> > >
> >> > >
> >>
> https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/custom-resource/autoscaler/#limitations
> >> > >
> >>
> >
>


Re: Request a release of flink-connector-kafka version 3.1.0 (to consume kafka 3.4.0 with Flink 1.18)

2023-11-17 Thread Mason Chen
Hi all,

Sorry for the late reply. I will follow up with the reviewers on
https://issues.apache.org/jira/browse/FLINK-32197. I think we can
definitely release a new Kafka connector before Flink 1.19 but do note it
is the holiday season, so I can't speak on when the exact date will work
for the community.

Reading the CVE, there are some suggestions to validate your application
properties to remediate it, independently of the Kafka version upgrade.
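
For illustration, such a validation might look like this (a hedged sketch; sasl.jaas.config is a standard Kafka client property, but the helper itself is hypothetical and not an official remediation):

    // Reject JAAS configs that could reach the JNDI gadget behind CVE-2023-25194.
    static void validateKafkaProperties(java.util.Properties props) {
        String jaasConfig = props.getProperty("sasl.jaas.config", "");
        if (jaasConfig.contains("JndiLoginModule")) {
            throw new IllegalArgumentException(
                    "JndiLoginModule is not allowed in sasl.jaas.config");
        }
    }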

Best,
Mason

On Fri, Nov 10, 2023 at 2:12 AM Martijn Visser 
wrote:

> Hi Jean-Marc,
>
> To be fair, the Flink project has a lot of dependencies that have
> false-positives from a Flink pov. We just can't fix all of them.
>
> Let's see what others say on this topic.
>
> Best regards,
>
> Martijn
>
> On Fri, Nov 10, 2023 at 10:56 AM Jean-Marc Paulin  wrote:
> >
> > Hi,
> >
> > I am not exactly thrilled by the False positive statement. This always
> leads to a difficult discussion with customers.
> >
> > Is there a chance of releasing a version of the connector to just add
> support for Kafka 3.4.0, in conjunction with Flink 1.18 ?
> >
> > Kind regards
> >
> > Jean-Marc
> > 
> > From: Martijn Visser 
> > Sent: Thursday, November 9, 2023 13:51
> > To: dev@flink.apache.org ; Mason Chen <
> mas.chen6...@gmail.com>
> > Subject: [EXTERNAL] Re: Request a release of flink-connector-kafka
> version 3.1.0 (to consume kafka 3.4.0 with Flink 1.18)
> >
> > Hi,
> >
> > The CVE is related to the Kafka Connect API and I think of that as a
> > false-positive for the Flink Kafka connector. I would be inclined to
> > preferably get https://issues.apache.org/jira/browse/FLINK-32197  in,
> > and then do a release afterwards. But I would like to understand from
> > Mason if he thinks that's feasible.
> >
> > Best regards,
> >
> > Martijn
> >
> > On Tue, Nov 7, 2023 at 9:45 AM Jean-Marc Paulin  wrote:
> > >
> > > Hi,
> > >
> > > I had a chat on [FLINK-31599] Update kafka version to 3.4.0 by Ge
> · Pull Request #11 · apache/flink-connector-kafka (github.com)<
> https://github.com/apache/flink-connector-kafka/pull/11 > .
> > >
> > > We are consuming Flink 1.18, and the flink-connector-kafka 3.0.1.
> > > The Kafka client 3.2.3 currently in use has the CVE-2023-25194<
> https://www.mend.io/vulnerability-database/disclosure-policy/?query=CVE-2023-25194
> >  vulnerability addressed in Kafka 3.4.0. We will need to move to Kafka
> 3.4.0 for our customers. I have tried to consume Kafka client 3.4.0 but
> that fails after a while. I tracked that down to a change required in the
> flink-connector-kafka source code. The PR #11 above has the required changes,
> > > and is merged into main, but is not currently released.
> > >
> > > I would really appreciate if you could release a newer version of the
> flink-connector-kafka that would enable us to use Kafka 3.4.0.
> > >
> > > Many thanks
> > >
> > > JM
> > >
>


Re: [DISCUSS] FLIP-394: Add Metrics for Connector Agnostic Autoscaling

2023-11-17 Thread Mason Chen
Also, it looks like externalizing the Hive connector is unblocked based on
the past email thread. https://issues.apache.org/jira/browse/FLINK-30064
seems to have some progress and perhaps we shouldn't touch it for now.

On Fri, Nov 17, 2023 at 11:00 AM Mason Chen  wrote:

> Hi Rui and Max,
>
> Thanks for the feedback!
>
> If yes, I suggest this FLIP includes registering metric part, otherwise
>> these metrics still cannot work.
>
> Yup, you understood it correctly. I'll add that to the required list of
> work. Note that I'll include only FLIP-27 sources in the Flink repo:
> FileSource, HybridSource, NumberSequenceSource, DataGeneratorSource, and
> HiveSource.
>
> I think externalized sources could be implemented outside of this FLIP, as
> other sources would have to wait for a Flink minor release and it wouldn't
> make sense to track something like IcebergSource here since it isn't part
> of the Flink project. KafkaSource is a special case since there is a lot of
> usage and people use it also to verify Flink releases, so I'm open
> to tracking that here too.
>
> In addition, after some further thought on the `setAssignedSplitsGauge`
> metric, I think we need to track this in the SourceReaders. The reason is
> that splits can be "completed" and this is only tracked by readers. There
> are some poll based sources that only take 1 split and poll for another
> when completed, but we cannot make that assumption in general (i.e. request
> split when a split is completed). So, this needs to be tracked in the
> reader.
>
> Best,
> Mason
>
> On Fri, Nov 17, 2023 at 2:39 AM Maximilian Michels  wrote:
>
>> Hi Mason,
>>
>> Thank you for the proposal. This is a highly requested feature to make
>> the source scaling of Flink Autoscaling generic across all sources.
>> The current implementation handles every source individually, and if
>> we don't find any backlog metrics, we default to using busy time only.
>> At this point Kafka is the only supported source. We collect the
>> backlog size (pending metrics), as well as the number of available
>> splits / partitions.
>>
>> For Kafka, we always read from all splits but I like how for the
>> generic interface we take note of both assigned and unassigned splits.
>> This allows for more flexible integration with other sources where we
>> might have additional splits we read from at a later point in time.
>>
>> Considering Rui's point, I agree it makes sense to outline the
>> integration with existing sources. Other than that, +1 from my side
>> for the proposal.
>>
>> Thanks,
>> Max
>>
>> On Fri, Nov 17, 2023 at 4:06 AM Rui Fan <1996fan...@gmail.com> wrote:
>> >
>> > Hi Mason,
>> >
>> > Thank you for driving this proposal!
>> >
>> > Currently, Autoscaler only supports the maximum source parallelism
>> > of KafkaSource. Introducing the generic metric to support it is good
>> > to me, +1 for this proposal.
>> >
>> > I have a question:
>> > You added the metric in the flink repo, and Autoscaler will fetch this
>> > metric. But I didn't see any connector to register this metric.
>> Currently,
>> > only IteratorSourceEnumerator registers setUnassignedSplitsGauge,
>> > and KafkaSource didn't register it. IIUC, if we don't do it, autoscaler
>> > still cannot fetch this metric, right?
>> >
>> > If yes, I suggest this FLIP includes registering metric part, otherwise
>> > these metrics still cannot work.
>> >
>> > Please correct me if I misunderstood anything, thanks~
>> >
>> > Best,
>> > Rui
>> >
>> > On Fri, Nov 17, 2023 at 6:53 AM Mason Chen 
>> wrote:
>> >
>> > > Hi all,
>> > >
>> > > I would like to start a discussion on FLIP-394: Add Metrics for
>> Connector
>> > > Agnostic Autoscaling [1].
>> > >
>> > > This FLIP recommends adding two metrics to make autoscaling work for
>> > > bounded split source implementations like IcebergSource. These
>> metrics are
>> > > required by the Flink Kubernetes Operator autoscaler algorithm [2] to
>> > > retrieve information for the backlog and the maximum source
>> parallelism.
>> > > The changes would affect the `@PublicEvolving`
>> `SplitEnumeratorMetricGroup`
>> > > API of the source connector framework.
>> > >
>> > > Best,
>> > > Mason
>> > >
>> > > [1]
>> > >
>> > >
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-394%3A+Add+Metrics+for+Connector+Agnostic+Autoscaling
>> > > [2]
>> > >
>> > >
>> https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/custom-resource/autoscaler/#limitations
>> > >
>>
>


Re: [DISCUSS] Change the default restart-strategy to exponential-delay

2023-11-17 Thread Mason Chen
Hi Rui,

I suppose we could do some benchmarking on what works well for the resource
providers that Flink relies on e.g. Kubernetes. Based on conferences and
blogs, it seems most people are relying on Kubernetes to deploy Flink and
the restart strategy has a large dependency on how well Kubernetes can
scale to requests to redeploy the job.

Best,
Mason

On Fri, Nov 17, 2023 at 10:07 AM David Anderson 
wrote:

> Rui,
>
> I don't have any direct experience with this topic, but given the
> motivation you shared, the proposal makes sense to me. Given that the new
> default feels more complex than the current behavior, if we decide to do
> this I think it will be important to include the rationale you've shared in
> the documentation.
>
> David
>
> On Wed, Nov 15, 2023 at 10:17 PM Rui Fan <1996fan...@gmail.com> wrote:
>
>> Hi dear flink users and devs:
>>
>> FLIP-364[1] intends to make some improvements to restart-strategy
>> and discuss updating some of the default values of exponential-delay,
>> and whether exponential-delay can be used as the default restart-strategy.
>> After discussing at dev mail list[2], we hope to collect more feedback
>> from Flink users.
>>
>> # Why does the default restart-strategy need to be updated?
>>
>> If checkpointing is enabled, the default value is fixed-delay with
>> Integer.MAX_VALUE restart attempts and '1 s' delay[3]. It means
>> the job will restart infinitely with high frequency when a job
>> continues to fail.
>>
>> When the Kafka cluster fails, a large number of flink jobs will be
>> restarted frequently. After the kafka cluster is recovered, a large
>> number of high-frequency restarts of flink jobs may cause the
>> kafka cluster to avalanche again.
>>
>> Considering the exponential-delay as the default strategy with
>> a couple of reasons:
>>
>> - The exponential-delay can reduce the restart frequency when
>>   a job continues to fail.
>> - It can restart a job quickly when a job fails occasionally.
>> - The restart-strategy.exponential-delay.jitter-factor can avoid
>>   restarting multiple jobs at the same time. It’s useful to prevent
>>   avalanches.
>>
>> # What are the current default values[4] of exponential-delay?
>>
>> restart-strategy.exponential-delay.initial-backoff : 1s
>> restart-strategy.exponential-delay.backoff-multiplier : 2.0
>> restart-strategy.exponential-delay.jitter-factor : 0.1
>> restart-strategy.exponential-delay.max-backoff : 5 min
>> restart-strategy.exponential-delay.reset-backoff-threshold : 1h
>>
>> backoff-multiplier=2 means that the delay time of each restart
>> will be doubled. The delay times are:
>> 1s, 2s, 4s, 8s, 16s, 32s, 64s, 128s, 256s, 300s, 300s, etc.
>>
>> The delay time is increased rapidly, it will affect the recover
>> time for flink jobs.
>>
>> # Option improvements
>>
>> We think the backoff-multiplier between 1 and 2 is more sensible,
>> such as:
>>
>> restart-strategy.exponential-delay.backoff-multiplier : 1.2
>> restart-strategy.exponential-delay.max-backoff : 1 min
>>
>> After updating, the delay times are:
>>
>> 1s, 1.2s, 1.44s, 1.728s, 2.073s, 2.488s, 2.985s, 3.583s, 4.299s,
>> 5.159s, 6.191s, 7.430s, 8.916s, 10.699s, 12.839s, 15.407s, 18.488s,
>> 22.186s, 26.623s, 31.948s, 38.337s, etc
>>
>> They achieve the following goals:
>> - When restarts are infrequent in a short period of time, flink can
>>   quickly restart the job. (For example: the retry delay time when
>>   restarting 5 times is 2.073s)
>> - When restarting frequently in a short period of time, flink can
>>   slightly reduce the restart frequency to prevent avalanches.
>>   (For example: the retry delay time when retrying 10 times is 5.1 s,
>>   and the retry delay time when retrying 20 times is 38s, which is not
>>   very large.)
>>
>> As @Mingliang Liu mentioned on the dev mail list: one-size-fits-all
>> default values do not exist. So our goal is that the default values
>> can be suitable for most jobs.
>>
>> Looking forward to your thoughts and feedback, thanks~
>>
>> [1] https://cwiki.apache.org/confluence/x/uJqzDw
>> [2] https://lists.apache.org/thread/5cgrft73kgkzkgjozf9zfk0w2oj7rjym
>> [3]
>>
>> https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/deployment/config/#restart-strategy-type
>> [4]
>>
>> https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/task_failure_recovery/#exponential-delay-restart-strategy
>>
>> Best,
>> Rui
>>
>
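
For reference, the delay sequence quoted above can be reproduced with a few lines (a sketch ignoring jitter: delay(n) = min(initial-backoff * multiplier^n, max-backoff); output matches the quoted numbers modulo rounding):

    double initialBackoff = 1.0, multiplier = 1.2, maxBackoff = 60.0;
    for (int n = 0; n < 21; n++) {
        // n restarts so far => the (n+1)-th restart waits this long
        double delay = Math.min(initialBackoff * Math.pow(multiplier, n), maxBackoff);
        System.out.printf("restart %d: %.3fs%n", n + 1, delay);
    }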


[jira] [Created] (FLINK-33590) CheckpointStatsTracker.totalNumberOfSubTasks not updated

2023-11-17 Thread Roman Khachatryan (Jira)
Roman Khachatryan created FLINK-33590:
-

 Summary: CheckpointStatsTracker.totalNumberOfSubTasks not updated
 Key: FLINK-33590
 URL: https://issues.apache.org/jira/browse/FLINK-33590
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Checkpointing, Runtime / Coordination
Affects Versions: 1.18.0
Reporter: Roman Khachatryan
Assignee: Roman Khachatryan
 Fix For: 1.18.1


On rescaling, the degree of parallelism (DoP) is obtained from the JobGraph.
However, JobGraph vertices are not updated once created. This results in
missing traces on rescaling (isComplete returns false).

Instead, the DoP should be obtained from the parallelism store.
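
Sketch of the fix direction (identifier names are assumptions based on the runtime's parallelism store, not the final patch):
{code:java}
// Read the current DoP from the mutable parallelism store instead of the
// immutable JobGraph vertex:
int dop = vertexParallelismStore.getParallelismInfo(jobVertexId).getParallelism();
{code}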



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] FLIP-394: Add Metrics for Connector Agnostic Autoscaling

2023-11-17 Thread Mason Chen
Hi Rui and Max,

Thanks for the feedback!

> If yes, I suggest this FLIP includes registering metric part, otherwise
> these metrics still cannot work.

Yup, you understood it correctly. I'll add that to the required list of
work. Note that I'll include only FLIP-27 sources in the Flink repo:
FileSource, HybridSource, NumberSequenceSource, DataGeneratorSource, and
HiveSource.

I think externalized sources could be implemented outside of this FLIP, as
other sources would have to wait for a Flink minor release and it wouldn't
make sense to track something like IcebergSource here since it isn't part
of the Flink project. KafkaSource is a special case since there is a lot of
usage and people use it also to verify Flink releases, so I'm open
to tracking that here too.

In addition, after some further thought on the `setAssignedSplitsGauge`
metric, I think we need to track this in the SourceReaders. The reason is
that splits can be "completed" and this is only tracked by readers. There
are some poll based sources that only take 1 split and poll for another
when completed, but we cannot make that assumption in general (i.e. request
split when a split is completed). So, this needs to be tracked in the
reader.
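
For context, this is how an enumerator registers the existing unassigned-splits gauge today (a minimal sketch; context is the SplitEnumeratorContext and unassigned a hypothetical split collection, while the proposed assigned-splits gauge would be the reader-side analogue and its exact API is still an assumption of this FLIP):

    // Inside a SplitEnumerator#start() implementation:
    context.metricGroup().setUnassignedSplitsGauge(() -> (long) unassigned.size());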

Best,
Mason

On Fri, Nov 17, 2023 at 2:39 AM Maximilian Michels  wrote:

> Hi Mason,
>
> Thank you for the proposal. This is a highly requested feature to make
> the source scaling of Flink Autoscaling generic across all sources.
> The current implementation handles every source individually, and if
> we don't find any backlog metrics, we default to using busy time only.
> At this point Kafka is the only supported source. We collect the
> backlog size (pending metrics), as well as the number of available
> splits / partitions.
>
> For Kafka, we always read from all splits but I like how for the
> generic interface we take note of both assigned and unassigned splits.
> This allows for more flexible integration with other sources where we
> might have additional splits we read from at a later point in time.
>
> Considering Rui's point, I agree it makes sense to outline the
> integration with existing sources. Other than that, +1 from my side
> for the proposal.
>
> Thanks,
> Max
>
> On Fri, Nov 17, 2023 at 4:06 AM Rui Fan <1996fan...@gmail.com> wrote:
> >
> > Hi Mason,
> >
> > Thank you for driving this proposal!
> >
> > Currently, Autoscaler only supports the maximum source parallelism
> > of KafkaSource. Introducing the generic metric to support it is good
> > to me, +1 for this proposal.
> >
> > I have a question:
> > You added the metric in the flink repo, and Autoscaler will fetch this
> > metric. But I didn't see any connector to register this metric.
> Currently,
> > only IteratorSourceEnumerator registers setUnassignedSplitsGauge,
> > and KafkaSource didn't register it. IIUC, if we don't do it, autoscaler
> > still cannot fetch this metric, right?
> >
> > If yes, I suggest this FLIP includes registering metric part, otherwise
> > these metrics still cannot work.
> >
> > Please correct me if I misunderstood anything, thanks~
> >
> > Best,
> > Rui
> >
> > On Fri, Nov 17, 2023 at 6:53 AM Mason Chen 
> wrote:
> >
> > > Hi all,
> > >
> > > I would like to start a discussion on FLIP-394: Add Metrics for
> Connector
> > > Agnostic Autoscaling [1].
> > >
> > > This FLIP recommends adding two metrics to make autoscaling work for
> > > bounded split source implementations like IcebergSource. These metrics
> are
> > > required by the Flink Kubernetes Operator autoscaler algorithm [2] to
> > > retrieve information for the backlog and the maximum source
> parallelism.
> > > The changes would affect the `@PublicEvolving`
> `SplitEnumeratorMetricGroup`
> > > API of the source connector framework.
> > >
> > > Best,
> > > Mason
> > >
> > > [1]
> > >
> > >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-394%3A+Add+Metrics+for+Connector+Agnostic+Autoscaling
> > > [2]
> > >
> > >
> https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/custom-resource/autoscaler/#limitations
> > >
>


[jira] [Created] (FLINK-33589) Fix broken documentation page for Kinesis connector

2023-11-17 Thread Aleksandr Pilipenko (Jira)
Aleksandr Pilipenko created FLINK-33589:
---

 Summary: Fix broken documentation page for Kinesis connector
 Key: FLINK-33589
 URL: https://issues.apache.org/jira/browse/FLINK-33589
 Project: Flink
  Issue Type: Bug
  Components: Connectors / AWS, Documentation
Affects Versions: 1.17.1, 1.18.0, 1.16.2, aws-connector-4.1.0
Reporter: Aleksandr Pilipenko


A recent change in the connector version declaration in the docs (FLINK-33567)
resulted in a broken documentation page for the Kinesis connector for Flink
1.16, 1.17, and 1.18.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] Change the default restart-strategy to exponential-delay

2023-11-17 Thread David Anderson
Rui,

I don't have any direct experience with this topic, but given the
motivation you shared, the proposal makes sense to me. Given that the new
default feels more complex than the current behavior, if we decide to do
this I think it will be important to include the rationale you've shared in
the documentation.

David

On Wed, Nov 15, 2023 at 10:17 PM Rui Fan <1996fan...@gmail.com> wrote:

> Hi dear flink users and devs:
>
> FLIP-364[1] intends to make some improvements to restart-strategy
> and discuss updating some of the default values of exponential-delay,
> and whether exponential-delay can be used as the default restart-strategy.
> After discussing at dev mail list[2], we hope to collect more feedback
> from Flink users.
>
> # Why does the default restart-strategy need to be updated?
>
> If checkpointing is enabled, the default value is fixed-delay with
> Integer.MAX_VALUE restart attempts and '1 s' delay[3]. It means
> the job will restart infinitely with high frequency when a job
> continues to fail.
>
> When the Kafka cluster fails, a large number of flink jobs will be
> restarted frequently. After the kafka cluster is recovered, a large
> number of high-frequency restarts of flink jobs may cause the
> kafka cluster to avalanche again.
>
> Considering the exponential-delay as the default strategy with
> a couple of reasons:
>
> - The exponential-delay can reduce the restart frequency when
>   a job continues to fail.
> - It can restart a job quickly when a job fails occasionally.
> - The restart-strategy.exponential-delay.jitter-factor can avoid
>   restarting multiple jobs at the same time. It’s useful to prevent
>   avalanches.
>
> # What are the current default values[4] of exponential-delay?
>
> restart-strategy.exponential-delay.initial-backoff : 1s
> restart-strategy.exponential-delay.backoff-multiplier : 2.0
> restart-strategy.exponential-delay.jitter-factor : 0.1
> restart-strategy.exponential-delay.max-backoff : 5 min
> restart-strategy.exponential-delay.reset-backoff-threshold : 1h
>
> backoff-multiplier=2 means that the delay time of each restart
> will be doubled. The delay times are:
> 1s, 2s, 4s, 8s, 16s, 32s, 64s, 128s, 256s, 300s, 300s, etc.
>
> The delay time is increased rapidly, it will affect the recover
> time for flink jobs.
>
> # Option improvements
>
> We think the backoff-multiplier between 1 and 2 is more sensible,
> such as:
>
> restart-strategy.exponential-delay.backoff-multiplier : 1.2
> restart-strategy.exponential-delay.max-backoff : 1 min
>
> After updating, the delay times are:
>
> 1s, 1.2s, 1.44s, 1.728s, 2.073s, 2.488s, 2.985s, 3.583s, 4.299s,
> 5.159s, 6.191s, 7.430s, 8.916s, 10.699s, 12.839s, 15.407s, 18.488s,
> 22.186s, 26.623s, 31.948s, 38.337s, etc
>
> They achieve the following goals:
> - When restarts are infrequent in a short period of time, flink can
>   quickly restart the job. (For example: the retry delay time when
>   restarting 5 times is 2.073s)
> - When restarting frequently in a short period of time, flink can
>   slightly reduce the restart frequency to prevent avalanches.
>   (For example: the retry delay time when retrying 10 times is 5.1 s,
>   and the retry delay time when retrying 20 times is 38s, which is not
>   very large.)
>
> As @Mingliang Liu mentioned on the dev mail list: one-size-fits-all
> default values do not exist. So our goal is that the default values
> can be suitable for most jobs.
>
> Looking forward to your thoughts and feedback, thanks~
>
> [1] https://cwiki.apache.org/confluence/x/uJqzDw
> [2] https://lists.apache.org/thread/5cgrft73kgkzkgjozf9zfk0w2oj7rjym
> [3]
>
> https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/deployment/config/#restart-strategy-type
> [4]
>
> https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/task_failure_recovery/#exponential-delay-restart-strategy
>
> Best,
> Rui
>


[jira] [Created] (FLINK-33588) Fix Flink Checkpointing Statistics Bug

2023-11-17 Thread Tongtong Zhu (Jira)
Tongtong Zhu created FLINK-33588:


 Summary: Fix Flink Checkpointing Statistics Bug
 Key: FLINK-33588
 URL: https://issues.apache.org/jira/browse/FLINK-33588
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Checkpointing
Affects Versions: 1.17.1, 1.18.0, 1.14.6, 1.15.2, 1.17.0, 1.16.0, 1.14.5
Reporter: Tongtong Zhu
 Fix For: 1.19.0, 1.18.1


When a Flink job first starts, the checkpoint statistics are null because no
checkpoint data exists yet, and Percentile throws a NullPointerException when
calculating the percentage. After multiple tests, I found that it is necessary
to set an initial value for the checkpoint statistics when the checkpoint data
is null (i.e. at the beginning of the job) to solve this problem.
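
A guard along these lines illustrates the idea (a sketch only, not the actual patch; the stats object here merely stands in for the checkpoint statistics):
{code:java}
// Fall back to a default when no checkpoint has completed yet, so Percentile
// is never asked to operate on empty data.
double safePercentile(DescriptiveStatistics stats, double p) {
    return stats.getN() == 0 ? 0.0 : stats.getPercentile(p);
}
{code}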



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-33587) Tidy up docs around JDBC

2023-11-17 Thread Robin Moffatt (Jira)
Robin Moffatt created FLINK-33587:
-

 Summary: Tidy up docs around JDBC
 Key: FLINK-33587
 URL: https://issues.apache.org/jira/browse/FLINK-33587
 Project: Flink
  Issue Type: Improvement
  Components: Documentation
Reporter: Robin Moffatt


The documentation around using JDBC with Flink is, IMHO, a bit disjointed and
could be easier to follow. Specifically:

* https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/jdbcdriver/
  * Add a note to this page regarding the use of this vs. the Hive JDBC option
through the HiveServer2 endpoint
* https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/hive-compatibility/hiveserver2/
should simply redirect to
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql-gateway/hiveserver2/
since this is a feature of the SQL Gateway. Having two pages with the same
title is confusing.

There are also various nits in the above pages, such as missing capitalisation,
links to GitHub repos that would be useful to add, etc.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re:[DISCUSS] FLIP-391: Deprecate RuntimeContext#getExecutionConfig

2023-11-17 Thread Wencong Liu
Hello Junrui,


Thanks for the effort. I agree with the proposal to deprecate the 
getExecutionConfig() method in the RuntimeContext class. Exposing
the complex ExecutionConfig to user-defined functions can lead to 
unnecessary complexity and risks.


I also have a suggestion. We could consider reviewing the existing
methods in ExecutionConfig. If there are methods that are defined
in ExecutionConfig but currently have no callers, we could consider
annotating them as @Internal or directly removing them. Since
users are no longer able to access and invoke these methods,
it would be beneficial to clean up the codebase.

+1 (non-binding).

Best,
Wencong

At 2023-11-15 16:51:15, "Junrui Lee"  wrote:
>Hi all,
>
>I'd like to start a discussion of FLIP-391: Deprecate
>RuntimeContext#getExecutionConfig[1].
>
>Currently, the FLINK RuntimeContext is important for connecting user
>functions to the underlying runtime details. It provides users with
>necessary runtime information during job execution.
>However, the current implementation of the FLINK RuntimeContext exposes the
>ExecutionConfig to users, resulting in two issues:
>Firstly, the ExecutionConfig contains much unrelated information that can
>confuse users and complicate management.
>Secondly, exposing the ExecutionConfig allows users to modify it during job
>execution, which can cause inconsistencies and problems, especially with
>operator chaining.
>
>Therefore, we propose deprecating the RuntimeContext#getExecutionConfig in
>the FLINK RuntimeContext. In the upcoming FLINK-2.0 version, we plan to
>completely remove the RuntimeContext#getExecutionConfig method. Instead, we
>will introduce alternative getter methods that enable users to access
>specific information without exposing unnecessary runtime details. These
>getter methods will include:
>
>1. @PublicEvolving <T> TypeSerializer<T>
>createSerializer(TypeInformation<T> typeInformation);
>2. @PublicEvolving Map<String, String> getGlobalJobParameters();
>3. @PublicEvolving boolean isObjectReuseEnabled();
>
>Looking forward to your feedback and suggestions, thanks.
>
>[1]
>https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=278465937
>
>Best regards,
>Junrui
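
For illustration, a user function migrating off getExecutionConfig might then look like this (a sketch assuming the getters land as listed above; only isObjectReuseEnabled and createSerializer are exercised here):

    public class MyMapper extends RichMapFunction<String, String> {
        private transient TypeSerializer<String> serializer;

        @Override
        public void open(Configuration parameters) {
            // Replaces the former getRuntimeContext().getExecutionConfig() accesses.
            serializer = getRuntimeContext().createSerializer(Types.STRING);
            boolean objectReuse = getRuntimeContext().isObjectReuseEnabled();
        }

        @Override
        public String map(String value) {
            return value;
        }
    }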


[jira] [Created] (FLINK-33586) Add GHA support in release-1.17 branch

2023-11-17 Thread Matthias Pohl (Jira)
Matthias Pohl created FLINK-33586:
-

 Summary: Add GHA support in release-1.17 branch
 Key: FLINK-33586
 URL: https://issues.apache.org/jira/browse/FLINK-33586
 Project: Flink
  Issue Type: Sub-task
Reporter: Matthias Pohl






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [VOTE] Apache Flink Kubernetes Operator Release 1.7.0, release candidate #1

2023-11-17 Thread Rui Fan
+1(non-binding)

- Downloaded artifacts from dist
- Verified SHA512 checksums
- Verified GPG signatures
- Build the source with java-11 and java-17
- Verified the license header
- Verified that chart and appVersion match the target release
- RC repo works as a Helm repo (helm repo add flink-operator-repo-1.7.0-rc1
https://dist.apache.org/repos/dist/dev/flink/flink-kubernetes-operator-1.7.0-rc1/
)
- Verified Helm chart can be installed (helm install
flink-kubernetes-operator
flink-operator-repo-1.7.0-rc1/flink-kubernetes-operator --set
webhook.create=false)
- Submitted the autoscaling demo, the autoscaler works well with rescale
api (kubectl apply -f autoscaling.yaml)
- Download Autoscaler standalone: wget
https://repository.apache.org/content/repositories/orgapacheflink-1672/org/apache/flink/flink-autoscaler-standalone/1.7.0/flink-autoscaler-standalone-1.7.0.jar
- Ran Autoscaler standalone locally, it works well with rescale api

Best,
Rui

On Fri, Nov 17, 2023 at 2:45 AM Mate Czagany  wrote:

> +1 (non-binding)
>
> - Checked signatures, checksums
> - No binaries found in the source release
> - Verified all source files contain the license header
> - All pom files point to the correct version
> - Verified Helm chart version and appVersion
> - Verified Docker image tag
> - Ran flink-autoscaler-standalone JAR downloaded from the maven repository
> - Tested autoscaler upscales correctly on load with Flink 1.18 rescaling
> API
>
> Thanks,
> Mate
>
> Gyula Fóra  ezt írta (időpont: 2023. nov. 15., Sze,
> 16:37):
>
> > Hi Everyone,
> >
> > Please review and vote on the release candidate #1 for the version 1.7.0
> of
> > Apache Flink Kubernetes Operator,
> > as follows:
> > [ ] +1, Approve the release
> > [ ] -1, Do not approve the release (please provide specific comments)
> >
> > **Release Overview**
> >
> > As an overview, the release consists of the following:
> > a) Kubernetes Operator canonical source distribution (including the
> > Dockerfile), to be deployed to the release repository at dist.apache.org
> > b) Kubernetes Operator Helm Chart to be deployed to the release
> repository
> > at dist.apache.org
> > c) Maven artifacts to be deployed to the Maven Central Repository
> > d) Docker image to be pushed to dockerhub
> >
> > **Staging Areas to Review**
> >
> > The staging areas containing the above mentioned artifacts are as
> follows,
> > for your review:
> > * All artifacts for a,b) can be found in the corresponding dev repository
> > at dist.apache.org [1]
> > * All artifacts for c) can be found at the Apache Nexus Repository [2]
> > * The docker image for d) is staged on github [3]
> >
> > All artifacts are signed with the key 21F06303B87DAFF1 [4]
> >
> > Other links for your review:
> > * JIRA release notes [5]
> > * source code tag "release-1.7.0-rc1" [6]
> > * PR to update the website Downloads page to
> > include Kubernetes Operator links [7]
> >
> > **Vote Duration**
> >
> > The voting time will run for at least 72 hours.
> > It is adopted by majority approval, with at least 3 PMC affirmative
> votes.
> >
> > **Note on Verification**
> >
> > You can follow the basic verification guide here[8].
> > Note that you don't need to verify everything yourself, but please make
> > note of what you have tested together with your +- vote.
> >
> > Cheers!
> > Gyula Fora
> >
> > [1]
> >
> >
> https://dist.apache.org/repos/dist/dev/flink/flink-kubernetes-operator-1.7.0-rc1/
> > [2]
> > https://repository.apache.org/content/repositories/orgapacheflink-1672/
> > [3] ghcr.io/apache/flink-kubernetes-operator:ccb10b8
> > [4] https://dist.apache.org/repos/dist/release/flink/KEYS
> > [5]
> >
> >
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12353462
> > [6]
> >
> https://github.com/apache/flink-kubernetes-operator/tree/release-1.7.0-rc1
> > [7] https://github.com/apache/flink-web/pull/699
> > [8]
> >
> >
> https://cwiki.apache.org/confluence/display/FLINK/Verifying+a+Flink+Kubernetes+Operator+Release
> >
>


Re: [DISCUSS] FLIP-393: Make QueryOperations SQL serializable

2023-11-17 Thread xiangyu feng
>After this FLIP is done, FLINK-25015 can utilize this ability to set
> job name for queries.

+1 for this. Currently, when users submit SQL jobs through the Table API, we
can't see the complete SQL string on the Flink UI. It would be easy for us to
finish this feature if we can get the serialized SQL from the QueryOperation
directly.
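
For example (illustrative only; asSerializableString is the existing-but-unimplemented method mentioned later in this thread, and the exact output formatting is an assumption):

    Table t = tableEnv.from("T")
            .filter($("z").isGreater(10))
            .select($("x"), $("y"));
    // t.getQueryOperation().asSerializableString()
    // would then yield something like: SELECT x, y FROM T WHERE z > 10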

So +1 for the overall proposal.

Regards,
Xiangyu

Benchao Li  于2023年11月17日周五 19:07写道:

> That sounds good to me, I'm looking forward to it!
>
> After this FLIP is done, FLINK-25015 can utilize this ability to set
> job name for queries.
>
> Dawid Wysakowicz  于2023年11月16日周四 21:16写道:
> >
> > Yes, the idea is to convert the QueryOperation tree into a
> > proper/compilable query. To be honest I didn't think it could be done
> > differently, sorry if I wasn't clear enough. Yes, it is very much like
> > SqlNode#toSqlString you mentioned. I'll add an example of a single
> > QueryOperation tree to the FLIP.
> >
> > I tried to focus only on the public contracts, not on the implementation
> > details. I mentioned Expressions, because this requires changing
> > semi-public interfaces in BuiltinFunctionDefinitions.
> >
> > Hope this makes it clearer.
> >
> > Regards,
> > Dawid
> >
> > On Thu, 16 Nov 2023 at 12:12, Benchao Li  wrote:
> >
> > > Sorry that I wasn't expressing it clearly.
> > >
> > > Since the FLIP talks about two things: ResolvedExpression and
> > > QueryOperation, and you have illustrated how to serialize
> > > ResolvedExpression into SQL string. I'm wondering how you're going to
> > > convert QueryOperation into SQL string.
> > >
> > > I was thinking that you proposed to convert the QueryOperation tree
> > > into a "complete runnable SQL statement", e.g.
> > >
> > >
> ProjectQueryOperation(x,y)->FilterQueryOperation(z>10)->TableSourceQueryOperation(T),
> > > we'll get "SELECT x, y FROM T WHERE z > 10".
> > > Maybe I misread it, maybe you just meant to convert each
> > > QueryOperation into a row-level SQL string instead the whole tree into
> > > a complete SQL statement.
> > >
> > > The idea of translating whole QueryOperation tree into SQL statement
> > > may come from my experience of Apache Calcite, there is a
> > > SqlImplementor[1] which converts a RelNode tree into SqlNode, and
> > > further we can use  SqlNode#toSqlString to unparse it into SQL string.
> > > I would assume that most of our QueryOperations are much like the
> > > abstraction of Calcite's RelNode, with some exceptions such as
> > > PlannerQueryOperation.
> > >
> > > [1]
> > >
> https://github.com/apache/calcite/blob/153796f8994831ad015af4b9036aa01ebf78/core/src/main/java/org/apache/calcite/rel/rel2sql/SqlImplementor.java#L141
> > >
> > > Dawid Wysakowicz  于2023年11月16日周四 16:24写道:
> > > >
> > > > I think the FLIP covers all public contracts that are necessary to be
> > > > discussed at that level.
> > > >
> > > > If you meant you could not find a method that would be called to
> trigger
> > > > the translation then it is already there. It's just not implemented
> yet:
> > > > QueryOperation#asSerializableString[1]. As I mentioned this is
> mostly a
> > > > follow up to previous work.
> > > >
> > > > Regards,
> > > > Dawid
> > > >
> > > > [1]
> > > >
> > >
> https://github.com/apache/flink/blob/d18a4bfe596fc580f8280750fa3bfa22007671d9/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/QueryOperation.java#L46
> > > >
> > > > On Wed, 15 Nov 2023 at 16:36, Benchao Li 
> wrote:
> > > >
> > > > > +1 for the idea of choosing SQL as the serialization format for
> > > > > QueryOperation, thanks for Dawid for driving this FLIP.
> > > > >
> > > > > Regarding the implementation, I didn't see the proposal for how to
> > > > > translate QueryOperation to SQL yet, am I missing something? Or the
> > > > > FLIP is still in preliminary state, you just want to gather ideas
> > > > > about whether to use SQL or something else as the serialization
> format
> > > > > for QueryOperation?
> > > > >
> > > > > Dawid Wysakowicz  于2023年11月15日周三 19:34写道:
> > > > > >
> > > > > > Hi,
> > > > > > I would like to propose a follow-up improvement to some of the
> work
> > > that
> > > > > > has been done over the years to the Table API. I posted the
> proposed
> > > > > > changes in the FLIP-393. I'd like to get to know what others
> think of
> > > > > > choosing SQL as the serialization format for QueryOperations.
> > > > > > Regards,
> > > > > > Dawid Wysakowicz
> > > > > >
> > > > > > [1] https://cwiki.apache.org/confluence/x/vQ2ZE
> > > > >
> > > > >
> > > > >
> > > > > --
> > > > >
> > > > > Best,
> > > > > Benchao Li
> > > > >
> > >
> > >
> > >
> > > --
> > >
> > > Best,
> > > Benchao Li
> > >
>
>
>
> --
>
> Best,
> Benchao Li
>


Re: [DISCUSS] FLIP-393: Make QueryOperations SQL serializable

2023-11-17 Thread Benchao Li
That sounds good to me, I'm looking forward to it!

After this FLIP is done, FLINK-25015 can utilize this ability to set
job name for queries.

Dawid Wysakowicz  于2023年11月16日周四 21:16写道:
>
> Yes, the idea is to convert the QueryOperation tree into a
> proper/compilable query. To be honest I didn't think it could be done
> differently, sorry if I wasn't clear enough. Yes, it is very much like
> SqlNode#toSqlString you mentioned. I'll add an example of a single
> QueryOperation tree to the FLIP.
>
> I tried to focus only on the public contracts, not on the implementation
> details. I mentioned Expressions, because this requires changing
> semi-public interfaces in BuiltinFunctionDefinitions.
>
> Hope this makes it clearer.
>
> Regards,
> Dawid
>
> On Thu, 16 Nov 2023 at 12:12, Benchao Li  wrote:
>
> > Sorry that I wasn't expressing it clearly.
> >
> > Since the FLIP talks about two things: ResolvedExpression and
> > QueryOperation, and you have illustrated how to serialize
> > ResolvedExpression into SQL string. I'm wondering how you're going to
> > convert QueryOperation into SQL string.
> >
> > I was thinking that you proposed to convert the QueryOperation tree
> > into a "complete runnable SQL statement", e.g.
> >
> > ProjectQueryOperation(x,y)->FilterQueryOperation(z>10)->TableSourceQueryOperation(T),
> > we'll get "SELECT x, y FROM T WHERE z > 10".
> > Maybe I misread it, maybe you just meant to convert each
> > QueryOperation into a row-level SQL string instead the whole tree into
> > a complete SQL statement.
> >
> > The idea of translating whole QueryOperation tree into SQL statement
> > may come from my experience of Apache Calcite, there is a
> > SqlImplementor[1] which converts a RelNode tree into SqlNode, and
> > further we can use  SqlNode#toSqlString to unparse it into SQL string.
> > I would assume that most of our QueryOperations are much like the
> > abstraction of Calcite's RelNode, with some exceptions such as
> > PlannerQueryOperation.
> >
> > [1]
> > https://github.com/apache/calcite/blob/153796f8994831ad015af4b9036aa01ebf78/core/src/main/java/org/apache/calcite/rel/rel2sql/SqlImplementor.java#L141
> >
> > Dawid Wysakowicz  于2023年11月16日周四 16:24写道:
> > >
> > > I think the FLIP covers all public contracts that are necessary to be
> > > discussed at that level.
> > >
> > > If you meant you could not find a method that would be called to trigger
> > > the translation then it is already there. It's just not implemented yet:
> > > QueryOperation#asSerializableString[1]. As I mentioned this is mostly a
> > > follow up to previous work.
> > >
> > > Regards,
> > > Dawid
> > >
> > > [1]
> > >
> > https://github.com/apache/flink/blob/d18a4bfe596fc580f8280750fa3bfa22007671d9/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/QueryOperation.java#L46
> > >
> > > On Wed, 15 Nov 2023 at 16:36, Benchao Li  wrote:
> > >
> > > > +1 for the idea of choosing SQL as the serialization format for
> > > > QueryOperation, thanks for Dawid for driving this FLIP.
> > > >
> > > > Regarding the implementation, I didn't see the proposal for how to
> > > > translate QueryOperation to SQL yet, am I missing something? Or the
> > > > FLIP is still in preliminary state, you just want to gather ideas
> > > > about whether to use SQL or something else as the serialization format
> > > > for QueryOperation?
> > > >
> > > > Dawid Wysakowicz  于2023年11月15日周三 19:34写道:
> > > > >
> > > > > Hi,
> > > > > I would like to propose a follow-up improvement to some of the work
> > that
> > > > > has been done over the years to the Table API. I posted the proposed
> > > > > changes in the FLIP-393. I'd like to get to know what others think of
> > > > > choosing SQL as the serialization format for QueryOperations.
> > > > > Regards,
> > > > > Dawid Wysakowicz
> > > > >
> > > > > [1] https://cwiki.apache.org/confluence/x/vQ2ZE
> > > >
> > > >
> > > >
> > > > --
> > > >
> > > > Best,
> > > > Benchao Li
> > > >
> >
> >
> >
> > --
> >
> > Best,
> > Benchao Li
> >



-- 

Best,
Benchao Li


Re: [DISCUSS] FLIP-394: Add Metrics for Connector Agnostic Autoscaling

2023-11-17 Thread Maximilian Michels
Hi Mason,

Thank you for the proposal. This is a highly requested feature to make
the source scaling of Flink Autoscaling generic across all sources.
The current implementation handles every source individually, and if
we don't find any backlog metrics, we default to using busy time only.
At this point Kafka is the only supported source. We collect the
backlog size (pending metrics), as well as the number of available
splits / partitions.

For Kafka, we always read from all splits but I like how for the
generic interface we take note of both assigned and unassigned splits.
This allows for more flexible integration with other sources where we
might have additional splits we read from at a later point in time.

Considering Rui's point, I agree it makes sense to outline the
integration with existing sources. Other than that, +1 from my side
for the proposal.

Thanks,
Max

On Fri, Nov 17, 2023 at 4:06 AM Rui Fan <1996fan...@gmail.com> wrote:
>
> Hi Mason,
>
> Thank you for driving this proposal!
>
> Currently, Autoscaler only supports the maximum source parallelism
> of KafkaSource. Introducing the generic metric to support it is good
> to me, +1 for this proposal.
>
> I have a question:
> You added the metric in the flink repo, and Autoscaler will fetch this
> metric. But I didn't see any connector to register this metric. Currently,
> only IteratorSourceEnumerator registers setUnassignedSplitsGauge,
> and KafkaSource didn't register it. IIUC, if we don't do it, autoscaler
> still cannot fetch this metric, right?
>
> If yes, I suggest this FLIP includes registering metric part, otherwise
> these metrics still cannot work.
>
> Please correct me if I misunderstood anything, thanks~
>
> Best,
> Rui
>
> On Fri, Nov 17, 2023 at 6:53 AM Mason Chen  wrote:
>
> > Hi all,
> >
> > I would like to start a discussion on FLIP-394: Add Metrics for Connector
> > Agnostic Autoscaling [1].
> >
> > This FLIP recommends adding two metrics to make autoscaling work for
> > bounded split source implementations like IcebergSource. These metrics are
> > required by the Flink Kubernetes Operator autoscaler algorithm [2] to
> > retrieve information for the backlog and the maximum source parallelism.
> > The changes would affect the `@PublicEvolving` `SplitEnumeratorMetricGroup`
> > API of the source connector framework.
> >
> > Best,
> > Mason
> >
> > [1]
> >
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-394%3A+Add+Metrics+for+Connector+Agnostic+Autoscaling
> > [2]
> >
> > https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/custom-resource/autoscaler/#limitations
> >


Re: [DISCUSS] FLIP-389: Annotate SingleThreadFetcherManager and FutureCompletingBlockingQueue as PublicEvolving

2023-11-17 Thread Becket Qin
Hi Hongshun,

SplitFetcher.enqueueTask() returns void, right? SplitFetcherTask is already
an interface, and we need to make that a PublicEvolving API as well.

So overall, a source developer can potentially do a few things in the
SplitFetcherManager:
1. customize logic including split-to-fetcher assignment, threading
model, etc.
2. create their own SplitFetcherTask for the SplitFetcher / SplitReader to
execute in a coordinated manner.

It should be powerful enough for the vast majority of source
implementations, if not all.
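
For instance, a custom task might look like this (a sketch assuming SplitFetcherTask's current two-method contract, run() returning whether the task finished and wakeUp()):

    class CommitOffsetsTask implements SplitFetcherTask {
        @Override
        public boolean run() throws IOException {
            // One unit of work executed on the fetcher thread, coordinated
            // with the SplitReader.
            return true; // finished
        }

        @Override
        public void wakeUp() {
            // Nothing blocking to interrupt in this sketch.
        }
    }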


> Additionally, SplitFetcherTask requires FutureCompletingBlockingQueue as a
> constructor parameter, which is not allowed now.

Are you referring to FetchTask which implements SplitFetcherTask? That
class will remain internal.

Thanks,

Jiangjie (Becket) Qin

On Fri, Nov 17, 2023 at 5:23 PM Hongshun Wang 
wrote:

> Hi, Jiangjie(Becket) ,
> Thank you for your advice. I have learned a lot.
>
> If SplitFetcherManager becomes PublicEvolving, that also means
> > SplitFetcher needs to be PublicEvolving, because it is returned by the
> > protected method SplitFetcherManager.createSplitFetcher().
>
> I completely agree with you. However, if SplitFetcher becomes
> PublicEvolving, SplitFetcherTask also needs to be PublicEvolving
> because it is returned by the public method SplitFetcher#enqueueTask.
> Additionally, SplitFetcherTask requires FutureCompletingBlockingQueue as a
> constructor parameter, which is not allowed now. Therefore, I propose
> changing SplitFetcher to a public interface and moving its implementation
> details to an implementation class (e.g., SplitFetcherImpl or another
> suitable name). SplitFetcherImpl will be
> marked as internal and managed by SplitFetcherManager,
> and put data in the queue.
> Subclasses of SplitFetcherManager can only use the SplitFetcher interface,
> also ensuring that the current subclasses are not affected.
>
>
>
> The current SplitFetcherManager basically looks up
> > the SplitT from the fetcher with the split Id, and immediately passes the
> > SplitT back to the fetcher, which is unnecessary.
>
> I inferred that this is because SplitReader#pauseOrResumeSplits
> requires SplitT instead of SplitId. Perhaps some external source
> requires more information to pause. However, SplitReader doesn't store
> all its split data, while SplitFetcherManager saves them.
> CC, @Dawid Wysakowicz
>
>
>
> If not, SplitFetcher.pause() and
> > SplitFetcher.resume() can be removed. In fact, they seem no longer used
> > anywhere.
>
> It seems to be unused now. CC @Arvid Heise
>
>
>
> Thanks,
> Hongshun Wang
>
> On Fri, Nov 17, 2023 at 11:42 AM Becket Qin  wrote:
>
> > Hi Hongshun,
> >
> > Thanks for updating the FLIP. I think that makes sense. A few comments
> > below:
> >
> > 1. If SplitFetcherManager becomes PublicEvolving, that also means
> > SplitFetcher needs to be PublicEvolving, because it is returned by the
> > protected method SplitFetcherManager.createSplitFetcher().
> >
> > 2. When checking the API of the classes to be marked as PublicEvolving,
> > there might be a few methods' signatures worth some discussion.
> >
> > For SplitFetcherManager:
> > a) Currently removeSplits() methods takes a list of SplitT. I am
> wondering
> > if it should be a list of splitIds. SplitT actually contains two parts of
> > information, the static split Id and some dynamically changing state of
> the
> > split (e.g. Kafka consumer offset). The source of truth for the dynamic
> > state is SourceReaderBase. Currently we are passing in the full source
> > split with the dynamic state for split removal. But it looks like only
> > split id is needed for the split removal.
> > Maybe this is intentional, as sometimes when a SplitReader removes a
> split,
> > it also wants to know the dynamic state of the split. If so, we can keep
> it
> > as is. But then the question is why
> > SplitFetcherManager.pauseAndResumeSplits() only takes split ids instead
> of
> > SplitT. Should we make them consistent?
> >
> > For SplitFetcher:
> > a) The SplitFetcher.pauseOrResumeSplits() method takes collections of
> > SplitT as arguments. We may want to adjust that according to what we do
> to
> > the SplitFetcherManager. The current SplitFetcherManager basically looks
> up
> > the SplitT from the fetcher with the split Id, and immediately passes the
> > SplitT back to the fetcher, which is unnecessary.
> > b) After supporting split level pause and resume, do we still need split
> > fetcher level pause and resume? If not, SplitFetcher.pause() and
> > SplitFetcher.resume() can be removed. In fact, they seem no longer used
> > anywhere.
> >
> > Other than the above potential API adjustment before we mark the classes
> > PublicEvolving, the API looks fine to me.
> >
> > I think it is good timing for deprecation now. We will mark the impacted
> > constructors as deprecated in 1.19, and remove them in release of 2.0.
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
> >
> >
> >
> > On Thu, Nov 16, 

[jira] [Created] (FLINK-33585) Upgrade Zookeeper to 3.7.1

2023-11-17 Thread Martijn Visser (Jira)
Martijn Visser created FLINK-33585:
--

 Summary: Upgrade Zookeeper to 3.7.1
 Key: FLINK-33585
 URL: https://issues.apache.org/jira/browse/FLINK-33585
 Project: Flink
  Issue Type: Technical Debt
  Components: Runtime / Configuration
Reporter: Martijn Visser






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-33584) Update Hadoop Filesystems to 3.3.6

2023-11-17 Thread Martijn Visser (Jira)
Martijn Visser created FLINK-33584:
--

 Summary: Update Hadoop Filesystems to 3.3.6
 Key: FLINK-33584
 URL: https://issues.apache.org/jira/browse/FLINK-33584
 Project: Flink
  Issue Type: Technical Debt
  Components: Connectors / FileSystem
Reporter: Martijn Visser
Assignee: Martijn Visser


Update the Hadoop filesystems to 3.3.6. Some of the key changes:

{code:java}
* A big update of dependencies to try and keep those reports of transitive CVEs
under control - both genuine and false positives.
* Critical fix to ABFS input stream prefetching for correct reading.
* Vectored IO API for all FSDataInputStream implementations, with
high-performance versions for the file:// and s3a:// filesystems (file://
through Java native IO, s3a:// via parallel GET requests).
* Arm64 binaries. Note, because the arm64 release was on a different platform,
the jar files may not match those of the x86 release - and therefore the maven
artifacts.
* Security fixes in Hadoop’s own code.
{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] FLIP-389: Annotate SingleThreadFetcherManager and FutureCompletingBlockingQueue as PublicEvolving

2023-11-17 Thread Hongshun Wang
Hi, Jiangjie(Becket) ,
Thank you for your advice. I have learned a lot.

> If SplitFetcherManager becomes PublicEvolving, that also means
> SplitFetcher needs to be PublicEvolving, because it is returned by the
> protected method SplitFetcherManager.createSplitFetcher().

I completely agree with you. However, if SplitFetcher becomes
PublicEvolving, SplitFetcherTask also needs to be PublicEvolving
because it is returned by the public method SplitFetcher#enqueueTask.
Additionally, SplitFetcherTask requires FutureCompletingBlockingQueue as a
constructor parameter, which is not allowed now. Therefore, I propose changing
SplitFetcher to a public interface and moving its implementation details to an
implementation class (e.g.,
SplitFetcherImpl or another suitable name). SplitFetcherImpl will be
marked as internal and managed by SplitFetcherManager,
and put data in the queue.
Subclasses of SplitFetcherManager can only use the SplitFetcher interface,
also ensuring that the current subclasses are not affected.
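
Roughly, the proposed split might look like this (names and the method selection are illustrative, not a final API):

    @PublicEvolving
    public interface SplitFetcher<E, SplitT extends SourceSplit> {
        void addSplits(List<SplitT> splitsToAdd);
        void enqueueTask(SplitFetcherTask task);
        void shutdown();
    }

    // The current implementation would move behind this interface as an
    // @Internal class (e.g. SplitFetcherImpl) owned by SplitFetcherManager.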



> The current SplitFetcherManager basically looks up
> the SplitT from the fetcher with the split Id, and immediately passes the
> SplitT back to the fetcher, which is unnecessary.

I inferred that this is because SplitReader#pauseOrResumeSplits
requires SplitT instead of SplitId. Perhaps some external source
requires more information to pause. However, SplitReader doesn't store
all its split data, while SplitFetcherManager saves them.
CC, @Dawid Wysakowicz



> If not, SplitFetcher.pause() and
> SplitFetcher.resume() can be removed. In fact, they seem no longer used
> anywhere.

It seems to be unused now. CC @Arvid Heise



Thanks,
Hongshun Wang

On Fri, Nov 17, 2023 at 11:42 AM Becket Qin  wrote:

> Hi Hongshun,
>
> Thanks for updating the FLIP. I think that makes sense. A few comments
> below:
>
> 1. If SplitFetcherManager becomes PublicEvolving, that also means
> SplitFetcher needs to be PublicEvolving, because it is returned by the
> protected method SplitFetcherManager.createSplitFetcher().
>
> 2. When checking the API of the classes to be marked as PublicEvolving,
> there might be a few methods' signatures worth some discussion.
>
> For SplitFetcherManager:
> a) Currently removeSplits() methods takes a list of SplitT. I am wondering
> if it should be a list of splitIds. SplitT actually contains two parts of
> information, the static split Id and some dynamically changing state of the
> split (e.g. Kafka consumer offset). The source of truth for the dynamic
> state is SourceReaderBase. Currently we are passing in the full source
> split with the dynamic state for split removal. But it looks like only
> split id is needed for the split removal.
> Maybe this is intentional, as sometimes when a SplitReader removes a split,
> it also wants to know the dynamic state of the split. If so, we can keep it
> as is. But then the question is why
> SplitFetcherManager.pauseAndResumeSplits() only takes split ids instead of
> SplitT. Should we make them consistent?
>
> For SplitFetcher:
> a) The SplitFetcher.pauseOrResumeSplits() method takes collections of
> SplitT as arguments. We may want to adjust that according to what we do to
> the SplitFetcherManager. The current SplitFetcherManager basically looks up
> the SplitT from the fetcher with the split Id, and immediately passes the
> SplitT back to the fetcher, which is unnecessary.
> b) After supporting split level pause and resume, do we still need split
> fetcher level pause and resume? If not, SplitFetcher.pause() and
> SplitFetcher.resume() can be removed. In fact, they seem no longer used
> anywhere.
>
> Other than the above potential API adjustment before we mark the classes
> PublicEvolving, the API looks fine to me.
>
> I think it is good timing for deprecation now. We will mark the impacted
> constructors as deprecated in 1.19, and remove them in release of 2.0.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
>
>
> On Thu, Nov 16, 2023 at 8:26 PM Hongshun Wang 
> wrote:
>
> > Hi Devs,
> >
> > I have just modified the content of FLIP-389: Annotate
> > SingleThreadFetcherManager as PublicEvolving[1].
> >
> > Now this Flip mainly do two thing:
> >
> >1. Annotate SingleThreadFetcherManager as PublicEvolving
> >2. Remove all public constructors which use
> >FutureCompletingBlockingQueue. This will make many constructors as
> >@Depricated.
> >
> > This may influence many connectors, so I am looking forward to hearing
> from
> > you.
> >
> >
> > Best regards,
> > Hongshun
> >
> > [1]
> >
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=278465498
> >
> > On Wed, Nov 15, 2023 at 7:57 AM Becket Qin  wrote:
> >
> > > Hi Hongshun,
> > > >
> > > >
> > > > However, it will be tricky because SplitFetcherManager includes <SplitT
> > > > extends SourceSplit>, while FutureCompletingBlockingQueue includes <E>.
> > > > This means that SplitFetcherManager would have to be modified to <E,
> > > > SplitT extends SourceSplit>, which would affect the 

[jira] [Created] (FLINK-33583) Support state ttl hint for join

2023-11-17 Thread xuyang (Jira)
xuyang created FLINK-33583:
--

 Summary: Support state ttl hint for join 
 Key: FLINK-33583
 URL: https://issues.apache.org/jira/browse/FLINK-33583
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / API, Table SQL / Planner
Affects Versions: 1.19.0
Reporter: xuyang






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[ANNOUNCE] Apache Flink-shaded 16.2 released

2023-11-17 Thread weijie guo
The Apache Flink community is very happy to announce the release of
Apache Flink-shaded 16.2.

The flink-shaded project contains a number of shaded dependencies for
Apache Flink.

Apache Flink® is an open-source stream processing framework for
distributed, high-performing, always-available, and accurate data
streaming applications.

The release is available for download at:
https://flink.apache.org/downloads.html

The full release notes are available in Jira:
https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12353810

We would like to thank all contributors of the Apache Flink community
who made this release possible!

Regards,
Release Manager


[jira] [Created] (FLINK-33582) Update flink-shaded version

2023-11-17 Thread Yuxin Tan (Jira)
Yuxin Tan created FLINK-33582:
-

 Summary: Update flink-shaded version
 Key: FLINK-33582
 URL: https://issues.apache.org/jira/browse/FLINK-33582
 Project: Flink
  Issue Type: Bug
  Components: BuildSystem / Shaded
Affects Versions: 1.17.2
Reporter: Yuxin Tan


This is a follow-up task for https://issues.apache.org/jira/browse/FLINK-33417. 

After flink-shaded 16.2 is released, we should update the flink-shaded version 
for Flink 1.17 to resolve the issue thoroughly.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)