[jira] [Created] (FLINK-29913) Shared state would be discarded by mistake when maxConcurrentCheckpoint>1

2022-11-06 Thread Yanfei Lei (Jira)
Yanfei Lei created FLINK-29913:
--

 Summary: Shared state would be discarded by mistake when 
maxConcurrentCheckpoint>1
 Key: FLINK-29913
 URL: https://issues.apache.org/jira/browse/FLINK-29913
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Checkpointing
Affects Versions: 1.16.0, 1.15.0
Reporter: Yanfei Lei


When maxConcurrentCheckpoints > 1, the shared state of the incremental RocksDB 
state backend can be discarded by mistake, because a handle with the same name 
is registered again for a later checkpoint. See 
[https://github.com/apache/flink/pull/21050#discussion_r1011061072]
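
For reference, a minimal sketch of a job configuration under which this scenario
can be hit (standard DataStream API; the class name is illustrative and the job
body is omitted):

import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ConcurrentIncrementalCheckpoints {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        // Incremental checkpoints share unchanged RocksDB SST files across checkpoints.
        env.setStateBackend(new EmbeddedRocksDBStateBackend(true));
        env.enableCheckpointing(10_000); // checkpoint every 10 seconds
        // With more than one in-flight checkpoint, two checkpoints can register
        // shared-state handles under the same key, which is where the reported
        // accidental discard can occur.
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(2);
        // ... sources, operators, sinks, and env.execute() omitted
    }
}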



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] FLIP-271: Autoscaling

2022-11-06 Thread Yanfei Lei
Hi Max,

Thanks for the proposal. This proposal makes Flink better adapted to
cloud-native applications!

After reading the FLIP, I'm curious about some points:

1) It's said that "The first step is collecting metrics for all JobVertices
by combining metrics from all the runtime subtasks and computing the
*average*". When the load of the subtasks of an operator is not balanced,
do we need to trigger autoScaling? Has the median or some percentiles been
considered?
2) IIUC, "FLIP-159: Reactive Mode" is somewhat similar to this proposal,
will we reuse some logic from Reactive Mode?
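
To make question 1) concrete, a small, purely illustrative aggregation sketch
(the numbers and helper names are made up, not taken from the FLIP):

import java.util.Arrays;

public class BusyTimeAggregation {
    // e.g. busyTimeMsPerSecond of 10 subtasks of one operator, one of them skewed
    static final double[] BUSY_TIME_MS = {200, 210, 190, 205, 195, 200, 210, 190, 205, 980};

    static double average(double[] v) {
        return Arrays.stream(v).average().orElse(0);
    }

    // simple nearest-rank percentile
    static double percentile(double[] v, double p) {
        double[] sorted = v.clone();
        Arrays.sort(sorted);
        int idx = (int) Math.ceil(p / 100.0 * sorted.length) - 1;
        return sorted[Math.max(0, idx)];
    }

    public static void main(String[] args) {
        System.out.println("avg = " + average(BUSY_TIME_MS));        // 278.5 ms/s -> looks idle
        System.out.println("p95 = " + percentile(BUSY_TIME_MS, 95)); // 980.0 ms/s -> one hot subtask
    }
}

The average suggests plenty of headroom, while a high percentile reveals the
single almost-fully-busy subtask, which is the skew case in question 1).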

Best,
Yanfei

Gyula Fóra wrote on Mon, Nov 7, 2022 at 02:33:

> Hi Dong!
>
> Let me try to answer the questions :)
>
> 1 : busyTimeMsPerSecond is not specific for CPU, it measures the time spent
> in the main record processing loop for an operator if I
> understand correctly. This includes IO operations too.
>
> 2: We should add this to the FLIP I agree. It would be a Duration config
> with the expected catch up time after rescaling (let's say 5 minutes). It
> could be computed based on the current data rate and the calculated max
> processing rate after the rescale.
>
> 3: In the current proposal we don't have per operator configs. Target
> utilization would apply to all operators uniformly.
>
> 4: It should be configurable, yes.
>
> 5,6: The names haven't been finalized but I think these are minor details.
> We could add concrete names to the FLIP :)
>
> Cheers,
> Gyula
>
>
> On Sun, Nov 6, 2022 at 5:19 PM Dong Lin  wrote:
>
> > Hi Max,
> >
> > Thank you for the proposal. The proposal tackles a very important issue
> > for Flink users and the design looks promising overall!
> >
> > I have some questions to better understand the proposed public interfaces
> > and the algorithm.
> >
> > 1) The proposal seems to assume that the operator's busyTimeMsPerSecond
> > could reach 1 sec. I believe this is mostly true for cpu-bound operators.
> > Could you confirm that this can also be true for io-bound operators such
> as
> > sinks? For example, suppose a Kafka Sink subtask has reached I/O
> bottleneck
> > when flushing data out to the Kafka clusters, will busyTimeMsPerSecond
> > reach 1 sec?
> >
> > 2) It is said that "users can configure a maximum time to fully process
> > the backlog". The configuration section does not seem to provide this
> > config. Could you specify this? And any chance this proposal can provide
> > the formula for calculating the new processing rate?
> >
> > 3) How are users expected to specify the per-operator configs (e.g.
> target
> > utilization)? For example, should users specify it programmatically in a
> > DataStream/Table/SQL API?
> >
> > 4) How often will the Flink Kubernetes operator query metrics from
> > JobManager? Is this configurable?
> >
> > 5) Could you specify the config name and default value for the proposed
> > configs?
> >
> > 6) Could you add the name/mbean/type for the proposed metrics?
> >
> >
> > Cheers,
> > Dong
> >
> >
> >
>


Re: SQL Gateway and SQL Client

2022-11-06 Thread Shengkai Fang
Hi, all. Sorry for the late reply.

> Is the gateway mode planned to be supported for SQL Client in 1.17?
> Do you have anything you can already share so we can start with your work
> or just play around with it?

Yes. @yzl is working on it; he will share the implementation plan and the
progress later. The change is not very large, so finishing it in release-1.17
should not be a problem. I will join the development in mid-November.

Best,
Shengkai




Jamie Grier wrote on Sat, Nov 5, 2022 at 00:48:

> Hi Shengkai,
>
> We're doing more and more Flink development at Confluent these days and
> we're currently trying to bootstrap a prototype that relies on the SQL
> Client and Gateway.  We will be using the components in some of our
> projects and would like to co-develop them with you and the rest of the
> Flink community.
>
> As of right now it's a pretty big blocker for our upcoming milestone that
> the SQL Client has not yet been modified to talk to the SQL Gateway and we
> want to help with this effort ASAP!  We would be even willing to take over
> the work if it's not yet started but I suspect it already is.
>
> Anyway, rather than start working immediately on the SQL Client and adding
> the new Gateway mode ourselves, we wanted to start a conversation with you
> and see where you're at with things and offer to help.
>
> Do you have anything you can already share so we can start with your work
> or just play around with it?  Like I said, we just want to get started and
> are very able to help in this area.  We see both the SQL Client and Gateway
> being very important for us and have a good team to help develop it.
>
> Let me know if there is a branch you can share, etc.  It would be much
> appreciated!
>
> -Jamie Grier
>
>
> On 2022/10/28 06:06:49 Shengkai Fang wrote:
> > Hi.
> >
> > > Is there a possibility for us to get engaged and at least introduce
> > initial changes to support authentication/authorization?
> >
> > Yes. You can write a FLIP about the design and change. We can discuss
> this
> > in the dev mail. If the FLIP passes, we can develop it together.
> >
> > > Another question about persistent Gateway: did you have any specific
> > thoughts about it or some draft design?
> >
> > We don't have any detailed plan about this. But I know Livy has a similar
> > feature.
> >
> > Best,
> > Shengkai
> >
> >
> > Alexey Leonov-Vendrovskiy wrote on Thu, Oct 27, 2022 at 15:12:
> >
> > > Apologies for the delayed response on my side.
> > >
> > >>  I think the authentication module is not part of our plan in 1.17 because
> > >> of the busy work. I think we'll start the design at the end of the
> > >> release-1.17.
> > >
> > >
> > > Is there a possibility for us to get engaged and at least introduce
> > > initial changes to support authentication/authorization? Specifically,
> > > changes in the API and in SQL Client.
> > >
> > > We expect the following authentication flow:
> > >
> > > On the SQL gateway we want to be able to use a delegation token.
> > > SQL client should be able to supply an API key.
> > > The SQL Gateway *would not *be submitting jobs on behalf of the client.
> > >
> > > Ideally it would be nice to introduce some interfaces in the SQL
> Gateway
> > > that would allow implementing custom authentication and authorization.
> > >
> > > Another question about persistent Gateway: did you have any specific
> > > thoughts about it or some draft design?
> > >
> > > Thanks,
> > > Alexey
> > >
> > >
> > > On Fri, Oct 21, 2022 at 1:13 AM Shengkai Fang wrote:
> > >
> > >> Sorry for the late response.
> > >>
> > >> In the next version (Flink 1.17), we plan to support the SQL Client
> > >> submitting statements to the Flink SQL Gateway. FLINK-29486 is the first
> > >> step to remove the usage of the `Parser` on the client side, which needs to
> > >> read the table schema when converting SQL nodes to operations. I think
> > >> the authentication module is not part of our plan in 1.17 because of the
> > >> busy work. I think we'll start the design at the end of the release-1.17.
> > >> But could you share more details about the requirements of the
> > >> authentication?
> > >> - Do you use the kerberos or delegation token or password to do the
> > >> authentication?
> > >> - After the authentication, do you need the sql gateway to submit the
> > >> job on behalf of the client?
> > >> - ...
> > >>
> > >> For detailed implementation, I think Hive and Presto are good examples to
> > >> dig in.  If you have some thoughts about the authentication module,
> > >> please let me know.
> > >>
> > >> Best,
> > >> Shengkai
> > >>
> > >> Alexey Leonov-Vendrovskiy wrote on Wed, Oct 19, 2022 at 00:37:
> > >>
> > >>> Thank you for the response, Yuxia!
> > >>>
> > >>> Shengkai, I would like to learn more about the near-term and a bit more
> > >>> distant plans for the development of the SQL Gateway and the SQL Client.
> > >>> Do you have 

Re: [DISCUSS] Repeatable cleanup of checkpoint data

2022-11-06 Thread Yang Wang
Thanks Matthias for continuously improving the clean-up process.

Given that we depend heavily on the K8s APIServer for the HA implementation, I am
not in favor of storing too many entries in the ConfigMap,
nor of adding more update requests to the APIServer. So I lean towards
Proposal #2. It essentially works as if we reverted the current mark-deletion
in StateHandleStore and then introduced a completely new FileSystem-based
artifact clean-up mechanism.

During failover, I suggest processing the clean-up
asynchronously. Otherwise, listing the completed checkpoints and deleting
the invalid ones would take too much time and slow down the recovery process.

Best,
Yang

Matthias Pohl wrote on Thu, Oct 27, 2022 at 20:20:

> I would like to bring this topic up one more time. I put some more thought
> into it and created FLIP-270 [1] as a follow-up of FLIP-194 [2] with an
> updated version of what I summarized in my previous email. It would be
> interesting to get some additional perspectives on this; more specifically,
> the two included proposals about either just repurposing the
> CompletedCheckpointStore into a more generic CheckpointStore or refactoring
> the StateHandleStore interface moving all the cleanup logic from the
> CheckpointsCleaner and StateHandleStore into what's currently called
> CompletedCheckpointStore.
>
> Looking forward to feedback on that proposal.
>
> Best,
> Matthias
>
> [1]
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-270%3A+Repeatable+Cleanup+of+Checkpoints
> [2]
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-194%3A+Introduce+the+JobResultStore
>
> On Wed, Sep 28, 2022 at 4:07 PM Matthias Pohl 
> wrote:
>
> > Hi everyone,
> >
> > I’d like to start a discussion on repeatable cleanup of checkpoint data.
> > In FLIP-194 [1] we introduced repeatable cleanup of HA data along the
> > introduction of the JobResultStore component. The goal was to make Flink
> > being in charge of cleanup for the data it owns. The Flink cluster should
> > only shutdown gracefully after all its artifacts are removed. That way,
> one
> > would not miss abandoned artifacts accidentally.
> >
> > We forgot to cover one code path around cleaning up checkpoint data.
> > Currently, in case of an error (e.g. permission issues), the
> > CheckpointsCleaner tries to clean up checkpoints and leaves them as they
> > are if that cleanup fails. A log message is printed, and the user is then
> > responsible for cleaning up the data. This was discussed as part of the
> > release testing efforts for Flink 1.15 in FLINK-26388 [2].
> >
> > We could add repeatable cleanup in the CheckpointsCleaner. We would have
> > to make sure that all StateObject#discardState implementations are
> > idempotent. This is not necessarily the case right now (see FLINK-26606
> > [3]).
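
As a rough illustration of the idempotency property mentioned above (the class
name is made up, and this is not the actual fix discussed in FLINK-26606), an
idempotent discardState for a file-backed handle could treat an already-deleted
file as success:

import java.io.FileNotFoundException;

import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;

class IdempotentFileStateHandle /* would implement org.apache.flink.runtime.state.StateObject */ {

    private final Path filePath;

    IdempotentFileStateHandle(Path filePath) {
        this.filePath = filePath;
    }

    public void discardState() throws Exception {
        FileSystem fs = filePath.getFileSystem();
        try {
            // Deleting a path that is already gone must not count as a failure,
            // otherwise a repeated cleanup attempt after a failover can never succeed.
            fs.delete(filePath, false);
        } catch (FileNotFoundException ignored) {
            // already cleaned up by a previous attempt -- nothing left to do
        }
    }
}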
> >
> > Additionally, there is the problem of losing information about what
> > Checkpoints are subject to cleanup in case of JobManager failovers. These
> > Checkpoints are not stored as part of the HA data. Additionally,
> > PendingCheckpoints are not serialized in any way, either. None of these
> > artifacts are picked up again after a failover. I see the following
> options
> > here:
> >
> >    - The purpose of CompletedCheckpointStore needs to be extended to become
> >      a “general” CheckpointStore. It will store PendingCheckpoints and
> >      CompletedCheckpoints that are marked for deletion. After a failover,
> >      CheckpointsCleaner can pick up these instances again and continue with
> >      the deletion process.
> >
> > The flaw of that approach is that we’re increasing the amount of data that
> > is stored in the underlying StateHandleStore. Additionally, we’re going
> > to have an increased number of accesses to the CompletedCheckpointStore.
> > These accesses need to happen in the main thread; more specifically, adding
> > PendingCheckpoints and marking Checkpoints for deletion.
> >
> >    - We’re actually interested in cleaning up artifacts from the
> >      FileSystem, i.e. the artifacts created by the StateHandleStore used
> >      within the DefaultCompletedCheckpointStore containing the serialized
> >      CompletedCheckpoint instance and the checkpoint’s folder containing
> >      the actual operator states. We could adapt the CompletedCheckpointStore
> >      in a way that any Checkpoint (including PendingCheckpoint) is
> >      serialized and persisted on the FileSystem right away (which is
> >      currently done within the StateHandleStore implementations when adding
> >      CompletedCheckpoints to the underlying HA system). The corresponding
> >      FileStateHandleObject (referring to that serialized CompletedCheckpoint)
> >      that gets persisted to ZooKeeper/k8s ConfigMap in the end would only be
> >      written if the CompletedCheckpoint is finalized and can be used. The
> >      CheckpointsCleaner could recover any artifacts from the FileSystem and
> >      clean up anything that’s not listed in ZooKeeper/k8s 

[jira] [Created] (FLINK-29912) scan.partition.column can specify any type of field

2022-11-06 Thread waywtdcc (Jira)
waywtdcc created FLINK-29912:


 Summary: scan.partition.column can specify any type of field
 Key: FLINK-29912
 URL: https://issues.apache.org/jira/browse/FLINK-29912
 Project: Flink
  Issue Type: New Feature
  Components: Connectors / JDBC
Affects Versions: 1.16.0
Reporter: waywtdcc
 Fix For: 1.17.0


scan.partition.column should be able to specify any type of field. At present, 
scan.partition.column must be a numeric, date, or timestamp column from the table 
in question. The request is to allow a column of any type to be specified.
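
For context, this is roughly how the existing partitioned-scan options are
declared today via the Table API (connection details and the table are
placeholders):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class JdbcPartitionedScan {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());
        tEnv.executeSql(
            "CREATE TABLE orders (" +
            "  id BIGINT, amount DECIMAL(10, 2), order_time TIMESTAMP(3)" +
            ") WITH (" +
            "  'connector' = 'jdbc'," +
            "  'url' = 'jdbc:mysql://localhost:3306/shop'," +
            "  'table-name' = 'orders'," +
            // scan.partition.column must currently be a numeric, date, or timestamp
            // column; the request is to allow a column of any type here.
            "  'scan.partition.column' = 'id'," +
            "  'scan.partition.num' = '4'," +
            "  'scan.partition.lower-bound' = '0'," +
            "  'scan.partition.upper-bound' = '1000000'" +
            ")");
    }
}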



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] FLIP-271: Autoscaling

2022-11-06 Thread Gyula Fóra
Hi Dong!

Let me try to answer the questions :)

1 : busyTimeMsPerSecond is not specific for CPU, it measures the time spent
in the main record processing loop for an operator if I
understand correctly. This includes IO operations too.

2: We should add this to the FLIP, I agree. It would be a Duration config
with the expected catch-up time after rescaling (let's say 5 minutes). It
could be computed based on the current data rate and the calculated max
processing rate after the rescale (a rough sketch follows below).

3: In the current proposal we don't have per operator configs. Target
utilization would apply to all operators uniformly.

4: It should be configurable, yes.

5,6: The names haven't been finalized but I think these are minor details.
We could add concrete names to the FLIP :)
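
A back-of-the-envelope sketch of the kind of computation meant in 2) -- the
names, numbers, and the formula are illustrative only, not the FLIP's final
algorithm:

public class CatchUpRateSketch {
    public static void main(String[] args) {
        double inputRateRecPerSec = 10_000;    // current data rate at the source
        double backlogRecords     = 6_000_000; // pending records, e.g. Kafka consumer lag
        double catchUpSeconds     = 300;       // the proposed Duration config, e.g. 5 minutes

        // To drain the backlog within the window, the job must keep up with the
        // live input and additionally work off a share of the backlog per second.
        double targetProcessingRate = inputRateRecPerSec + backlogRecords / catchUpSeconds;

        // Max rate one subtask could sustain, extrapolated from observed throughput
        // and busyTimeMsPerSecond (1000 ms/s == fully busy).
        double observedRatePerSubtask = 2_000;
        double busyTimeMsPerSecond    = 800;
        double maxRatePerSubtask = observedRatePerSubtask * (1000.0 / busyTimeMsPerSecond);

        long suggestedParallelism = (long) Math.ceil(targetProcessingRate / maxRatePerSubtask);
        System.out.println("target rate = " + targetProcessingRate + " rec/s, "
                + "suggested parallelism = " + suggestedParallelism); // 30000.0 rec/s, 12
    }
}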

Cheers,
Gyula


On Sun, Nov 6, 2022 at 5:19 PM Dong Lin  wrote:

> Hi Max,
>
> Thank you for the proposal. The proposal tackles a very important issue
> for Flink users and the design looks promising overall!
>
> I have some questions to better understand the proposed public interfaces
> and the algorithm.
>
> 1) The proposal seems to assume that the operator's busyTimeMsPerSecond
> could reach 1 sec. I believe this is mostly true for cpu-bound operators.
> Could you confirm that this can also be true for io-bound operators such as
> sinks? For example, suppose a Kafka Sink subtask has reached I/O bottleneck
> when flushing data out to the Kafka clusters, will busyTimeMsPerSecond
> reach 1 sec?
>
> 2) It is said that "users can configure a maximum time to fully process
> the backlog". The configuration section does not seem to provide this
> config. Could you specify this? And any chance this proposal can provide
> the formula for calculating the new processing rate?
>
> 3) How are users expected to specify the per-operator configs (e.g. target
> utilization)? For example, should users specify it programmatically in a
> DataStream/Table/SQL API?
>
> 4) How often will the Flink Kubernetes operator query metrics from
> JobManager? Is this configurable?
>
> 5) Could you specify the config name and default value for the proposed
> configs?
>
> 6) Could you add the name/mbean/type for the proposed metrics?
>
>
> Cheers,
> Dong
>
>
>


Re: [DISCUSS] FLIP-271: Autoscaling

2022-11-06 Thread Dong Lin
Hi Max,

Thank you for the proposal. The proposal tackles a very important issue for
Flink users and the design looks promising overall!

I have some questions to better understand the proposed public interfaces
and the algorithm.

1) The proposal seems to assume that the operator's busyTimeMsPerSecond
could reach 1 sec. I believe this is mostly true for cpu-bound operators.
Could you confirm that this can also be true for io-bound operators such as
sinks? For example, suppose a Kafka Sink subtask has reached I/O bottleneck
when flushing data out to the Kafka clusters, will busyTimeMsPerSecond
reach 1 sec?

2) It is said that "users can configure a maximum time to fully process the
backlog". The configuration section does not seem to provide this config.
Could you specify this? And any chance this proposal can provide the
formula for calculating the new processing rate?

3) How are users expected to specify the per-operator configs (e.g. target
utilization)? For example, should users specify it programmatically in a
DataStream/Table/SQL API?

4) How often will the Flink Kubernetes operator query metrics from
JobManager? Is this configurable?

5) Could you specify the config name and default value for the proposed
configs?

6) Could you add the name/mbean/type for the proposed metrics?


Cheers,
Dong


Re: [DISCUSS] FLIP-271: Autoscaling

2022-11-06 Thread Gyula Fóra
@Pedro:
The current design focuses on record processing time metrics. In most cases
where we need to scale (such as too much state per operator), record
processing actually slows down, so the metrics would detect that. Of course,
in the future we can add new logic if we see something missing.
@ConradJam:
We agree that the on/off config for the actual scaling is very important.
The autoscaler module would then publish all metrics and parallelism advice
without actually taking action. We should definitely have this.
Thanks for the pointer to the talk, we have considered that approach but
it's mostly suitable for very simple pipelines. We hope that our approach
would generalize better to complex applications and would also cover the
simple case in a similar way.

Gyula

On Sun, Nov 6, 2022 at 7:25 AM Zheng Yu Chen  wrote:

> Hi Max,
> Thank you for driving this FLIP. I have some suggestions.
>
> Could we have not only the (on/off) switch, but also one more option,
> (advice)?
> When the user enables (advice), autoscaling is not actually performed. It
> only outputs tuning suggestions as notifications for the user's
> reference, and it is up to the user to decide whether to trigger the
> adjustment of the parallelism. I believe this function is very useful in
> the debugging or observation phase: when users have observed it for a
> certain period of time and consider it feasible, they can then turn on
> the switch.
>
> At the same time, I found that FFA 2020 has a related Netflix talk
> discussing the automatic tuning function.
> Video address: Autoscaling Flink at Netflix - Timothy Farkas
>  This may be helpful for us to complete this function.
>
> It describes using some prediction functions to predict the operator
> traffic of the job. Can we provide some interfaces for users to
> customize and implement their own tuning algorithms?
>
>
> Maximilian Michels wrote on Sat, Nov 5, 2022 at 02:37:
>
> > Hi,
> >
> > I would like to kick off the discussion on implementing autoscaling for
> > Flink as part of the Flink Kubernetes operator. I've outlined an approach
> > here which I find promising:
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-271%3A+Autoscaling
> >
> > I've been discussing this approach with some of the operator
> contributors:
> > Gyula, Marton, Matyas, and Thomas (all in CC). We started prototyping an
> > implementation based on the current FLIP design. If that goes well, we
> > would like to contribute this to Flink based on the results of the
> > discussion here.
> >
> > I'm curious to hear your thoughts.
> >
> > -Max
> >
>
>
> --
> Best
>
> ConradJam
>


[jira] [Created] (FLINK-29911) Improve performance of AgglomerativeClustering

2022-11-06 Thread Zhipeng Zhang (Jira)
Zhipeng Zhang created FLINK-29911:
-

 Summary: Improve performance of AgglomerativeClustering
 Key: FLINK-29911
 URL: https://issues.apache.org/jira/browse/FLINK-29911
 Project: Flink
  Issue Type: Improvement
  Components: Library / Machine Learning
Reporter: Zhipeng Zhang






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-29910) JdbcNumericBetweenParametersProvider(Variables within the method are extracted)

2022-11-06 Thread zhilinli (Jira)
zhilinli created FLINK-29910:


 Summary: JdbcNumericBetweenParametersProvider(Variables within the 
method are extracted)
 Key: FLINK-29910
 URL: https://issues.apache.org/jira/browse/FLINK-29910
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / JDBC
Reporter: zhilinli
 Attachments: image-2022-11-06-15-17-52-061.png, 
image-2022-11-06-15-18-28-320.png, image-2022-11-06-15-18-42-020.png

!image-2022-11-06-15-18-28-320.png!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] FLIP-271: Autoscaling

2022-11-06 Thread Zheng Yu Chen
Hi Max,
Thank you for driving this FLIP. I have some suggestions.

Could we have not only the (on/off) switch, but also one more option,
(advice)?
When the user enables (advice), autoscaling is not actually performed. It
only outputs tuning suggestions as notifications for the user's
reference, and it is up to the user to decide whether to trigger the
adjustment of the parallelism. I believe this function is very useful in
the debugging or observation phase: when users have observed it for a
certain period of time and consider it feasible, they can then turn on
the switch.

At the same time, I found that FFA 2020 has a related Netflix talk
discussing the automatic tuning function.
Video address: Autoscaling Flink at Netflix - Timothy Farkas
 This may be helpful for us to complete this function.

It describes using some prediction functions to predict the operator
traffic of the job. Can we provide some interfaces for users to
customize and implement their own tuning algorithms?


Maximilian Michels wrote on Sat, Nov 5, 2022 at 02:37:

> Hi,
>
> I would like to kick off the discussion on implementing autoscaling for
> Flink as part of the Flink Kubernetes operator. I've outlined an approach
> here which I find promising:
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-271%3A+Autoscaling
>
> I've been discussing this approach with some of the operator contributors:
> Gyula, Marton, Matyas, and Thomas (all in CC). We started prototyping an
> implementation based on the current FLIP design. If that goes well, we
> would like to contribute this to Flink based on the results of the
> discussion here.
>
> I'm curious to hear your thoughts.
>
> -Max
>


-- 
Best

ConradJam