Re: [VOTE] FLIP-295: Support lazy initialization of catalogs and persistence of catalog configurations

2023-06-14 Thread Dong Lin
Thanks Feng for the FLIP.

+1(binding)

Cheers,
Dong

On Wed, Jun 14, 2023 at 10:35 AM Feng Jin  wrote:

> Hi everyone
>
> Thanks for all the feedback about the FLIP-295: Support lazy initialization
> of catalogs and persistence of catalog configurations[1].
> [2] is the discussion thread.
>
>
> I'd like to start a vote for it. The vote will be open for at least 72
> hours (excluding weekends, until June 19, 10:00 AM GMT) unless there is an
> objection or an insufficient number of votes.
>
>
> [1]
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-295%3A+Support+lazy+initialization+of+catalogs+and+persistence+of+catalog+configurations
> [2]https://lists.apache.org/thread/dcwgv0gmngqt40fl3694km53pykocn5s
>
>
> Best,
> Feng
>


Re: [DISCUSS] FLIP-321: Introduce an API deprecation process

2023-06-14 Thread Xintong Song
I agree that Public APIs should require a longer migration period. I think
that is why the FLIP requires at least 2 minor releases (compared to 1
minor release for PublicEvolving and 1 patch release for Experimental).

I think the key of stability guarantees is about not breaking the
commitments we made to users. From my understanding, when we mark an API as
Public in, for example, 1.18, we make the following two commitments:
a) The API will not be removed in any of the 1.x series.
b) The API will be kept for at least 2 minor releases after being marked
Deprecated.

When there's a major release bump before 2 minor releases have passed, a) is
not affected, and we only need to guarantee b) by keeping the API in the new
major release. This is the rationale behind my proposal.

I think my proposal can provide the same migration experience for users as
if the major release bump has not happened. It should also not affect users
planning their usage of Flink if we tell them at the beginning of the new
major release that the deprecated API will soon be removed.

Best,

Xintong



On Wed, Jun 14, 2023 at 10:01 PM Becket Qin  wrote:

> Thanks for the explanation, Matthias.
>
> In the example you raised, would it be better to just keep both YARN and
> K8S support in the new major version, but with YARN support deprecated if
> we want to? We can say for YARN we will only provide bug fixes but no
> feature development anymore. Given these two features are probably in two
> independent modules. Keeping both modules in the same new major version
> likely has zero additional cost compared with maintaining them in two
> different major versions respectively. This way we don't have the
> non-linear version issue, have fewer releases, and save a bunch of
> maintenance effort for multiple development branches.
>
> Regarding the stability demotion, I see your point. However, I still feel
> it a little weird that we demote a Public API to PublicEvolving just for
> the purpose of code removal in minor versions. This also results in some
> counterintuitive issues. For example, assuming users only use Public APIs,
> they may be able to upgrade from 1.19.0 to 2.0 fine, but upgrading from
> 1.19 to 2.1 does not work because the Public API is removed, even though
> from the users' perspective, both of them are major version upgrades. So,
> in this case, I would rather bump up the major version again to remove the
> deprecated Public API. That seems simpler and does not complicate the well
> established versioning semantic conventions.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On Wed, Jun 14, 2023 at 9:27 PM Matthias Pohl
>  wrote:
>
> > One (made-up) example from the top of my head would have been that the
> > community decides to focus fully on Kubernetes without considering Yarn
> > anymore because of some must-have feature on the Kubernetes side. At the
> > same time there are still some users for whom it would be tricky to
> migrate
> > from Yarn to Kubernetes. Therefore, there would be some desire to still
> > maintain the older major version of Flink that supports Yarn.
> >
> > But now that I'm thinking about the example, I realize: A discussion on
> how
> > we would handle the two major versions in that case, could be started
> when
> > we actually run into this issue. It shouldn't be too hard to migrate to a
> > non-linear versioning from what you are proposing if such a scenario
> comes
> > up.
> >
> > And on the topic of downgrading from @Public to @PublicEvolving:
> >
> > Demoting a Public API to PublicEvolving API sounds hacky. From what I
> > > understand the stability guarantee is not revocable because users will
> > rely
> > > on the stability guarantee to plan their usage of Flink. Demoting an
> API
> > > essentially defeats the very purpose of stability guarantee to begin
> > with.
> > >
> >
> > I understand the stability guarantee in a way that it only applies
> within a
> > major version. Downgrading the stability constraint for an API with a new
> > major version still seems to comply with the definition of a @Public
> > annotation as it's similar to changing the API in other ways. But I'm not
> > insisting on that approach. I just found it a reasonable workaround.
> >
> > Thanks,
> > Matthias
> >
> > On Wed, Jun 14, 2023 at 11:38 AM Becket Qin 
> wrote:
> >
> > > Hi Matthias,
> > >
> > > Thanks for the feedback.
> > >
> > > Do you have an example of behavioral change in mind? Not sure I fully
> > > understand the concern for behavioral change here. From what I
> > understand,
> > > any user sensible change in an existing API, regardless of its kind
> (API
> > > signature or behavioral change), can always be done in the following
> way:
> > >
> > > 1. Introduce a new API (new package, new class/interface, new method,
> new
> > > config, new metric, etc) while marking the old one as deprecated.
> > > 2. Let the new API and deprecated API coexist for the migration period
> to
> > > allow planned migration from the users.
> > > 3. 

Re: Re: [VOTE] FLIP-311: Support Call Stored Procedure

2023-06-14 Thread Jane Chan
+1 (non-binding)

Best,
Jane

On Wed, Jun 14, 2023 at 10:41 AM Feng Jin  wrote:

> +1 (non-binding)
>
>
> Best,
> Feng
>
>
> On Wed, Jun 14, 2023 at 7:02 AM Jing Ge 
> wrote:
>
> > +1(binding)
> >
> > Best Regards,
> > Jing
> >
> > On Tue, Jun 13, 2023 at 9:03 AM Mang Zhang  wrote:
> >
> > > +1 (non-binding)
> > >
> > >
> > >
> > >
> > > --
> > >
> > > Best regards,
> > > Mang Zhang
> > >
> > >
> > >
> > >
> > >
> > > On 2023-06-13 13:19:31, "Lincoln Lee"  wrote:
> > > >+1 (binding)
> > > >
> > > >Best,
> > > >Lincoln Lee
> > > >
> > > >
> > > >On Tue, Jun 13, 2023 at 10:07, Jingsong Li  wrote:
> > > >
> > > >> +1
> > > >>
> > > >> On Mon, Jun 12, 2023 at 10:32 PM Rui Fan <1996fan...@gmail.com>
> > wrote:
> > > >> >
> > > >> > +1 (binding)
> > > >> >
> > > >> > Best,
> > > >> > Rui Fan
> > > >> >
> > > >> > On Mon, Jun 12, 2023 at 22:20 Benchao Li 
> > > wrote:
> > > >> >
> > > >> > > +1 (binding)
> > > >> > >
> > > >> > > On Mon, Jun 12, 2023 at 17:58, yuxia  wrote:
> > > >> > >
> > > >> > > > Hi everyone,
> > > >> > > > Thanks for all the feedback about FLIP-311: Support Call
> Stored
> > > >> > > > Procedure[1]. Based on the discussion [2], we have come to a
> > > >> consensus,
> > > >> > > so
> > > >> > > > I would like to start a vote.
> > > >> > > > The vote will be open for at least 72 hours (until June 15th,
> > > 10:00AM
> > > >> > > GMT)
> > > >> > > > unless there is an objection or an insufficient number of
> votes.
> > > >> > > >
> > > >> > > >
> > > >> > > > [1]
> > > >> > > >
> > > >> > >
> > > >>
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-311%3A+Support+Call+Stored+Procedure
> > > >> > > > [2]
> > > https://lists.apache.org/thread/k6s50gcgznon9v1oylyh396gb5kgrwmd
> > > >> > > >
> > > >> > > > Best regards,
> > > >> > > > Yuxia
> > > >> > > >
> > > >> > >
> > > >> > >
> > > >> > > --
> > > >> > >
> > > >> > > Best,
> > > >> > > Benchao Li
> > > >> > >
> > > >>
> > >
> >
>


Re: [DISCUSS] FLIP-324: Introduce Runtime Filter for Flink Batch Jobs

2023-06-14 Thread Lijie Wang
Hi Yuxia,

Thanks for your feedback. The answers to your questions are as follows:

1. Yes, the row count comes from the statistics of the underlying table (or is
estimated based on those statistics, if the build side or probe side
is not a TableScan). If the statistics are unavailable, we will not inject a
runtime filter (as you said, we can hardly evaluate the benefits). Besides,
AFAIK, the estimated data size of the build side is also based on the row count
statistics; that is, if the statistics are unavailable, the requirement
"table.optimizer.runtime-filter.max-build-data-size" cannot be evaluated
either. I'll add this point to the FLIP.

2.
Estimated data size does not meet the requirement (in planner optimization
phase) -> No filter
Estimated data size meets the requirement (in planner optimization phase),
but the real data size does not meet the requirement (in execution phase) ->
Fake filter

3. Yes, the runtime filter is only for batch jobs/blocking shuffle.

Best,
Lijie

On Wed, Jun 14, 2023 at 20:37, yuxia  wrote:

> Thanks Lijie for starting this discussion. Excited to see runtime filter
> is to be implemented in Flink.
> I have few questions about it:
>
> 1: As the FLIP said, `if the ndv cannot be estimated, use row count
> instead`. So, does the row count come from the statistics of the underlying
> table? What if the statistics are also unavailable, considering users may
> not always remember to generate statistics in production?
> I'm wondering whether it makes sense to just disable the runtime filter if
> statistics are unavailable, since in that case we can hardly evaluate the
> benefits of the runtime filter.
>
>
> 2: The FLIP said: "We will inject the runtime filters only if the
> following requirements are met: xxx", but it also said, "Once this limit is
> exceeded, it will output a fake filter (which always returns true)" in the
> `RuntimeFilterBuilderOperator` part. They seem contradictory, so I'm
> wondering what the real behavior is: will no filter be injected, or a fake
> filter?
>
>
> 3: Does it also mean the runtime filter can only take effect with blocking
> shuffle?
>
>
>
> Best regards,
> Yuxia
>
> ----- Original Message -----
> From: "ron9 liu" 
> To: "dev" 
> Sent: Wednesday, June 14, 2023 5:29:28 PM
> Subject: Re: [DISCUSS] FLIP-324: Introduce Runtime Filter for Flink Batch Jobs
>
> Thanks Lijie for starting this discussion. Runtime Filter is a common
> optimization to improve join performance that has been adopted by many
> computing engines such as Spark, Doris, etc. Flink is a unified streaming
> and batch computing engine, and we are continuously optimizing the
> performance of batch jobs. Runtime filter is a general performance
> optimization technique that can improve the performance of Flink batch
> jobs, so we are introducing it on the batch side as well.
>
> Looking forward to all feedback.
>
> Best,
> Ron
>
> On Wed, Jun 14, 2023 at 17:17, Lijie Wang  wrote:
>
> > Hi devs
> >
> > Ron Liu, Gen Luo and I would like to start a discussion about FLIP-324:
> > Introduce Runtime Filter for Flink Batch Jobs[1]
> >
> > Runtime Filter is a common optimization to improve join performance. It
> is
> > designed to dynamically generate filter conditions for certain Join
> queries
> > at runtime to reduce the amount of scanned or shuffled data, avoid
> > unnecessary I/O and network transmission, and speed up the query. Its
> > working principle is building a filter (e.g. a bloom filter) based on the
> > data on the small table side (build side) first, then passing this filter
> > to the large table side (probe side) to filter out the irrelevant data on
> > it; this can reduce the data reaching the join and improve performance.
> >
> > You can find more details in the FLIP-324[1]. Looking forward to your
> > feedback.
> >
> > [1]
> >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-324%3A+Introduce+Runtime+Filter+for+Flink+Batch+Jobs
> >
> > Best,
> > Ron & Gen & Lijie
> >
>
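As an aside, the build/probe principle quoted above can be boiled down to a
few lines. Below is a minimal, self-contained Java sketch of the technique
only, not FLIP-324's actual operator code; all class and method names are
illustrative:

{code:java}
import java.util.BitSet;
import java.util.List;

// Sketch of a bloom-filter-style runtime filter: build on the small
// (build) side, probe on the large (probe) side. A real implementation
// adds proper hashing, size limits, and serialization.
public class RuntimeFilterSketch {
    private static final int BITS = 1 << 20;

    // Build side: hash every join key of the small table into the bit set.
    static BitSet build(List<String> buildSideKeys) {
        BitSet filter = new BitSet(BITS);
        for (String key : buildSideKeys) {
            filter.set(Math.floorMod(key.hashCode(), BITS));
        }
        return filter;
    }

    // Probe side: a miss proves the key cannot match, so the row is dropped
    // before the shuffle/join; a hit may be a false positive, so the join
    // itself still performs the exact comparison.
    static boolean mightMatch(BitSet filter, String probeSideKey) {
        return filter.get(Math.floorMod(probeSideKey.hashCode(), BITS));
    }
}
{code}

The "fake filter" discussed above is then simply a filter whose mightMatch
always returns true: it is emitted when the real build-side data size exceeds
the limit at execution time, so downstream operators keep running with no
filtering effect.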


Re: [DISCUSS] FLIP-313 Add support of User Defined AsyncTableFunction

2023-06-14 Thread Lincoln Lee
Hi Aitozi,

Thanks for your reply! +1 for giving SQL users more flexibility to get
asynchronous processing capabilities via a lateral join table function.

For `JavaAsyncTableFunc0` in the FLIP, can you use a scenario like an RPC
call as an example?

For the name of this query hint, 'LATERAL' (including its internal options)
doesn't show any relevance to async, but I haven't thought of a suitable name
at the moment. Maybe we need to highlight the async keyword directly; we can
also see if others have better candidates.

The hint option "timeout = '180s'" should be "'timeout' = '180s'"; it seems
to be a typo in the FLIP. Also, please use upper case for all keywords in the
SQL examples.
In addition, the terms 'correlate join' and 'lateral join' are not the same
as in the current joins page[1], so maybe it would be better if we unified
them into 'join table function'.

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/joins/#table-function

Best,
Lincoln Lee
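For illustration, here is a hedged sketch of the kind of RPC-style example
requested above, based on Flink's `AsyncTableFunction` base class. The class
name `AsyncSplit`, the eval signature details, and the fake RPC call are
assumptions for illustration, not the FLIP's final API:

{code:java}
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;

import org.apache.flink.table.functions.AsyncTableFunction;
import org.apache.flink.types.Row;

// Hypothetical async UDTF that calls a remote service for each input value
// and completes the future with the resulting rows. Error handling and the
// open/close client lifecycle are omitted in this sketch.
public class AsyncSplit extends AsyncTableFunction<Row> {

    public void eval(CompletableFuture<Collection<Row>> future, String input) {
        // Stand-in for an asynchronous RPC to an external service.
        CompletableFuture.runAsync(() -> {
            String[] parts = input.split(",", 2);
            String rest = parts.length > 1 ? parts[1] : "";
            future.complete(Collections.singleton(Row.of(parts[0], rest)));
        });
    }
}
{code}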


On Wed, Jun 14, 2023 at 16:11, Aitozi  wrote:

> Hi Lincoln
>
> Many thanks for your valuable questions. I will try to answer them
> inline.
>
> >Does the async udtf bring any additional benefits besides a
> lighter implementation?
>
> IMO, async udtf is more than a lighter implementation. It can act as a
> general way for SQL users to use the async operator. They don't have to
> bind the async function to a table (a LookupTable), they are not forced to
> join on an equality join condition, and they can use it to do more than
> enrich data.
>
> The async lookup join is more like a subset/specific usage of async udtf.
> It is acceptable that the specific version has more opportunities to be
> optimized (like push-down). The async table function should be categorized
> as a user-defined function.
>
> >Should users
>
> migrate to the lookup source when they encounter similar requirements or
>
> problems, or should we develop an additional set of similar mechanisms?
>
> As I clarified above, the lookup join is a specific usage of async udtf, so
> it deserves more refined optimizations like caching / retry. But these may
> not all be suitable for the async udtf. As a function, it can be
> deterministic or non-deterministic, so caching is not always suitable, and
> we also do not have a common cache for UDFs now. So I think optimizations
> like caching/retry should be handed over to the function implementor.
>
> > the newly added query hint need a different name that
> can be easier related to the lateral operation as the current join hints[5]
> do.
>
>
> What about using LATERAL?
>
> as below
>
> SELECT /*+ LATERAL('output-mode' = 'ordered', 'capacity' = '200', timeout =
> '180s') */ a, c1, c2
>
> FROM T1
>
> LEFT JOIN lateral TABLE (async_split(b)) AS T(c1, c2) ON true
>
> >For the async func example, since the target scenario is an external io
> operation, it's better to add the `close` method to actively release
> resources as a good example for users
>
>
> Makes sense to me; I will update the FLIP
>
> Best,
>
> Aitozi.
>
> On Wed, Jun 14, 2023 at 14:24, Lincoln Lee  wrote:
>
> > Hi Aitozi,
> >
> > Sorry for the late reply here! Supporting async udtf
> > (`AsyncTableFunction`) directly in SQL seems like an attractive feature,
> > but there are two issues that need to be addressed before we can be sure
> > to add it:
> > 1. As mentioned in the flip[1], the current lookup function can already
> > implement the requirements, but it requires implementing an extra
> > `LookupTableSource` and explicitly declaring the table schema (which can
> > help implementers leverage the various push-down optimizations supported by the
> > planner). Does the async udtf bring any additional benefits besides a
> > lighter implementation?
> > 2. FLIP-221[2] abstracts a reusable cache and metric infrastructure for
> > lookup sources, which are important to improve performance and
> > observability for high overhead external io scenarios, how do we
> integrate
> > and reuse these capabilities after introducing async udtf? Should users
> > migrate to the lookup source when they encounter similar requirements or
> > problems, or should we develop an additional set of similar mechanisms?
> (a
> > similarly case:  FLIP-234[3] introduced the retryable capability for
> lookup
> > join)
> >
> > For the flip itself,
> > 1. Considering the 'options' is already used as the dynamic table
> > options[4] in flink, the newly added query hint need a different name
> that
> > can be easier related to the lateral operation as the current join
> hints[5]
> > do.
> > 2. For the async func example, since the target scenario is an external
> io
> > operation, it's better to add the `close` method to actively release
> > resources as a good example for users. Also in terms of the determinism
> of
> > a function, it is important to remind users that unless the behavior of
> the
> > function is deterministic, it needs to be explicitly declared as
> > non-deterministic.
> >
> > [1].
> >
> >
> 

Re: AsyncFunction vs Async Sink

2023-06-14 Thread Lu Niu
Thanks, Hong!

I understand that if the use case is to simply write something to an external
service, Async Sink is a good option that provides features like batching,
state management and rate limiting. I have some follow-up questions:

1. Is there any problem if we use AsyncFunction for such a use case? We
can simply drop the output and use Unordered mode.
2. For AsyncFunction and Async Sink, does it make sense that both could
share the same underlying implementation, so that features like batching and
rate limiting can benefit both?

Best
Lu


On Wed, Jun 14, 2023 at 2:20 PM Teoh, Hong  wrote:

> Hi Lu,
>
> Thanks for your question. See below for my understanding.
>
> I would recommend using the Async Sink if you are writing to the external
> service as the final output of your job graph, and if you don’t have an
> ordering requirement that updates to the external system must be done before
> updates to some other external system within the same job graph. (More on
> this below.)
>
> The abstraction of the Async Sink is a sink, meaning it is a terminal
> operator in the job graph. The abstraction is intended to simplify the
> writing of a sink - meaning the base implementation will handle batching,
> state management and rate limiting. You only need to provide the client and
> request structure to be used to interact with the external service. This
> makes writing and maintaining the sink easier (if you simply want to write
> to a destination with at least once processing).
>
> The AsyncFunction, as I understand it, is more used for data enrichment,
> and is not a terminal operator in the job graph. This means the return
> value from the external service will continue to be passed on down the
> Flink Job graph. This is useful for data enrichment using the external
> service, or if we want to ensure the system being called in the
> AsyncFunction is updated BEFORE any data is written to the sinks further
> down the job graph.
>
> For example:
>
> Kinesis Source -> Map -> AsyncFunction (Updates DynamoDB) -> Kinesis Sink
>
> We can be sure that the updates to DynamoDB for a particular record
> happens before the record is written to the Kinesis Sink.
>
>
> Hope the above clarifies your question!
>
> Regards,
> Hong
>
>
> On 14 Jun 2023, at 19:27, Lu Niu  wrote:
>
> Hi, Flink dev and users
>
> If I want to async write to an external service, which API shall I use,
> AsyncFunction or Async Sink?
>
> My understanding after checking the code is:
>
>1. Both APIs guarantee at least once write to the external service, as
>both APIs internally store in-flight requests in the checkpoint.
>2. Async Sink provides a batching request feature. This can be
>implemented with Map + AsyncFunction: the Map function groups requests in
>batches and passes them to the AsyncFunction. The batching implementation
>can refer to AbstractMapBundleOperator if we don't want to use state.
>3. Async Sink supports retry on failed requests. AsyncFunction also
>supports retry in the latest Flink version.
>4. Async Sink supports rate limiting; AsyncFunction doesn't.
>5. AsyncFunction can be used to implement read-update-write. Async
>Sink cannot.
>
> Best
>
> Lu
>
>
>


Re: AsyncFunction vs Async Sink

2023-06-14 Thread Teoh, Hong
Hi Lu,

Thanks for your question. See below for my understanding.

I would recommend using the Async Sink if you are writing to the external 
service as the final output of your job graph, and if you don’t have an 
ordering requirement that updates to the external system must be done before 
updates to some other external system within the same job graph. (More on 
this below.)

The abstraction of the Async Sink is a sink, meaning it is a terminal operator 
in the job graph. The abstraction is intended to simplify the writing of a sink 
- meaning the base implementation will handle batching, state management and 
rate limiting. You only need to provide the client and request structure to be 
used to interact with the external service. This makes writing and maintaining 
the sink easier (if you simply want to write to a destination with at least 
once processing).

The AsyncFunction, as I understand it, is more used for data enrichment, and is 
not a terminal operator in the job graph. This means the return value from the 
external service will continue to be passed on down the Flink Job graph. This 
is useful for data enrichment using the external service, or if we want to 
ensure the system being called in the AsyncFunction is updated BEFORE any data 
is written to the sinks further down the job graph.

For example:

Kinesis Source -> Map -> AsyncFunction (Updates DynamoDB) -> Kinesis Sink

We can be sure that the updates to DynamoDB for a particular record happens 
before the record is written to the Kinesis Sink.


Hope the above clarifies your question!

Regards,
Hong


On 14 Jun 2023, at 19:27, Lu Niu  wrote:


Hi, Flink dev and users

If I want to async write to an external service, which API shall I use, 
AsyncFunction or Async Sink?

My understanding after checking the code is:

  1.  Both APIs guarantee at least once write to the external service, as both 
APIs internally store in-flight requests in the checkpoint.
  2.  Async Sink provides a batching request feature. This can be implemented 
with Map + AsyncFunction: the Map function groups requests in batches and 
passes them to the AsyncFunction. The batching implementation can refer to 
AbstractMapBundleOperator if we don't want to use state.
  3.  Async Sink supports retry on failed requests. AsyncFunction also supports 
retry in the latest Flink version.
  4.  Async Sink supports rate limiting; AsyncFunction doesn’t.
  5.  AsyncFunction can be used to implement read-update-write. Async Sink 
cannot.

Best

Lu



Re: [DISCUSS] FLIP-246: Multi Cluster Kafka Source

2023-06-14 Thread Tzu-Li (Gordon) Tai
Hi Mason,

Thanks for addressing my comments. I agree that option 3 seems more
reasonable.

> Reorganize the metadata in a Map in
`KafkaStream` where the String is the proposed
`KafkaClusterIdentifier.name` field.

Why not just Map?
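For illustration, a hedged sketch of the option 3 layout under discussion
(the generic type parameters were stripped by the archive; `ClusterMetadata`
and all field names below are assumed placeholders, not the FLIP's final
types):

{code:java}
import java.util.Map;
import java.util.Properties;
import java.util.Set;

// Sketch of "option 3": KafkaStream carries per-cluster metadata keyed by
// the cluster name (the former KafkaClusterIdentifier.name field), so
// equality checks only ever compare names.
public class KafkaStream {
    private final String streamId;
    private final Map<String, ClusterMetadata> clusterMetadataMap;

    public KafkaStream(String streamId, Map<String, ClusterMetadata> clusterMetadataMap) {
        this.streamId = streamId;
        this.clusterMetadataMap = clusterMetadataMap;
    }

    public static class ClusterMetadata {
        private final Set<String> topics;
        private final Properties clusterProperties; // incl. bootstrap servers

        public ClusterMetadata(Set<String> topics, Properties clusterProperties) {
            this.topics = topics;
            this.clusterProperties = clusterProperties;
        }
    }
}
{code}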

Regarding naming, I like DynamicKafkaSource as that's what I immediately
thought of when reading the FLIP, but I'm not married to the name :)

In principle, it looks like the FLIP is in good shape and generally people
seem to like the idea of having this connector in Flink.
I'd be in favor of an official vote to allow this to move forward.

Thanks,
Gordon

On Mon, Jun 12, 2023 at 1:57 PM Mason Chen  wrote:

> >
> > My main worry for doing this as a later iteration is that this would
> > probably be a breaking change for the public interface. If that can be
> > avoided and planned ahead, I'm fine with moving forward with how it is
> > right now.
>
>
> Makes sense. Considering the public interfaces, I think we still want to
> provide clients the ability to pin certain configurations in the
> builder -- however, cluster-specific configurations may not be known upfront
> or generalize to all clusters, so there would need to be changes in the
> `KafkaMetadataService` interface. This could be achieved by exposing via:
>
> 1. A separate API (e.g. `Map
> getKafkaClusterProperties()`) in KafkaMetadataService
> 2. In `KafkaClusterIdentifier` as this already contains some configuration
> (e.g. Bootstrap server) in which case we should rename the class to
> something like `KafkaCluster` as it is no longer just an identifier
> 3. Reorganize the metadata in a Map in
> `KafkaStream` where the String is the proposed
> `KafkaClusterIdentifier.name` field.
>
> I prefer option 3 since this simplifies equals() checks on
> KafkaClusterIdentifier (e.g. is it the name, bootstrap, or both?).
>
> Small correction for the MultiClusterKafkaSourceEnumerator section: "This
> > reader is responsible for discovering and assigning splits from 1+
> cluster"
>
> Thanks for the catch!
>
> the defining characteristic is the dynamic discovery vs. the fact that
> > multiple clusters [...]
>
>
>
> I think the "Table" in the name of those SQL connectors should avoid
> > confusion. Perhaps we can also solicit other ideas? I would throw
> > "DiscoveringKafkaSource" into the mix.
>
>  Agreed with Gordon's and your suggestions. Right, the only public facing
> name for SQL is `kafka` for the SQL connector identifier. Based on your
> suggestions:
>
> 1. MultiClusterKafkaSource
> 2. DynamicKafkaSource
> 3. DiscoveringKafkaSource
> 4. MutableKafkaSource
> 5. AdaptiveKafkaSource
>
> I added a few of my own. I do prefer 2. What do others think?
>
> Best,
> Mason
>
> On Sun, Jun 11, 2023 at 1:12 PM Thomas Weise  wrote:
>
> > Hi Mason,
> >
> > Thanks for the iterations on the FLIP, I think this is in a very good
> shape
> > now.
> >
> > Small correction for the MultiClusterKafkaSourceEnumerator section: "This
> > reader is responsible for discovering and assigning splits from 1+
> cluster"
> >
> > Regarding the user facing name of the connector: I agree with Gordon that
> > the defining characteristic is the dynamic discovery vs. the fact that
> > multiple clusters may be consumed in parallel. (Although, as described in
> > the FLIP, lossless consumer migration only works with a strategy that
> > involves intermittent parallel consumption of old and new clusters to
> drain
> > and switch.)
> >
> > I think the "Table" in the name of those SQL connectors should avoid
> > confusion. Perhaps we can also solicit other ideas? I would throw
> > "DiscoveringKafkaSource" into the mix.
> >
> > Cheers,
> > Thomas
> >
> >
> >
> >
> > On Fri, Jun 9, 2023 at 3:40 PM Tzu-Li (Gordon) Tai 
> > wrote:
> >
> > > > Regarding (2), definitely. This is something we planned to add later
> on
> > > but
> > > so far keeping things common has been working well.
> > >
> > > My main worry for doing this as a later iteration is that this would
> > > probably be a breaking change for the public interface. If that can be
> > > avoided and planned ahead, I'm fine with moving forward with how it is
> > > right now.
> > >
> > > > DynamicKafkaSource may be confusing because it is really similar to
> the
> > > KafkaDynamicSource/Sink (table connectors).
> > >
> > > The table / sql Kafka connectors (KafkaDynamicTableFactory,
> > > KafkaDynamicTableSource / KafkaDynamicTableSink) are all internal
> classes
> > > not really meant to be exposed to the user though.
> > > It can cause some confusion internally for the code maintainers, but on
> > the
> > > actual public surface I don't see this being an issue.
> > >
> > > Thanks,
> > > Gordon
> > >
> > > On Wed, Jun 7, 2023 at 8:55 PM Mason Chen 
> > wrote:
> > >
> > > > Hi Gordon,
> > > >
> > > > Thanks for taking a look!
> > > >
> > > > Regarding (1), there is a need from the readers to send this event at
> > > > startup because the reader state may reflect outdated metadata. Thus,
> > the
> > > > reader should 

AsyncFunction vs Async Sink

2023-06-14 Thread Lu Niu
Hi, Flink dev and users

If I want to async write to an external service, which API shall I use,
AsyncFunction or Async Sink?

My understanding after checking the code is:

   1. Both APIs guarantee at least once write to the external service, as
   both APIs internally store in-flight requests in the checkpoint.
   2. Async Sink provides a batching request feature. This can be
   implemented with Map + AsyncFunction: the Map function groups requests in
   batches and passes them to the AsyncFunction. The batching implementation
   can refer to AbstractMapBundleOperator if we don't want to use state.
   3. Async Sink supports retry on failed requests. AsyncFunction also
   supports retry in the latest Flink version.
   4. Async Sink supports rate limiting; AsyncFunction doesn’t.
   5. AsyncFunction can be used to implement read-update-write. Async Sink
   cannot.

Best

Lu
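For readers weighing the two options, here is a minimal sketch of the
AsyncFunction route using Flink's `AsyncDataStream` API. The external client
call is a hypothetical stand-in, and passing the element through can be
replaced by dropping the output, as asked in the follow-up discussion; this
is only a sketch, not a recommendation over the Async Sink:

{code:java}
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

// Sketch: fire one async write per record. In-flight requests are stored
// in checkpoints, which is what gives the at-least-once guarantee.
public class AsyncWriteFunction extends RichAsyncFunction<String, String> {

    @Override
    public void asyncInvoke(String element, ResultFuture<String> resultFuture) {
        CompletableFuture
                .runAsync(() -> writeToExternalService(element)) // assumed client call
                .whenComplete((ignored, error) ->
                        // Pass the element through (or emit nothing to drop it).
                        resultFuture.complete(Collections.singleton(element)));
    }

    private void writeToExternalService(String element) {
        // Hypothetical write against the external service's client.
    }
}

// Usage: unordered mode, 3s timeout, at most 100 in-flight requests.
// DataStream<String> written = AsyncDataStream.unorderedWait(
//         input, new AsyncWriteFunction(), 3, TimeUnit.SECONDS, 100);
{code}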


[jira] [Created] (FLINK-32341) Fix resource requirements rest API response empty result

2023-06-14 Thread chenyuexin (Jira)
chenyuexin created FLINK-32341:
--

 Summary:  Fix resource requirements rest API response empty result
 Key: FLINK-32341
 URL: https://issues.apache.org/jira/browse/FLINK-32341
 Project: Flink
  Issue Type: Bug
  Components: Runtime / REST
Affects Versions: 1.18.0
Reporter: chenyuexin
 Attachments: image-2023-06-14-22-57-57-830.png, 
image-2023-06-14-22-58-02-270.png

I built the latest master branch with flink-1.18-SNAPSHOT, set the adaptive 
scheduler mode configuration, started a streaming job via a standalone 
deployment, and tried to change the per-vertex desired parallelism through the 
Web UI. I then found that the GET request for the 
/jobs/:jobid/resource-requirements API did not respond with anything, which in 
turn caused the PUT request for /jobs/:jobid/resource-requirements to fail to 
set the desired parallelism.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [VOTE] FLIP-295: Support lazy initialization of catalogs and persistence of catalog configurations

2023-06-14 Thread Jing Ge
+1 (binding)

Best Regards,
Jing

On Wed, Jun 14, 2023 at 3:28 PM Rui Fan <1996fan...@gmail.com> wrote:

> +1(binding)
>
> Best,
> Rui Fan
>
> On Wed, Jun 14, 2023 at 16:24 Hang Ruan  wrote:
>
> > +1 (non-binding)
> >
> > Thanks for Feng driving it.
> >
> > Best,
> > Hang
> >
> > On Wed, Jun 14, 2023 at 10:36, Feng Jin  wrote:
> >
> > > Hi everyone
> > >
> > > Thanks for all the feedback about the FLIP-295: Support lazy
> > initialization
> > > of catalogs and persistence of catalog configurations[1].
> > > [2] is the discussion thread.
> > >
> > >
> > > I'd like to start a vote for it. The vote will be open for at least 72
> > > hours (excluding weekends, until June 19, 10:00 AM GMT) unless there is an
> > > objection or an insufficient number of votes.
> > >
> > >
> > > [1]
> > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-295%3A+Support+lazy+initialization+of+catalogs+and+persistence+of+catalog+configurations
> > > [2]https://lists.apache.org/thread/dcwgv0gmngqt40fl3694km53pykocn5s
> > >
> > >
> > > Best,
> > > Feng
> > >
> >
>


Re: [VOTE] FLIP-294: Support Customized Catalog Modification Listener

2023-06-14 Thread Jing Ge
+1 (binding)

Best Regards,
Jing

On Wed, Jun 14, 2023 at 4:07 PM Benchao Li  wrote:

> +1 (binding)
>
> On Wed, Jun 14, 2023 at 19:52, Shammon FY  wrote:
>
> > Hi all:
> >
> > Thanks for all the feedback for FLIP-294: Support Customized Catalog
> > Modification Listener [1]. I would like to start a vote for it according
> to
> > the discussion in thread [2].
> >
> > The vote will be open for at least 72 hours (excluding weekends, until June
> > 19, 19:00 GMT) unless there is an objection or an insufficient number
> of
> > votes.
> >
> >
> > [1]
> >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-294%3A+Support+Customized+Catalog+Modification+Listener
> > [2] https://lists.apache.org/thread/185mbcwnpokfop4xcb22r9bgfp2m68fx
> >
> >
> > Best,
> > Shammon FY
> >
>
>
> --
>
> Best,
> Benchao Li
>


[jira] [Created] (FLINK-32340) NPE in K8s operator which breaks current and subsequent deployments

2023-06-14 Thread Sergii Nazarov (Jira)
Sergii Nazarov created FLINK-32340:
--

 Summary: NPE in K8s operator which breaks current and subsequent 
deployments
 Key: FLINK-32340
 URL: https://issues.apache.org/jira/browse/FLINK-32340
 Project: Flink
  Issue Type: Bug
  Components: Kubernetes Operator
Affects Versions: kubernetes-operator-1.5.0
Reporter: Sergii Nazarov


Prerequisites:
 * Deployment via Apache Flink Kubernetes operator with version 1.5.0
 * Deployment using FlinkDeployment spec
 * Upgrade mode - savepoint
 * Configuration property 
"kubernetes.operator.job.upgrade.last-state-fallback.enabled" is true

 

Steps to reproduce:
 # Deploy an app
 # You can wait till the app creates a checkpoint (it doesn't change anything 
even if "kubernetes.operator.job.upgrade.last-state-fallback.enabled" is true)
 # Deploy a new version of the app with an error that causes throwing an 
exception from the main method of the app

Exception which causes operator NPE
{code:none}
o.a.f.k.o.o.JobStatusObserver [ERROR][flink-apps/myApp] Job 
0d78a62fe581b047510e28f26393a7ce failed with error: 
org.apache.flink.client.program.ProgramInvocationException: The main method 
caused an error: Consumer does not exist
at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372)
at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
at 
org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
at 
org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:291)
at 
org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.lambda$runApplicationAsync$2(ApplicationDispatcherBootstrap.java:244)
at 
java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
at 
org.apache.flink.runtime.concurrent.akka.ActorSystemScheduledExecutorAdapter$ScheduledFutureTask.run(ActorSystemScheduledExecutorAdapter.java:171)
at 
org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
at 
org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$withContextClassLoader$0(ClassLoadingUtils.java:41)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:49)
at 
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:48)
at java.base/java.util.concurrent.ForkJoinTask.doExec(Unknown Source)
at 
java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(Unknown 
Source)
at java.base/java.util.concurrent.ForkJoinPool.scan(Unknown Source)
at java.base/java.util.concurrent.ForkJoinPool.runWorker(Unknown Source)
at java.base/java.util.concurrent.ForkJoinWorkerThread.run(Unknown 
Source)
{code}
NPE in K8s operator
{code:none}
o.a.f.k.o.l.AuditUtils [INFO ][flink-apps/myApp] >>> Event  | 
Info | JOBSTATUSCHANGED | Job status changed from RECONCILING to FAILED
o.a.f.k.o.o.SavepointObserver [ERROR][flink-apps/myApp] Could not 
observe latest savepoint information.
java.lang.NullPointerException
at 
org.apache.flink.kubernetes.operator.service.CheckpointHistoryWrapper.getInProgressCheckpoint(CheckpointHistoryWrapper.java:60)
at 
org.apache.flink.kubernetes.operator.service.AbstractFlinkService.getCheckpointInfo(AbstractFlinkService.java:564)
at 
org.apache.flink.kubernetes.operator.service.AbstractFlinkService.getLastCheckpoint(AbstractFlinkService.java:520)
at 
org.apache.flink.kubernetes.operator.observer.SavepointObserver.observeLatestSavepoint(SavepointObserver.java:209)
at 
org.apache.flink.kubernetes.operator.observer.SavepointObserver.observeSavepointStatus(SavepointObserver.java:73)
at 
org.apache.flink.kubernetes.operator.observer.deployment.ApplicationObserver.observeFlinkCluster(ApplicationObserver.java:61)
at 
org.apache.flink.kubernetes.operator.observer.deployment.AbstractFlinkDeploymentObserver.observeInternal(AbstractFlinkDeploymentObserver.java:73)
at 
org.apache.flink.kubernetes.operator.observer.AbstractFlinkResourceObserver.observe(AbstractFlinkResourceObserver.java:53)
at 
org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController.reconcile(FlinkDeploymentController.java:134)
at 
org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController.reconcile(FlinkDeploymentController.java:57)
at 
io.javaoperatorsdk.operator.processing.Controller$1.execute(Controller.java:138)
at 
io.javaoperatorsdk.operator.processing.Controller$1.execute(Controller.java:96)
at 

Re: [VOTE] FLIP-294: Support Customized Catalog Modification Listener

2023-06-14 Thread Benchao Li
+1 (binding)

On Wed, Jun 14, 2023 at 19:52, Shammon FY  wrote:

> Hi all:
>
> Thanks for all the feedback for FLIP-294: Support Customized Catalog
> Modification Listener [1]. I would like to start a vote for it according to
> the discussion in thread [2].
>
> The vote will be open for at least 72 hours (excluding weekends, until June
> 19, 19:00 GMT) unless there is an objection or an insufficient number of
> votes.
>
>
> [1]
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-294%3A+Support+Customized+Catalog+Modification+Listener
> [2] https://lists.apache.org/thread/185mbcwnpokfop4xcb22r9bgfp2m68fx
>
>
> Best,
> Shammon FY
>


-- 

Best,
Benchao Li


Re: [DISCUSS] FLIP-321: Introduce an API deprecation process

2023-06-14 Thread Becket Qin
Thanks for the explanation, Matthias.

In the example you raised, would it be better to just keep both YARN and
K8S support in the new major version, but with YARN support deprecated if
we want to? We can say for YARN we will only provide bug fixes but no
feature development anymore. Given these two features are probably in two
independent modules. Keeping both modules in the same new major version
likely has zero additional cost compared with maintaining them in two
different major versions respectively. This way we don't have the
non-linear version issue, have fewer releases, and save a bunch of
maintenance effort for multiple development branches.

Regarding the stability demotion, I see your point. However, I still feel
it a little weird that we demote a Public API to PublicEvolving just for
the purpose of code removal in minor versions. This also results in some
counterintuitive issues. For example, assuming users only use Public APIs,
they may be able to upgrade from 1.19.0 to 2.0 fine, but upgrading from
1.19 to 2.1 does not work because the Public API is removed, even though
from the users' perspective, both of them are major version upgrades. So,
in this case, I would rather bump up the major version again to remove the
deprecated Public API. That seems simpler and does not complicate the well
established versioning semantic conventions.

Thanks,

Jiangjie (Becket) Qin

On Wed, Jun 14, 2023 at 9:27 PM Matthias Pohl
 wrote:

> One (made-up) example from the top of my head would have been that the
> community decides to focus fully on Kubernetes without considering Yarn
> anymore because of some must-have feature on the Kubernetes side. At the
> same time there are still some users for whom it would be tricky to migrate
> from Yarn to Kubernetes. Therefore, there would be some desire to still
> maintain the older major version of Flink that supports Yarn.
>
> But now that I'm thinking about the example, I realize: A discussion on how
> we would handle the two major versions in that case, could be started when
> we actually run into this issue. It shouldn't be too hard to migrate to a
> non-linear versioning from what you are proposing if such a scenario comes
> up.
>
> And on the topic of downgrading from @Public to @PublicEvolving:
>
> Demoting a Public API to PublicEvolving API sounds hacky. From what I
> > understand the stability guarantee is not revocable because users will
> rely
> > on the stability guarantee to plan their usage of Flink. Demoting an API
> > essentially defeats the very purpose of stability guarantee to begin
> with.
> >
>
> I understand the stability guarantee in a way that it only applies within a
> major version. Downgrading the stability constraint for an API with a new
> major version still seems to comply with the definition of a @Public
> annotation as it's similar to changing the API in other ways. But I'm not
> insisting on that approach. I just found it a reasonable workaround.
>
> Thanks,
> Matthias
>
> On Wed, Jun 14, 2023 at 11:38 AM Becket Qin  wrote:
>
> > Hi Matthias,
> >
> > Thanks for the feedback.
> >
> > Do you have an example of behavioral change in mind? Not sure I fully
> > understand the concern for behavioral change here. From what I
> understand,
> > any user sensible change in an existing API, regardless of its kind (API
> > signature or behavioral change), can always be done in the following way:
> >
> > 1. Introduce a new API (new package, new class/interface, new method, new
> > config, new metric, etc) while marking the old one as deprecated.
> > 2. Let the new API and deprecated API coexist for the migration period to
> > allow planned migration from the users.
> > 3. Remove the deprecated API.
> >
> > For example, Kafka deprecated its old consumer and replaced it with a new
> > Consumer - basically everything changes. The source code of the old
> > consumer was kept there for a few years across multiple major versions.
> > This does mean we have to keep both of the APIs for a few releases, and
> > even fix bugs in the old consumer, so additional maintenance effort is
> > required. But this allows the users to keep up with Kafka releases which
> is
> > extremely rewarding.
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
> >
> > On Wed, Jun 14, 2023 at 5:06 PM Matthias Pohl
> >  wrote:
> >
> > > Thanks for starting this discussion, Becket. A few good points were
> > raised.
> > > Here's what I want to add:
> > >
> > > Stefan raised the point of behavioral stability (in contrast to API
> > > stability). That might be a reason for users to not be able to go ahead
> > > with a major version bump. Working around behavioral changes might be
> > > trickier than just switching from deprecated to newer APIs. I see your
> > > motivation of having a more linear versioning even between major
> versions
> > > to avoid backports. Backports are painful enough for minor versions.
> > >
> > > But with major versions becoming a thing in the Flink cosmos, I could

Re: [DISCUSS] FLIP-322 Cooldown period for adaptive scheduler

2023-06-14 Thread Etienne Chauchot

Hi all,

@Yuxia, I updated the FLIP to include the aggregation of the stacked 
operations that we discussed below. PTAL.


Best

Etienne


On 13/06/2023 at 16:31, Etienne Chauchot wrote:

Hi Yuxia,

Thanks for your feedback. The number of potentially stacked operations 
depends on the configured length of the cooldown period.




The proposition in the FLIP is to add a minimum delay between 2 scaling
operations. But, indeed, an optimization could be to still stack the
operations (that arrive during a cooldown period) but maybe not take
only the last operation but rather aggregate them in order to end up
with a single aggregated operation when the cooldown period ends. For
example, let's say 3 taskManagers come up and 1 comes down during the
cooldown period, we could generate a single operation of scale up +2
when the period ends.
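A minimal sketch of that aggregation idea, assuming a simple net-delta model 
(the class and method names are illustrative, not the adaptive scheduler's 
actual API):

{code:java}
import java.time.Duration;
import java.time.Instant;

// Scaling events arriving during the cooldown are folded into one net
// delta, which is applied as a single rescale once the period ends.
public class CooldownAggregator {
    private final Duration cooldown;
    private Instant lastRescale = Instant.EPOCH;
    private int pendingDelta; // net TaskManagers gained/lost while cooling down

    public CooldownAggregator(Duration cooldown) {
        this.cooldown = cooldown;
    }

    /** Returns the aggregated delta to apply now, or 0 while cooling down. */
    public synchronized int onScalingEvent(int delta, Instant now) {
        pendingDelta += delta; // e.g. 3 TMs up and 1 TM down => +2
        if (Duration.between(lastRescale, now).compareTo(cooldown) < 0) {
            return 0; // still cooling down: stack the event for later
        }
        lastRescale = now;
        int aggregated = pendingDelta;
        pendingDelta = 0;
        return aggregated;
    }
}
{code}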

As a side note regarding your comment on "it'll take a long time to 
finish all", please keep in mind that the reactive mode (at least for 
now) is only available for streaming pipelines, which are in essence 
infinite processing.


Another side note: when you mention "every taskManagers connecting", 
if you are referring to the start of the pipeline, please keep in mind 
that the adaptive scheduler has a "waiting for resources" timeout 
period before starting the pipeline in which all taskmanagers connect 
and the parallelism is decided.


Best

Etienne

On 13/06/2023 at 03:58, yuxia wrote:

Hi, Etienne. Thanks for driving it. I have one question about the
mechanism of the cooldown timeout.

From the Proposed Changes part, if a scaling event is received and
it falls during the cooldown period, it'll be stacked to be executed
after the period ends. Also, from the description of FLINK-21883[1],
cooldown timeout is to avoid rescaling the job very frequently,
because TaskManagers are not all connecting at the same time.

So, is it possible that every taskmanager connecting will produce a
scaling event, and it'll be stacked with many scale-up events, which
causes it to take a long time to finish them all? Can we just take the
last event?

[1]: https://issues.apache.org/jira/browse/FLINK-21883

Best regards, Yuxia

----- Original Message -----
From: "Etienne Chauchot" 
To: "dev" , "Robert Metzger" 
Sent: Monday, June 12, 2023 11:34:25 PM
Subject: [DISCUSS] FLIP-322 Cooldown period for adaptive scheduler

Hi,

I’d like to start a discussion about FLIP-322 [1] which introduces a 
cooldown period for the adaptive scheduler.


I'd like to get your feedback, especially @Robert, as you opened the 
related ticket and worked on the reactive mode a lot.


[1] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-322+Cooldown+period+for+adaptive+scheduler





Best


Etienne

Re: [VOTE] FLIP-295: Support lazy initialization of catalogs and persistence of catalog configurations

2023-06-14 Thread Rui Fan
+1(binding)

Best,
Rui Fan

On Wed, Jun 14, 2023 at 16:24 Hang Ruan  wrote:

> +1 (non-binding)
>
> Thanks for Feng driving it.
>
> Best,
> Hang
>
> On Wed, Jun 14, 2023 at 10:36, Feng Jin  wrote:
>
> > Hi everyone
> >
> > Thanks for all the feedback about the FLIP-295: Support lazy
> initialization
> > of catalogs and persistence of catalog configurations[1].
> > [2] is the discussion thread.
> >
> >
> > I'd like to start a vote for it. The vote will be open for at least 72
> > hours (excluding weekends, until June 19, 10:00 AM GMT) unless there is an
> > objection or an insufficient number of votes.
> >
> >
> > [1]
> >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-295%3A+Support+lazy+initialization+of+catalogs+and+persistence+of+catalog+configurations
> > [2]https://lists.apache.org/thread/dcwgv0gmngqt40fl3694km53pykocn5s
> >
> >
> > Best,
> > Feng
> >
>


Re: [DISCUSS] FLIP-321: Introduce an API deprecation process

2023-06-14 Thread Matthias Pohl
One (made-up) example from the top of my head would have been that the
community decides to focus fully on Kubernetes without considering Yarn
anymore because of some must-have feature on the Kubernetes side. At the
same time there are still some users for whom it would be tricky to migrate
from Yarn to Kubernetes. Therefore, there would be some desire to still
maintain the older major version of Flink that supports Yarn.

But now that I'm thinking about the example, I realize: A discussion on how
we would handle the two major versions in that case, could be started when
we actually run into this issue. It shouldn't be too hard to migrate to a
non-linear versioning from what you are proposing if such a scenario comes
up.

And on the topic of downgrading from @Public to @PublicEvolving:

Demoting a Public API to PublicEvolving API sounds hacky. From what I
> understand the stability guarantee is not revocable because users will rely
> on the stability guarantee to plan their usage of Flink. Demoting an API
> essentially defeats the very purpose of stability guarantee to begin with.
>

I understand the stability guarantee in a way that it only applies within a
major version. Downgrading the stability constraint for an API with a new
major version still seems to comply with the definition of a @Public
annotation as it's similar to changing the API in other ways. But I'm not
insisting on that approach. I just found it a reasonable workaround.

Thanks,
Matthias

On Wed, Jun 14, 2023 at 11:38 AM Becket Qin  wrote:

> Hi Matthias,
>
> Thanks for the feedback.
>
> Do you have an example of behavioral change in mind? Not sure I fully
> understand the concern for behavioral change here. From what I understand,
> any user sensible change in an existing API, regardless of its kind (API
> signature or behavioral change), can always be done in the following way:
>
> 1. Introduce a new API (new package, new class/interface, new method, new
> config, new metric, etc) while marking the old one as deprecated.
> 2. Let the new API and deprecated API coexist for the migration period to
> allow planned migration from the users.
> 3. Remove the deprecated API.
>
> For example, Kafka deprecated its old consumer and replaced it with a new
> Consumer - basically everything changes. The source code of the old
> consumer was kept there for a few years across multiple major versions.
> This does mean we have to keep both of the APIs for a few releases, and
> even fix bugs in the old consumer, so additional maintenance effort is
> required. But this allows the users to keep up with Kafka releases which is
> extremely rewarding.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On Wed, Jun 14, 2023 at 5:06 PM Matthias Pohl
>  wrote:
>
> > Thanks for starting this discussion, Becket. A few good points were
> raised.
> > Here's what I want to add:
> >
> > Stefan raised the point of behavioral stability (in contrast to API
> > stability). That might be a reason for users to not be able to go ahead
> > with a major version bump. Working around behavioral changes might be
> > trickier than just switching from deprecated to newer APIs. I see your
> > motivation of having a more linear versioning even between major versions
> > to avoid backports. Backports are painful enough for minor versions.
> >
> > But with major versions becoming a thing in the Flink cosmos, I could
> > imagine that the behavioral stability Stefan mentions actually could
> become
> > a bigger issue: Major versions down the road might include bigger
> > behavioral changes which would prevent users from going ahead with the
> > major version bump. I understand that this is out of the original scope
> of
> > this FLIP. But nevertheless, it does support Chesnay's concerns that a
> > linear versioning without maintaining older major versions might not be
> > feasible. It sounds like we should have a discussion about how we treat
> > older major versions here (or have a dedicated discussion on that topic
> > before going ahead with that FLIP).
> >
> > On another note: I like Xintong's proposal of downgrading an API
> > from @Public to @PublicEvolving in the new major version. That would
> allow
> > us to keep the original intention of the @Public annotation alive (i.e.
> > that those APIs are only removed in the next major version).
> >
> > Matthias
> >
> > On Wed, Jun 14, 2023 at 10:10 AM Xintong Song 
> > wrote:
> >
> > > Thanks for bringing up this discussion, Becket.
> > >
> > > My two cents:
> > >
> > > 1. Do we allow deprecation & removal of APIs without adding a new one
> as
> > a
> > > replacement? The examples in the table give me an impression that
> marking
> > > an API as `@Deprecated` should only happen at the same time of
> > introducing
> > > a new replacing API, which I think is true in most but not all the
> cases.
> > >
> > > If there is a major version bump before 2 minor releases in the current
> > > > major version are reached, the major version should keep the 

Re: [DISCUSS] FLIP-324: Introduce Runtime Filter for Flink Batch Jobs

2023-06-14 Thread yuxia
Thanks Lijie for starting this discussion. Excited to see the runtime filter 
being implemented in Flink.
I have a few questions about it:

1: As the FLIP said, `if the ndv cannot be estimated, use row count instead`. 
So, does the row count come from the statistics of the underlying table? What 
if the statistics are also unavailable, considering users may not always 
remember to generate statistics in production?
I'm wondering whether it makes sense to just disable the runtime filter if 
statistics are unavailable, since in that case we can hardly evaluate the 
benefits of the runtime filter.
 

2: The FLIP said: "We will inject the runtime filters only if the following 
requirements are met: xxx", but it also said, "Once this limit is exceeded, it 
will output a fake filter (which always returns true)" in the 
`RuntimeFilterBuilderOperator` part. They seem contradictory, so I'm wondering 
what the real behavior is: will no filter be injected, or a fake filter?


3: Does it also mean the runtime filter can only take effect with blocking shuffle?



Best regards,
Yuxia

----- Original Message -----
From: "ron9 liu" 
To: "dev" 
Sent: Wednesday, June 14, 2023 5:29:28 PM
Subject: Re: [DISCUSS] FLIP-324: Introduce Runtime Filter for Flink Batch Jobs

Thanks Lijie for starting this discussion. Runtime Filter is a common
optimization to improve join performance that has been adopted by many
computing engines such as Spark, Doris, etc. Flink is a unified streaming and
batch computing engine, and we are continuously optimizing the performance of
batch jobs. Runtime filter is a general performance optimization technique
that can improve the performance of Flink batch jobs, so we are introducing
it on the batch side as well.

Looking forward to all feedback.

Best,
Ron

On Wed, Jun 14, 2023 at 17:17, Lijie Wang  wrote:

> Hi devs
>
> Ron Liu, Gen Luo and I would like to start a discussion about FLIP-324:
> Introduce Runtime Filter for Flink Batch Jobs[1]
>
> Runtime Filter is a common optimization to improve join performance. It is
> designed to dynamically generate filter conditions for certain Join queries
> at runtime to reduce the amount of scanned or shuffled data, avoid
> unnecessary I/O and network transmission, and speed up the query. Its
> working principle is building a filter(e.g. bloom filter) based on the data
> on the small table side(build side) first, then pass this filter to the
> large table side(probe side) to filter the irrelevant data on it, this can
> reduce the data reaching the join and improve performance.
>
> You can find more details in the FLIP-324[1]. Looking forward to your
> feedback.
>
> [1]
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-324%3A+Introduce+Runtime+Filter+for+Flink+Batch+Jobs
>
> Best,
> Ron & Gen & Lijie
>


[VOTE] FLIP-294: Support Customized Catalog Modification Listener

2023-06-14 Thread Shammon FY
Hi all:

Thanks for all the feedback for FLIP-294: Support Customized Catalog
Modification Listener [1]. I would like to start a vote for it according to
the discussion in thread [2].

The vote will be open for at least 72 hours (excluding weekends, until June
19, 19:00 GMT) unless there is an objection or an insufficient number of
votes.


[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-294%3A+Support+Customized+Catalog+Modification+Listener
[2] https://lists.apache.org/thread/185mbcwnpokfop4xcb22r9bgfp2m68fx


Best,
Shammon FY


[jira] [Created] (FLINK-32339) Align lock usage in DefaultLeaderElectionService

2023-06-14 Thread Matthias Pohl (Jira)
Matthias Pohl created FLINK-32339:
-

 Summary: Align lock usage in DefaultLeaderElectionService
 Key: FLINK-32339
 URL: https://issues.apache.org/jira/browse/FLINK-32339
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Coordination
Reporter: Matthias Pohl


{{DefaultLeaderElectionService}} uses sequential execution through a single 
thread executor for the leader event handling (introduced in FLINK-31838). We 
missed moving the leadership confirmation event into that queue.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-32338) Add FailsOnJava17 annotation

2023-06-14 Thread Chesnay Schepler (Jira)
Chesnay Schepler created FLINK-32338:


 Summary: Add FailsOnJava17 annotation
 Key: FLINK-32338
 URL: https://issues.apache.org/jira/browse/FLINK-32338
 Project: Flink
  Issue Type: Sub-task
  Components: Tests
Reporter: Chesnay Schepler
Assignee: Chesnay Schepler
 Fix For: 1.18.0


Add an annotation for disabling specific tests on Java 17, similar to 
FailsOnJava11.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-32337) SQL array_union could return wrong result

2023-06-14 Thread Sergey Nuyanzin (Jira)
Sergey Nuyanzin created FLINK-32337:
---

 Summary: SQL array_union could return wrong result
 Key: FLINK-32337
 URL: https://issues.apache.org/jira/browse/FLINK-32337
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / API
Affects Versions: 1.18.0
Reporter: Sergey Nuyanzin
Assignee: Sergey Nuyanzin


This was mentioned at 
[https://github.com/apache/flink/pull/22717#issuecomment-1587333488]

How to reproduce:
{code:sql}
SELECT array_union(ARRAY[CAST(NULL AS INT)], ARRAY[1]); -- returns [NULL, 1], 
this is OK
SELECT array_union(ARRAY[1], ARRAY[CAST(NULL AS INT)]); -- returns [1, 0], this 
is NOT OK
{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-32336) PartitionITCase#ComparablePojo should be public

2023-06-14 Thread Chesnay Schepler (Jira)
Chesnay Schepler created FLINK-32336:


 Summary: PartitionITCase#ComparablePojo should be public
 Key: FLINK-32336
 URL: https://issues.apache.org/jira/browse/FLINK-32336
 Project: Flink
  Issue Type: Sub-task
  Components: Tests
Reporter: Chesnay Schepler
Assignee: Chesnay Schepler
 Fix For: 1.18.0


POJOs should be public, but this one is private, forcing it to go through 
Kryo, which is currently failing for some odd reason.
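
For reference, the shape Flink's POJO serializer expects (a sketch; the 
field set is illustrative, not the actual test class):

{code:java}
// Flink only treats a type as a POJO if the class is public (for nested
// classes: public and static), has a public no-arg constructor, and its
// fields are public or accessible via public getters/setters. Anything
// else falls back to Kryo.
public static class ComparablePojo implements Comparable<ComparablePojo> {

    public Integer value;

    public ComparablePojo() {}

    public ComparablePojo(Integer value) {
        this.value = value;
    }

    @Override
    public int compareTo(ComparablePojo other) {
        return this.value.compareTo(other.value);
    }
}
{code}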



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] FLIP-321: Introduce an API deprecation process

2023-06-14 Thread Becket Qin
Hi Matthias,

Thanks for the feedback.

Do you have an example of behavioral change in mind? Not sure I fully
understand the concern for behavioral change here. From what I understand,
any user-visible change in an existing API, regardless of its kind (API
signature or behavioral change), can always be done in the following way:

1. Introduce a new API (new package, new class/interface, new method, new
config, new metric, etc) while marking the old one as deprecated.
2. Let the new API and deprecated API coexist for the migration period to
allow planned migration from the users.
3. Remove the deprecated API.
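
In code, steps 1 and 2 could look like the following sketch (OldSink /
NewSink are hypothetical names; @Public, @PublicEvolving and @Deprecated are
the existing Flink annotations):

/** @deprecated Since 1.18. Use {@link NewSink} instead. */
@Deprecated
@Public
public interface OldSink<T> {
    void write(T record);
}

/** The replacement API; coexists with OldSink during the migration period. */
@PublicEvolving
public interface NewSink<T> {
    void write(T record);
}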

For example, Kafka deprecated its old consumer and replaced it with a new
Consumer - basically everything changes. The source code of the old
consumer was kept there for a few years across multiple major versions.
This does mean we have to keep both of the APIs for a few releases, and
even fix bugs in the old consumer, so additional maintenance effort is
required. But this allows the users to keep up with Kafka releases which is
extremely rewarding.

Thanks,

Jiangjie (Becket) Qin

On Wed, Jun 14, 2023 at 5:06 PM Matthias Pohl
 wrote:

> Thanks for starting this discussion, Becket. A few good points were raised.
> Here's what I want to add:
>
> Stefan raised the point of behavioral stability (in contrast to API
> stability). That might be a reason for users to not be able to go ahead
> with a major version bump. Working around behavioral changes might be
> trickier than just switching from deprecated to newer APIs. I see your
> motivation of having a more linear versioning even between major versions
> to avoid backports. Backports are painful enough for minor versions.
>
> But with major versions becoming a thing in the Flink cosmos, I could
> imagine that the behavioral stability Stefan mentions actually could become
> a bigger issue: Major versions down the road might include bigger
> behavioral changes which would prevent users from going ahead with the
> major version bump. I understand that this is out of the original scope of
> this FLIP. But nevertheless, it does support Chesnay's concerns that a
> linear versioning without maintaining older major versions might not be
> feasible. It sounds like we should have a discussion about how we treat
> older major versions here (or have a dedicated discussion on that topic
> before going ahead with that FLIP).
>
> On another note: I like Xintong's proposal of downgrading an API
> from @Public to @PublicEvolving in the new major version. That would allow
> us to keep the original intention of the @Public annotation alive (i.e.
> that those APIs are only removed in the next major version).
>
> Matthias
>
> On Wed, Jun 14, 2023 at 10:10 AM Xintong Song 
> wrote:
>
> > Thanks for bringing up this discussion, Becket.
> >
> > My two cents:
> >
> > 1. Do we allow deprecation & removal of APIs without adding a new one as
> a
> > replacement? The examples in the table give me an impression that marking
> > an API as `@Deprecated` should only happen at the same time of
> introducing
> > a new replacing API, which I think is true in most but not all the cases.
> >
> > If there is a major version bump before 2 minor releases in the current
> > > major version are reached, the major version should keep the source
> code
> > in
> > > its own minor version until two minor versions are reached. For
> example,
> > in
> > > the above case, if Flink 2.0 is released after 1.20, then the
> deprecated
> > > source code of foo will be kept in 2.0 and all the 2.x versions. It can
> > > only be removed in 3.0.
> > >
> >
> > 2. I think this might be a bit too strict. For an API that we already
> > decided to remove, having to keep it for all the 2.x versions simply
> > because there's less than 2 minor releases between making the decision
> and
> > the major release bump seems not necessarily. Alternatively, I'd like to
> > propose to remove the `@Public` annotation (or downgrade it to
> > `@PublicEvolving`) in 2.0, and remove it in 2.2.
> >
> > Best,
> >
> > Xintong
> >
> >
> >
> > On Wed, Jun 14, 2023 at 3:56 PM Becket Qin  wrote:
> >
> > > Hi Jing,
> > >
> > > Thanks for the feedback. Please see the answers to your questions
> below:
> > >
> > > *"Always add a "Since X.X.X" comment to indicate when was a class /
> > > > interface / method marked as deprecated."*
> > > >  Could you describe it with a code example? Do you mean Java
> comments?
> > >
> > > It is just a comment such as "Since 1.18. Use XXX instead." (see e.g.
> > > https://kafka.apache.org/31/javadoc/org/apache/kafka/clients/admin/Admin.html#incrementalAlterConfigs(java.util.Map)).
> > > And we can then look it up in the deprecated list[1] in each
> > > release and see which method should / can be deprecated.
> > >
> > > *"At least 1 patch release for the affected minor release for
> > > > Experimental APIs"*
> > > > The rule is absolutely right. However, afaiac, deprecation is
> different
> > > as


[DISCUSS] FLIP-324: Introduce Runtime Filter for Flink Batch Jobs

2023-06-14 Thread Lijie Wang
Hi devs

Ron Liu, Gen Luo and I would like to start a discussion about FLIP-324:
Introduce Runtime Filter for Flink Batch Jobs[1]

Runtime Filter is a common optimization to improve join performance. It is
designed to dynamically generate filter conditions for certain Join queries
at runtime to reduce the amount of scanned or shuffled data, avoid
unnecessary I/O and network transmission, and speed up the query. Its
working principle is to first build a filter (e.g. a bloom filter) from the
data on the small table side (build side), then pass this filter to the
large table side (probe side) to filter out the irrelevant data there; this
reduces the data reaching the join and improves performance.
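
As a hypothetical example of the target query shape (table and column names
are made up):

-- A bloom filter is built from the small build side (dim, after its local
-- predicate) and applied to the scan of the large probe side (fact) before
-- the shuffle, so most irrelevant fact rows never reach the join.
SELECT f.*
FROM fact f
JOIN dim d ON f.dim_id = d.id
WHERE d.region = 'EU';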

You can find more details in the FLIP-324[1]. Looking forward to your
feedback.

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-324%3A+Introduce+Runtime+Filter+for+Flink+Batch+Jobs

Best,
Ron & Gen & Lijie


Re: [DISCUSS] FLIP-321: Introduce an API deprecation process

2023-06-14 Thread Becket Qin
Hi Xintong,

Thanks for the comment. Please see the replies below:

1. Do we allow deprecation & removal of APIs without adding a new one as a
> replacement? The examples in the table give me an impression that marking
> an API as `@Deprecated` should only happen at the same time of introducing
> a new replacing API, which I think is true in most but not all the cases.

Right, it is not necessary to have a replacement for the deprecated API, if
we decide to sunset the functionality. That does not change the migration
period, though.

2. I think this might be a bit too strict. For an API that we already
> decided to remove, having to keep it for all the 2.x versions simply
> because there's less than 2 minor releases between making the decision and
> the major release bump seems not necessarily. Alternatively, I'd like to
> propose to remove the `@Public` annotation (or downgrade it to
> `@PublicEvolving`) in 2.0, and remove it in 2.2.

I am not sure this is a good practice. The purpose of the migration period
is to give users enough time to adapt to a breaking API change without
holding them back from upgrading Flink. The reason we say a Public API needs
at least two minor releases is that more users have probably picked it up
over time and more jobs are running on these APIs. So, the public APIs just
require a larger migration window. Admittedly, this will introduce a higher
maintenance cost for us; this is why Public APIs should be treated
seriously. If the promotion of a PublicEvolving API to a Public API requires
two minor version releases, deprecation of a Public API should only take
longer.

Demoting a Public API to PublicEvolving API sounds hacky. From what I
understand the stability guarantee is not revocable because users will rely
on the stability guarantee to plan their usage of Flink. Demoting an API
essentially defeats the very purpose of stability guarantee to begin with.

If the concern of keeping a migration period of two minor releases across
major versions is about the maintenance overhead, we can choose to bump up
the major version to 3.0 at some point after the migration period has
passed, assuming by then most of the users have migrated away from the
deprecated Public API.

Thanks,

Jiangjie (Becket) Qin


On Wed, Jun 14, 2023 at 4:10 PM Xintong Song  wrote:

> Thanks for bringing up this discussion, Becket.
>
> My two cents:
>
> 1. Do we allow deprecation & removal of APIs without adding a new one as a
> replacement? The examples in the table give me an impression that marking
> an API as `@Deprecated` should only happen at the same time of introducing
> a new replacing API, which I think is true in most but not all the cases.
>
> If there is a major version bump before 2 minor releases in the current
> > major version are reached, the major version should keep the source code
> in
> > its own minor version until two minor versions are reached. For example,
> in
> > the above case, if Flink 2.0 is released after 1.20, then the deprecated
> > source code of foo will be kept in 2.0 and all the 2.x versions. It can
> > only be removed in 3.0.
> >
>
> 2. I think this might be a bit too strict. For an API that we already
> decided to remove, having to keep it for all the 2.x versions simply
> because there's less than 2 minor releases between making the decision and
> the major release bump seems not necessarily. Alternatively, I'd like to
> propose to remove the `@Public` annotation (or downgrade it to
> `@PublicEvolving`) in 2.0, and remove it in 2.2.
>
> Best,
>
> Xintong
>
>
>
> On Wed, Jun 14, 2023 at 3:56 PM Becket Qin  wrote:
>
> > Hi Jing,
> >
> > Thanks for the feedback. Please see the answers to your questions below:
> >
> > *"Always add a "Since X.X.X" comment to indicate when was a class /
> > > interface / method marked as deprecated."*
> > >  Could you describe it with a code example? Do you mean Java comments?
> >
> > It is just a comment such as "Since 1.18. Use XXX instead." (see e.g.
> > https://kafka.apache.org/31/javadoc/org/apache/kafka/clients/admin/Admin.html#incrementalAlterConfigs(java.util.Map)).
> > And we can then look it up in the deprecated list[1] in each
> > release and see which method should / can be deprecated.
> >
> > *"At least 1 patch release for the affected minor release for
> > > Experimental APIs"*
> > > The rule is absolutely right. However, afaiac, deprecation is different
> > as
> > > modification. As a user/dev, I would appreciate, if I do not need to do
> > any
> > > migration work for any deprecated API between patch releases upgrade.
> > BTW,
> > > if experimental APIs are allowed to change between patches, could we
> just
> > > change them instead of marking them as deprecated and create new ones
> to
> > > replace them?
> >
> > Deprecating an API is just a more elegant way of replacing an API with a
> > new one. The only difference between the two is whether the old API is
> kept
> > and coexists with the new API for some 

Re: [DISCUSS] FLIP-321: Introduce an API deprecation process

2023-06-14 Thread Matthias Pohl
Thanks for starting this discussion, Becket. A few good points were raised.
Here's what I want to add:

Stefan raised the point of behavioral stability (in contrast to API
stability). That might be a reason for users to not be able to go ahead
with a major version bump. Working around behavioral changes might be
trickier than just switching from deprecated to newer APIs. I see your
motivation of having a more linear versioning even between major versions
to avoid backports. Backports are painful enough for minor versions.

But with major versions becoming a thing in the Flink cosmos, I could
imagine that the behavioral stability Stefan mentions actually could become
a bigger issue: Major versions down the road might include bigger
behavioral changes which would prevent users from going ahead with the
major version bump. I understand that this is out of the original scope of
this FLIP. But nevertheless, it does support Chesnay's concerns that a
linear versioning without maintaining older major versions might not be
feasible. It sounds like we should have a discussion about how we treat
older major versions here (or have a dedicated discussion on that topic
before going ahead with that FLIP).

On another note: I like Xintong's proposal of downgrading an API
from @Public to @PublicEvolving in the new major version. That would allow
us to keep the original intention of the @Public annotation alive (i.e.
that those APIs are only removed in the next major version).

Matthias

On Wed, Jun 14, 2023 at 10:10 AM Xintong Song  wrote:

> Thanks for bringing up this discussion, Becket.
>
> My two cents:
>
> 1. Do we allow deprecation & removal of APIs without adding a new one as a
> replacement? The examples in the table give me an impression that marking
> an API as `@Deprecated` should only happen at the same time of introducing
> a new replacing API, which I think is true in most but not all the cases.
>
> If there is a major version bump before 2 minor releases in the current
> > major version are reached, the major version should keep the source code
> in
> > its own minor version until two minor versions are reached. For example,
> in
> > the above case, if Flink 2.0 is released after 1.20, then the deprecated
> > source code of foo will be kept in 2.0 and all the 2.x versions. It can
> > only be removed in 3.0.
> >
>
> 2. I think this might be a bit too strict. For an API that we already
> decided to remove, having to keep it for all the 2.x versions simply
> because there's less than 2 minor releases between making the decision and
> the major release bump seems not necessarily. Alternatively, I'd like to
> propose to remove the `@Public` annotation (or downgrade it to
> `@PublicEvolving`) in 2.0, and remove it in 2.2.
>
> Best,
>
> Xintong
>
>
>
> On Wed, Jun 14, 2023 at 3:56 PM Becket Qin  wrote:
>
> > Hi Jing,
> >
> > Thanks for the feedback. Please see the answers to your questions below:
> >
> > *"Always add a "Since X.X.X" comment to indicate when was a class /
> > > interface / method marked as deprecated."*
> > >  Could you describe it with a code example? Do you mean Java comments?
> >
> > It is just a comment such as "Since 1.18. Use XXX instead." (see e.g.
> > https://kafka.apache.org/31/javadoc/org/apache/kafka/clients/admin/Admin.html#incrementalAlterConfigs(java.util.Map)).
> > And we can then look it up in the deprecated list[1] in each
> > release and see which method should / can be deprecated.
> >
> > *"At least 1 patch release for the affected minor release for
> > > Experimental APIs"*
> > > The rule is absolutely right. However, afaiac, deprecation is different
> > as
> > > modification. As a user/dev, I would appreciate, if I do not need to do
> > any
> > > migration work for any deprecated API between patch releases upgrade.
> > BTW,
> > > if experimental APIs are allowed to change between patches, could we
> just
> > > change them instead of marking them as deprecated and create new ones
> to
> > > replace them?
> >
> > Deprecating an API is just a more elegant way of replacing an API with a
> > new one. The only difference between the two is whether the old API is
> kept
> > and coexists with the new API for some releases or not. For end users,
> > deprecation-then-remove is much more friendly than direct replacement.
> >
> > 1. How to make sure the new APIs cover all functionality, i.e. backward
> > > compatible, before removing the deprecated APIs? Since the
> > > functionalities could only be built with the new APIs iteratively,
> there
> > > will be a while (might be longer than the migration period) that the
> new
> > > APIs are not backward compatible with the deprecated ones.
> >
> > This is orthogonal to the deprecation process, and may not even be
> required
> > in some cases if the function changes by itself. But in general, this
> > relies on the developer to decide. A simple test on readiness is to see
> if
> > all the UT / IT cases written with the old API can be 

[jira] [Created] (FLINK-32335) Fix the Flink ML unittest failure

2023-06-14 Thread Jiang Xin (Jira)
Jiang Xin created FLINK-32335:
-

 Summary: Fix the Flink ML unittest failure
 Key: FLINK-32335
 URL: https://issues.apache.org/jira/browse/FLINK-32335
 Project: Flink
  Issue Type: New Feature
  Components: Library / Machine Learning
Reporter: Jiang Xin
 Fix For: ml-2.3.0


The [GitHub 
CI|https://github.com/apache/flink-ml/actions/runs/5227269169/jobs/9438737620] 
of Flink ML failed because of the following exception.

 
{code:java}
Caused by: java.util.ConcurrentModificationException
	at java.util.ArrayDeque$DeqIterator.next(ArrayDeque.java:648)
	at java.util.Collections$UnmodifiableCollection$1.next(Collections.java:1044)
	at org.apache.flink.iteration.operator.HeadOperator.parseInputChannelEvents(HeadOperator.java:464)
	at org.apache.flink.iteration.operator.HeadOperator.endInput(HeadOperator.java:392)
	at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.endOperatorInput(StreamOperatorWrapper.java:96)
	at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.endInput(RegularOperatorChain.java:97)
	at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:68)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:550)
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:839)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:788)
	at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952)
	at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:931)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
	at java.lang.Thread.run(Thread.java:750)
{code}
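
The failure mode itself is easy to reproduce in isolation (plain-JDK sketch, 
independent of the Flink code paths above):

{code:java}
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;

public class CmeDemo {
    public static void main(String[] args) {
        Deque<Integer> events = new ArrayDeque<>(List.of(1, 2, 3));
        // ArrayDeque's iterator is fail-fast: mutating the deque while
        // iterating over it throws ConcurrentModificationException on the
        // next call to next().
        for (Integer event : events) {
            events.add(event + 10);
        }
    }
}
{code}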
 

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [VOTE] FLIP-295: Support lazy initialization of catalogs and persistence of catalog configurations

2023-06-14 Thread Hang Ruan
+1 (non-binding)

Thanks to Feng for driving it.

Best,
Hang

On Wed, Jun 14, 2023 at 10:36 AM Feng Jin  wrote:

> Hi everyone
>
> Thanks for all the feedback about the FLIP-295: Support lazy initialization
> of catalogs and persistence of catalog configurations[1].
> [2] is the discussion thread.
>
>
> I'd like to start a vote for it. The vote will be open for at least 72
> hours(excluding weekends,until June 19, 10:00AM GMT) unless there is an
> objection or an insufficient number of votes.
>
>
> [1]
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-295%3A+Support+lazy+initialization+of+catalogs+and+persistence+of+catalog+configurations
> [2]https://lists.apache.org/thread/dcwgv0gmngqt40fl3694km53pykocn5s
>
>
> Best,
> Feng
>


Re: [DISCUSS] FLIP-313 Add support of User Defined AsyncTableFunction

2023-06-14 Thread Aitozi
Hi Lincoln

Many thanks for your valuable questions. I will try to answer them inline.

>Does the async udtf bring any additional benefits besides a
>lighter implementation?

IMO, async udtf is more than a lighter implementation. It can act as a
general way for SQL users to use the async operator: they don't have to
bind the async function to a table (a LookupTable), they are not forced to
join on an equality join condition, and they can use it to do more than
enrich data.

The async lookup join is more like a subset/specific usage of async udtf.
That the specific version has more opportunities to be optimized (like
push-down) is acceptable. The async table function should be categorized as
a user-defined function.

>Should users migrate to the lookup source when they encounter similar
>requirements or problems, or should we develop an additional set of
>similar mechanisms?

As I clarified above, the lookup join is a specific usage of async udtf, so
it deserves more refined optimizations like caching / retry. But these may
not all be suitable for the async udtf: as a function, it can be
deterministic or non-deterministic, so caching is not always applicable, and
we also do not have a common cache for UDFs today. I therefore think
optimizations like caching/retry should be left to the function implementor.

> the newly added query hint needs a different name that
> can be more easily related to the lateral operation, as the current join
> hints[5] do.


What about using LATERAL?

as below:

SELECT /*+ LATERAL('output-mode' = 'ordered', 'capacity' = '200',
                   'timeout' = '180s') */
       a, c1, c2
FROM T1
LEFT JOIN LATERAL TABLE(async_split(b)) AS T(c1, c2) ON TRUE

>For the async func example, since the target scenario is an external io
>operation, it's better to add the `close` method to actively release
>resources as a good example for users


Makes sense to me, will update the FLIP.
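
For reference, a rough sketch of what such a function with an explicit
close() could look like (the class name, pool size and eval logic are purely
illustrative):

import org.apache.flink.table.functions.AsyncTableFunction;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.types.Row;

import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

public class AsyncSplit extends AsyncTableFunction<Row> {

    private transient ExecutorService executor;

    @Override
    public void open(FunctionContext context) {
        executor = Executors.newFixedThreadPool(4);
    }

    // The async eval contract: complete the future with the emitted rows.
    public void eval(CompletableFuture<Collection<Row>> future, String input) {
        executor.submit(() ->
                future.complete(
                        Arrays.stream(input.split(","))
                                .map(Row::of)
                                .collect(Collectors.toList())));
    }

    @Override
    public void close() {
        // Actively release the resources, as suggested above.
        if (executor != null) {
            executor.shutdown();
        }
    }
}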

Best,

Aitozi.

On Wed, Jun 14, 2023 at 2:24 PM Lincoln Lee  wrote:

> Hi Aitozi,
>
> Sorry for the late reply here!  Supporting async udtf (`AsyncTableFunction`)
> directly in SQL seems like an attractive feature, but there are two issues
> that need to be addressed before we can be sure to add it:
> 1. As mentioned in the flip[1], the current lookup function can already
> implement the requirements, but it requires implementing an extra
> `LookupTableSource` and explicitly declaring the table schema (which can
> help implementers leverage the various push-down optimizations supported by the
> planner). Does the async udtf bring any additional benefits besides a
> lighter implementation?
> 2. FLIP-221[2] abstracts a reusable cache and metric infrastructure for
> lookup sources, which are important to improve performance and
> observability for high overhead external io scenarios, how do we integrate
> and reuse these capabilities after introducing async udtf? Should users
> migrate to the lookup source when they encounter similar requirements or
> problems, or should we develop an additional set of similar mechanisms? (a
> similarly case:  FLIP-234[3] introduced the retryable capability for lookup
> join)
>
> For the flip itself,
> 1. Considering that 'options' is already used as the dynamic table
> options[4] in Flink, the newly added query hint needs a different name that
> can be more easily related to the lateral operation, as the current join
> hints[5] do.
> 2. For the async func example, since the target scenario is an external io
> operation, it's better to add the `close` method to actively release
> resources as a good example for users. Also in terms of the determinism of
> a function, it is important to remind users that unless the behavior of the
> function is deterministic, it needs to be explicitly declared as
> non-deterministic.
>
> [1].
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-313%3A+Add+support+of+User+Defined+AsyncTableFunction?src=contextnavpagetreemode
> [2].
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-221%3A+Abstraction+for+lookup+source+cache+and+metric?src=contextnavpagetreemode
> [3].
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-234%3A+Support+Retryable+Lookup+Join+To+Solve+Delayed+Updates+Issue+In+External+Systems?src=contextnavpagetreemode
> [4].
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-113%3A+Supports+Dynamic+Table+Options+for+Flink+SQL?src=contextnavpagetreemode
> [5].
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-229%3A+Introduces+Join+Hint+for+Flink+SQL+Batch+Job?src=contextnavpagetreemode
>
> Best,
> Lincoln Lee
>
>
> On Tue, Jun 13, 2023 at 11:30 AM Aitozi  wrote:
>
> > Get your meaning now, thanks :)
> >
> > Best,
> > Aitozi.
> >
> > On Tue, Jun 13, 2023 at 11:16 AM Feng Jin  wrote:
> >
> > > Hi Aitozi,
> > >
> > > Sorry for the confusing description.
> > >
> > > What I meant was that if we need to remind users about thread safety
> > > issues,
> > > we should introduce the new UDTF interface instead of executing the
> > > original UDTF asynchronously. Therefore, I agree with introducing the
> > > 

Re: [DISCUSS] FLIP-321: Introduce an API deprecation process

2023-06-14 Thread Xintong Song
Thanks for bringing up this discussion, Becket.

My two cents:

1. Do we allow deprecation & removal of APIs without adding a new one as a
replacement? The examples in the table give me an impression that marking
an API as `@Deprecated` should only happen at the same time of introducing
a new replacing API, which I think is true in most but not all the cases.

If there is a major version bump before 2 minor releases in the current
> major version are reached, the major version should keep the source code in
> its own minor version until two minor versions are reached. For example, in
> the above case, if Flink 2.0 is released after 1.20, then the deprecated
> source code of foo will be kept in 2.0 and all the 2.x versions. It can
> only be removed in 3.0.
>

2. I think this might be a bit too strict. For an API that we have already
decided to remove, having to keep it for all the 2.x versions simply
because there are fewer than 2 minor releases between making the decision and
the major release bump seems unnecessary. Alternatively, I'd like to
propose removing the `@Public` annotation (or downgrading it to
`@PublicEvolving`) in 2.0, and removing the API in 2.2.
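
To illustrate with a hypothetical FooApi, release by release:

// 1.x (last minors): @Public @Deprecated          class FooApi {}
// 2.0:               @PublicEvolving @Deprecated  class FooApi {}  // demoted
// 2.2:               FooApi removed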

Best,

Xintong



On Wed, Jun 14, 2023 at 3:56 PM Becket Qin  wrote:

> Hi Jing,
>
> Thanks for the feedback. Please see the answers to your questions below:
>
> *"Always add a "Since X.X.X" comment to indicate when was a class /
> > interface / method marked as deprecated."*
> >  Could you describe it with a code example? Do you mean Java comments?
>
> It is just a comment such as "Since 1.18. Use XXX instead." (see e.g.
> https://kafka.apache.org/31/javadoc/org/apache/kafka/clients/admin/Admin.html#incrementalAlterConfigs(java.util.Map)).
> And we can then look it up in the deprecated list[1] in each
> release and see which method should / can be deprecated.
>
> *"At least 1 patch release for the affected minor release for
> > Experimental APIs"*
> > The rule is absolutely right. However, afaiac, deprecation is different
> as
> > modification. As a user/dev, I would appreciate, if I do not need to do
> any
> > migration work for any deprecated API between patch releases upgrade.
> BTW,
> > if experimental APIs are allowed to change between patches, could we just
> > change them instead of marking them as deprecated and create new ones to
> > replace them?
>
> Deprecating an API is just a more elegant way of replacing an API with a
> new one. The only difference between the two is whether the old API is kept
> and coexists with the new API for some releases or not. For end users,
> deprecation-then-remove is much more friendly than direct replacement.
>
> 1. How to make sure the new APIs cover all functionality, i.e. backward
> > compatible, before removing the deprecated APIs? Since the
> > functionalities could only be built with the new APIs iteratively, there
> > will be a while (might be longer than the migration period) that the new
> > APIs are not backward compatible with the deprecated ones.
>
> This is orthogonal to the deprecation process, and may not even be required
> in some cases if the function changes by itself. But in general, this
> relies on the developer to decide. A simple test on readiness is to see if
> all the UT / IT cases written with the old API can be migrated to the new
> one and still work.  If the new API is not ready, we probably should not
> deprecate the old one to begin with.
>
> 2. Is it allowed to remove the deprecated APIs after the defined migration
> > period expires while the new APis are still not backward compatible?
>
> By "backwards compatible", do you mean functionally equivalent? If the new
> APIs are designed to be not backwards compatible, then removing the
> deprecated source code is definitely allowed. If we don't think the new API
> is ready to take over the place for the old one, then we should wait. The
> migration period is the minimum time we have to wait before removing the
> source code. A longer migration period is OK.
>
> 3. For the case of core API upgrade with downstream implementations, e.g.
> > connectors, What is the feasible deprecation strategy? Option1 bottom-up:
> > make sure the downstream implementation is backward compatible before
> > removing the deprecated core APIs. Option2 top-down: once the downstream
> > implementation of new APIs works fine, we can remove the deprecated core
> > APIs after the migration period expires. The implementation of the
> > deprecated APIs will not get any further update in upcoming releases(it
> has
> > been removed). There might be some missing features in the downstream
> > implementation of new APIs compared to the old implementation. Both
> options
> > have their own pros and cons.
>
> The downstream projects such as connectors in Flink should also follow the
> migration path we tell our users. i.e. If there is a cascading backwards
> incompatible change in the connectors due to a backwards incompatible
> change in the core, and as a result a longer 

[jira] [Created] (FLINK-32334) Operator failed to create taskmanager deployment because it already exist

2023-06-14 Thread Nicolas Fraison (Jira)
Nicolas Fraison created FLINK-32334:
---

 Summary: Operator failed to create taskmanager deployment because 
it already exist
 Key: FLINK-32334
 URL: https://issues.apache.org/jira/browse/FLINK-32334
 Project: Flink
  Issue Type: Bug
  Components: Kubernetes Operator
Affects Versions: kubernetes-operator-1.5.0
Reporter: Nicolas Fraison


During a job upgrade the operator has failed to start the new job because it 
has failed to create the taskmanager deployment:

 
{code:java}
Jun 12 19:45:28.115 >>> Status | Error | UPGRADING | 
{"type":"org.apache.flink.kubernetes.operator.exception.ReconciliationException","message":"org.apache.flink.client.deployment.ClusterDeploymentException:
 Could not create Kubernetes cluster 
\"flink-metering\".","throwableList":[{"type":"org.apache.flink.client.deployment.ClusterDeploymentException","message":"Could
 not create Kubernetes cluster 
\"flink-metering\"."},{"type":"org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.client.KubernetesClientException","message":"Failure
 executing: POST at: 
https://10.129.144.1/apis/apps/v1/namespaces/metering/deployments. Message: 
object is being deleted: deployments.apps \"flink-metering-taskmanager\" 
already exists. Received status: Status(apiVersion=v1, code=409, 
details=StatusDetails(causes=[], group=apps, kind=deployments, 
name=flink-metering-taskmanager, retryAfterSeconds=null, uid=null, 
additionalProperties={}), kind=Status, message=object is being deleted: 
deployments.apps \"flink-metering-taskmanager\" already exists, 
metadata=ListMeta(_continue=null, remainingItemCount=null, 
resourceVersion=null, selfLink=null, additionalProperties={}), 
reason=AlreadyExists, status=Failure, additionalProperties={})."}]} {code}
As indicated in the error log, this is due to the taskmanager deployment 
still existing while it is being deleted.

Looking at the [source 
code|https://github.com/apache/flink-kubernetes-operator/blob/main/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/StandaloneFlinkService.java#L150]
 we are indeed relying on the FOREGROUND deletion policy by default.

Still, it seems that the delete REST API call only waits until the resource 
has been modified and the {{deletionTimestamp}} has been added to the metadata: 
[ensure delete returns only when the delete operation is fully finished -  
Issue #3246 -  
fabric8io/kubernetes-client|https://github.com/fabric8io/kubernetes-client/issues/3246#issuecomment-874019899]

So we could face this issue if the k8s cluster is slow to "really" delete the 
deployment.
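
One possible mitigation is to explicitly wait until the deployment object is 
actually gone after issuing the delete (untested sketch against the fabric8 
client API; method names should be double-checked):

{code:java}
import java.util.Objects;
import java.util.concurrent.TimeUnit;

import io.fabric8.kubernetes.api.model.DeletionPropagation;
import io.fabric8.kubernetes.client.KubernetesClient;

class TaskManagerDeploymentDeleter {

    void deleteAndWait(KubernetesClient client, String namespace, String name) {
        client.apps().deployments().inNamespace(namespace).withName(name)
                .withPropagationPolicy(DeletionPropagation.FOREGROUND)
                .delete();
        // delete() may return once the deletionTimestamp is set; block until
        // the deployment has actually disappeared before recreating it.
        client.apps().deployments().inNamespace(namespace).withName(name)
                .waitUntilCondition(Objects::isNull, 5, TimeUnit.MINUTES);
    }
}
{code}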

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] FLIP-321: Introduce an API deprecation process

2023-06-14 Thread Becket Qin
Hi Jing,

Thanks for the feedback. Please see the answers to your questions below:

*"Always add a "Since X.X.X" comment to indicate when was a class /
> interface / method marked as deprecated."*
>  Could you describe it with a code example? Do you mean Java comments?

It is just a comment such as "Since 1.18. Use XXX instead.". And we can then
look it up in the deprecated list[1] in each release and see which method
should / can be deprecated.
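
Concretely, on a hypothetical method it would look like:

/**
 * @deprecated Since 1.18. Use {@link #newMethod()} instead.
 */
@Deprecated
public void oldMethod() {
    newMethod();
}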

*"At least 1 patch release for the affected minor release for
> Experimental APIs"*
> The rule is absolutely right. However, afaiac, deprecation is different as
> modification. As a user/dev, I would appreciate, if I do not need to do any
> migration work for any deprecated API between patch releases upgrade. BTW,
> if experimental APIs are allowed to change between patches, could we just
> change them instead of marking them as deprecated and create new ones to
> replace them?

Deprecating an API is just a more elegant way of replacing an API with a
new one. The only difference between the two is whether the old API is kept
and coexists with the new API for some releases or not. For end users,
deprecation-then-remove is much more friendly than direct replacement.

1. How to make sure the new APIs cover all functionality, i.e. backward
> compatible, before removing the deprecated APIs? Since the
> functionalities could only be built with the new APIs iteratively, there
> will be a while (might be longer than the migration period) that the new
> APIs are not backward compatible with the deprecated ones.

This is orthogonal to the deprecation process, and may not even be required
in some cases if the function changes by itself. But in general, this
relies on the developer to decide. A simple test on readiness is to see if
all the UT / IT cases written with the old API can be migrated to the new
one and still work.  If the new API is not ready, we probably should not
deprecate the old one to begin with.

2. Is it allowed to remove the deprecated APIs after the defined migration
> period expires while the new APis are still not backward compatible?

By "backwards compatible", do you mean functionally equivalent? If the new
APIs are designed to be not backwards compatible, then removing the
deprecated source code is definitely allowed. If we don't think the new API
is ready to take over the place for the old one, then we should wait. The
migration period is the minimum time we have to wait before removing the
source code. A longer migration period is OK.

3. For the case of core API upgrade with downstream implementations, e.g.
> connectors, What is the feasible deprecation strategy? Option1 bottom-up:
> make sure the downstream implementation is backward compatible before
> removing the deprecated core APIs. Option2 top-down: once the downstream
> implementation of new APIs works fine, we can remove the deprecated core
> APIs after the migration period expires. The implementation of the
> deprecated APIs will not get any further update in upcoming releases(it has
> been removed). There might be some missing features in the downstream
> implementation of new APIs compared to the old implementation. Both options
> have their own pros and cons.

The downstream projects such as connectors in Flink should also follow the
migration path we tell our users. i.e. If there is a cascading backwards
incompatible change in the connectors due to a backwards incompatible
change in the core, and as a result a longer migration period is required,
then I think we should postpone the removal of source code. But in general,
we should be able to provide the same migration period in the connectors as
the flink core, if the connectors are upgraded to the latest version of
core promptly.

Thanks,

Jiangjie (Becket) Qin

[1]
https://nightlies.apache.org/flink/flink-docs-master/api/java/deprecated-list.html


On Wed, Jun 14, 2023 at 1:15 AM Jing Ge  wrote:

> > This is by design. Most of these are @Public APIs that we had to carry
> > around until Flink 2.0, because that was the initial guarantee that we
> > gave people.
> >
>
> True, I knew @Public APIs could not be removed before the next major
> release. I meant house cleaning without violation of these annotations'
> design concept. i.e especially cleaning up for @PublicEvolving APIs since
> they are customer-facing. Regular cleaning up with all other @Experimental
> and @Internal APIs would be even better, if there might be some APIs marked
> as @deprecated.
>
> Best regards,
> Jing
>
>
>
> On Tue, Jun 13, 2023 at 4:25 PM Chesnay Schepler 
> wrote:
>
> > On 13/06/2023 12:50, Jing Ge wrote:
> > > One major issue we have, afaiu, is caused by the lack of
> > housekeeping/house
> > > cleaning, there are many APIs that were marked as deprecated a few
> years
> > > ago and still don't get removed. Some APIs should be easy to 

Re: [DISCUSS] FLIP-313 Add support of User Defined AsyncTableFunction

2023-06-14 Thread Lincoln Lee
Hi Aitozi,

Sorry for the late reply here!  Supporting async udtf (`AsyncTableFunction`)
directly in SQL seems like an attractive feature, but there are two issues
that need to be addressed before we can be sure to add it:
1. As mentioned in the flip[1], the current lookup function can already
implement the requirements, but it requires implementing an extra
`LookupTableSource` and explicitly declaring the table schema (which can
help implementers leverage the various push-down optimizations supported by the
planner). Does the async udtf bring any additional benefits besides a
lighter implementation?
2. FLIP-221[2] abstracts a reusable cache and metric infrastructure for
lookup sources, which are important to improve performance and
observability for high overhead external io scenarios, how do we integrate
and reuse these capabilities after introducing async udtf? Should users
migrate to the lookup source when they encounter similar requirements or
problems, or should we develop an additional set of similar mechanisms? (a
similarly case:  FLIP-234[3] introduced the retryable capability for lookup
join)

For the flip itself,
1. Considering that 'options' is already used as the dynamic table
options[4] in Flink, the newly added query hint needs a different name that
can be more easily related to the lateral operation, as the current join
hints[5] do.
2. For the async func example, since the target scenario is an external io
operation, it's better to add the `close` method to actively release
resources as a good example for users. Also in terms of the determinism of
a function, it is important to remind users that unless the behavior of the
function is deterministic, it needs to be explicitly declared as
non-deterministic.
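
(For reference, that declaration is the isDeterministic() override on the
function class — a minimal sketch:

@Override
public boolean isDeterministic() {
    // external lookups may return different results for the same input
    return false;
}
)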

[1].
https://cwiki.apache.org/confluence/display/FLINK/FLIP-313%3A+Add+support+of+User+Defined+AsyncTableFunction?src=contextnavpagetreemode
[2].
https://cwiki.apache.org/confluence/display/FLINK/FLIP-221%3A+Abstraction+for+lookup+source+cache+and+metric?src=contextnavpagetreemode
[3].
https://cwiki.apache.org/confluence/display/FLINK/FLIP-234%3A+Support+Retryable+Lookup+Join+To+Solve+Delayed+Updates+Issue+In+External+Systems?src=contextnavpagetreemode
[4].
https://cwiki.apache.org/confluence/display/FLINK/FLIP-113%3A+Supports+Dynamic+Table+Options+for+Flink+SQL?src=contextnavpagetreemode
[5].
https://cwiki.apache.org/confluence/display/FLINK/FLIP-229%3A+Introduces+Join+Hint+for+Flink+SQL+Batch+Job?src=contextnavpagetreemode

Best,
Lincoln Lee


On Tue, Jun 13, 2023 at 11:30 AM Aitozi  wrote:

> Get your meaning now, thanks :)
>
> Best,
> Aitozi.
>
> On Tue, Jun 13, 2023 at 11:16 AM Feng Jin  wrote:
>
> > Hi Aitozi,
> >
> > Sorry for the confusing description.
> >
> > What I meant was that if we need to remind users about thread safety
> > issues,
> > we should introduce the new UDTF interface instead of executing the
> > original UDTF asynchronously. Therefore, I agree with introducing the
> > AsyncTableFunction.
> >
> > Best,
> > Feng
> >
> > On Tue, Jun 13, 2023 at 10:42 AM Aitozi  wrote:
> >
> > > Hi Feng,
> > > Thanks for your question. We do not provide a way to switch the
> UDTF
> > > between sync and async way,
> > > So there should be no thread safety problem here.
> > >
> > > Best,
> > > Aitozi
> > >
> > > On Tue, Jun 13, 2023 at 10:31 AM Feng Jin  wrote:
> > >
> > > > Hi Aitozi, We do need to remind users about thread safety issues.
> Thank
> > > you
> > > > for your efforts on this FLIP. I have no further questions.
> > > > Best, Feng
> > > >
> > > >
> > > > On Tue, Jun 13, 2023 at 6:05 AM Jing Ge 
> > > > wrote:
> > > >
> > > > > Hi Aitozi,
> > > > >
> > > > > Thanks for taking care of that part. I have no other concern.
> > > > >
> > > > > Best regards,
> > > > > Jing
> > > > >
> > > > >
> > > > > On Mon, Jun 12, 2023 at 5:38 PM Aitozi 
> wrote:
> > > > >
> > > > > > BTW, If there are no other more blocking issue / comments, I
> would
> > > like
> > > > > to
> > > > > > start a VOTE in another thread this wednesday 6.14
> > > > > >
> > > > > > Thanks,
> > > > > > Aitozi.
> > > > > >
> > > > > > On Mon, Jun 12, 2023 at 11:34 PM Aitozi  wrote:
> > > > > >
> > > > > > > Hi, Jing,
> > > > > > > Thanks for your explanation. I get your point now.
> > > > > > >
> > > > > > > For the performance part, I think it's a good idea to run with
> > > > > returning
> > > > > > a
> > > > > > > big table case, the memory consumption
> > > > > > > should be a point to be taken care about. Because in the
> ordered
> > > > mode,
> > > > > > the
> > > > > > > head element in buffer may affect the
> > > > > > > total memory consumption.
> > > > > > >
> > > > > > >
> > > > > > > Thanks,
> > > > > > > Aitozi.
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > On Mon, Jun 12, 2023 at 8:28 PM Jing Ge  wrote:
> > > > > > >
> > > > > > >> Hi Aitozi,
> > > > > > >>
> > > > > > >> Which key will be used for lookup is not an issue, only one
> row
> > > will
> > > > > be
> > > > > > >> required for each key in order to enrich it. True, it depends
> on
> > > the
> > > > > > >> implementation whether