Re: [ANNOUNCE] Apache Flink has won the 2023 SIGMOD Systems Award

2023-07-05 Thread Weihua Hu
Congratulations!

Best,
Weihua


On Wed, Jul 5, 2023 at 5:46 PM Shammon FY  wrote:

> Congratulations!
>
> Best,
> Shammon FY
>
> On Wed, Jul 5, 2023 at 2:38 PM Paul Lam  wrote:
>
> > Congrats and cheers!
> >
> > Best,
> > Paul Lam
> >
> > > 2023年7月4日 18:04,Benchao Li  写道:
> > >
> > > Congratulations!
> > >
> > > On Tue, Jul 4, 2023 at 4:17 PM Feng Jin wrote:
> > >
> > >> Congratulations!
> > >>
> > >> Best,
> > >> Feng Jin
> > >>
> > >>
> > >>
> > >> On Tue, Jul 4, 2023 at 4:13 PM Yuxin Tan 
> > wrote:
> > >>
> > >>> Congratulations!
> > >>>
> > >>> Best,
> > >>> Yuxin
> > >>>
> > >>>
> > >>> On Tue, Jul 4, 2023 at 4:04 PM Dunn Bangui wrote:
> > >>>
> >  Congratulations!
> > 
> >  Best,
> >  Bangui Dunn
> > 
> >  On Tue, Jul 4, 2023 at 3:59 PM Yangze Guo wrote:
> > 
> > > Congrats everyone!
> > >
> > > Best,
> > > Yangze Guo
> > >
> > > On Tue, Jul 4, 2023 at 3:53 PM Rui Fan <1996fan...@gmail.com>
> wrote:
> > >>
> > >> Congratulations!
> > >>
> > >> Best,
> > >> Rui Fan
> > >>
> > >> On Tue, Jul 4, 2023 at 2:08 PM Zhu Zhu  wrote:
> > >>
> > >>> Congratulations everyone!
> > >>>
> > >>> Thanks,
> > >>> Zhu
> > >>>
> > >>> On Tue, Jul 4, 2023 at 2:06 PM Hang Ruan wrote:
> > 
> >  Congratulations!
> > 
> >  Best,
> >  Hang
> > 
> >  On Tue, Jul 4, 2023 at 1:47 PM Jingsong Li wrote:
> > 
> > > Congratulations!
> > >
> > > Thank you! All of the Flink community!
> > >
> > > Best,
> > > Jingsong
> > >
> > > On Tue, Jul 4, 2023 at 1:24 PM tison 
> >  wrote:
> > >>
> > >> Congrats and with honor :D
> > >>
> > >> Best,
> > >> tison.
> > >>
> > >>
> > >> On Tue, Jul 4, 2023 at 11:08 AM Mang Zhang wrote:
> > >>
> > >>> Congratulations!
> > >>>
> > >>> Best regards,
> > >>> Mang Zhang
> > >>>
> > >>>
> > >>>
> > >>>
> > >>>
> > >>> At 2023-07-04 01:53:46, "liu ron" wrote:
> >  Congrats everyone
> > 
> >  Best,
> >  Ron
> > 
> >  On Mon, Jul 3, 2023 at 10:48 PM Jark Wu wrote:
> > 
> > > Congrats everyone!
> > >
> > > Best,
> > > Jark
> > >
> > >> On Jul 3, 2023, at 10:37 PM, Yuval Itzchakov wrote:
> > >>
> > >> Congrats team!
> > >>
> > >> On Mon, Jul 3, 2023, 17:28 Jing Ge via user <
> > > u...@flink.apache.org
> > > > wrote:
> > >>> Congratulations!
> > >>>
> > >>> Best regards,
> > >>> Jing
> > >>>
> > >>>
> > >>> On Mon, Jul 3, 2023 at 3:21 PM yuxia <
> > > luoyu...@alumni.sjtu.edu.cn
> > > > wrote:
> >  Congratulations!
> > 
> >  Best regards,
> >  Yuxia
> > 
> >  From: "Pushpa Ramakrishnan" <pushpa.ramakrish...@icloud.com>
> >  To: "Xintong Song" <tonysong...@gmail.com>
> >  Cc: "dev" <dev@flink.apache.org>, "User" <u...@flink.apache.org>
> >  Sent: Monday, July 3, 2023, 8:36:30 PM
> >  Subject: Re: [ANNOUNCE] Apache Flink has won the 2023 SIGMOD Systems Award
> > 
> >  Congratulations 🥳
> > 
> >  On 03-Jul-2023, at 3:30 PM, Xintong Song <tonysong...@gmail.com> wrote:
> > 
> >  Dear Community,
> > 
> >  I'm pleased to share this good news with everyone. As some of you may
> >  have already heard, Apache Flink has won the 2023 SIGMOD Systems Award [1].
> > 
> >  "Apache Flink greatly expanded the use of stream data-processing." --
> >  SIGMOD Awards Committee
> > 
> >  SIGMOD is one of the most influential data management research
> >  conferences in the world. The Systems Award is awarded to an individual or
> >  set of individuals to recognize the development of a software or hardware
> >  system whose technical contributions have had significant

[jira] [Created] (FLINK-32547) Add missing doc for Timestamp support in ProtoBuf format

2023-07-05 Thread Benchao Li (Jira)
Benchao Li created FLINK-32547:
--

 Summary: Add missing doc for Timestamp support in ProtoBuf format
 Key: FLINK-32547
 URL: https://issues.apache.org/jira/browse/FLINK-32547
 Project: Flink
  Issue Type: Improvement
  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
Affects Versions: 1.17.1
Reporter: Benchao Li


In FLINK-30093, we added support for the {{Timestamp}} type and added the doc for 
it, but missed updating the English version of the doc.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: Feature requests for Flink protobuf deserialization

2023-07-05 Thread Benchao Li
Thanks for starting the discussion,

1. I'm +1 for this.
2. We have already supported this in [1].
3. I'm not sure about this; could you give more examples besides cases
1 & 2?
4&5. I think we have also considered this with the option
'protobuf.read-default-values' [2]; is this what you want?

[1] https://issues.apache.org/jira/browse/FLINK-30093
[2]
https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/connectors/table/formats/protobuf/#protobuf-read-default-values
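
For reference, a minimal sketch of a table definition using that option
(the table name, topic, and message class below are made up for
illustration; the option keys are the ones from the docs linked above):

CREATE TABLE user_events (
  user_id BIGINT,
  name STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_events',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'protobuf',
  'protobuf.message-class-name' = 'com.example.UserEvents',
  'protobuf.read-default-values' = 'true'
);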



On Wed, Jun 28, 2023 at 10:16 AM Adam Richardson wrote:

> Hi there,
>
> My company is in the process of rebuilding some of our batch Spark-based
> ETL pipelines in Flink. We use protobuf to define our schemas. One major
> challenge is that Flink protobuf deserialization has some semantic
> differences with the ScalaPB encoders we use in our Spark systems. This
> poses a serious barrier for adoption as moving any given dataset from Spark
> to Flink will potentially break all downstream consumers. I have a long
> list of feature requests in this area:
>
>1. Support for mapping protobuf optional wrapper types (StringValue,
>IntValue, and friends) to nullable primitive types rather than RowTypes
>2. Support for mapping the protobuf Timestamp type to a real timestamp
>rather than RowType
>3. A way of defining custom mappings from specific proto types to custom
>Flink types (the previous two feature requests could be implemented on
> top
>of this lower-level feature)
>4. Support for nullability semantics for message types (in the status
>quo, an unset message is treated as equivalent to a message with default
>values for all fields, which is a confusing user experience)
>5. Support for nullability semantics for primitives types (in many of
>our services, the default value for a field of primitive type is
> treated as
>being equivalent to unset or null, so it would be good to offer this as
> a
>capability in the data warehouse)
>
> Would Flink accept patches for any or all of these feature requests? We're
> contemplating forking flink-protobuf internally, but it would be better if
> we could just upstream the relevant changes. (To my mind, 1, 2, and 4 are
> broadly applicable features that are definitely worthy of upstream support.
> 3 and 5 may be somewhat more specific to our use case.)
>
> Thanks,
> Adam Richardson
>


-- 

Best,
Benchao Li


Re: [DISCUSS] Flink and Externalized connectors leads to block and circular dependency problems

2023-07-05 Thread Dian Fu
Hi Chesnay,

>> The wrapping of connectors is a bit of a maintenance nightmare and
doesn't really work with external/custom connectors.

Cannot agree with you more.

>> Has there ever been thoughts about changing flink-pythons connector
setup to use the table api connectors underneath?

I'm still not sure if this is feasible for all connectors; however,
this may be a good idea. The concern is that the DataStream API
connector functionality may be unaligned between the Java and Python
connectors. Besides, there are still a few connectors which only have
DataStream API implementations, e.g. Google PubSub, RabbitMQ, Cassandra,
Pulsar, Hybrid Source, etc. Also, PyFlink already supports
Table API connectors, and if we take this route, maybe we
could just tell users to use the Table API connectors directly.

Another option I had in mind is to provide an API which allows
configuring the behavior via key/value pairs in both the Java & Python
DataStream API connectors.

Regards,
Dian

On Wed, Jul 5, 2023 at 6:34 PM Chesnay Schepler  wrote:
>
> Has there ever been thoughts about changing flink-pythons connector
> setup to use the table api connectors underneath?
>
> The wrapping of connectors is a bit of a maintenance nightmare and
> doesn't really work with external/custom connectors.
>
> On 04/07/2023 13:35, Dian Fu wrote:
> > Thanks Ran Tao for proposing this discussion and Martijn for sharing
> > the thought.
> >
> >>   While flink-python now fails the CI, it shouldn't actually depend on the
> > externalized connectors. I'm not sure what PyFlink does with it, but if
> > belongs to the connector code,
> >
> > For each DataStream connector, there is a corresponding Python wrapper
> > and also some test cases in PyFlink. In theory, we should move that
> > wrapper into each connector repository. In the past, we have not done
> > that when externalizing the connectors since it may introduce some
> > burden when releasing since it means that we have to publish each
> > connector to PyPI separately.
> >
> > To resolve this problem, I guess we can move the connector support in
> > PyFlink into the external connector repository.
> >
> > Regards,
> > Dian
> >
> >
> > On Mon, Jul 3, 2023 at 11:08 PM Ran Tao  wrote:
> >> @Martijn
> >> thanks for clear explanations.
> >>
> >> If we follow the line you specified (Connectors shouldn't rely on
> >> dependencies that may or may not be
> >> available in Flink itself)
> >> It seems that we should add a certain dependency if we need(such as
> >> commons-io, commons-collection) in connector pom explicitly.
> >> And bundle it in sql-connector uber jar.
> >>
> >> Then there is only one thing left that we need to make flink-python test
> >> not depend on the released flink-connector.
> >> Maybe we should check it out and decouple it like you suggested.
> >>
> >> Best Regards,
> >> Ran Tao
> >> https://github.com/chucheng92
> >>
> >>
> >> On Mon, Jul 3, 2023 at 10:06 PM Martijn Visser wrote:
> >>
> >>> Hi Ran Tao,
> >>>
> >>> Thanks for opening this topic. I think there's a couple of things at hand:
> >>> 1. Connectors shouldn't rely on dependencies that may or may not be
> >>> available in Flink itself, like we've seen with flink-shaded. That avoids 
> >>> a
> >>> tight coupling between Flink and connectors, which is exactly what we try
> >>> to avoid.
> >>> 2. When following that line, that would also be applicable for things like
> >>> commons-collections and commons-io. If a connector wants to use them, it
> >>> should make sure that it bundles those artifacts itself.
> >>> 3. While flink-python now fails the CI, it shouldn't actually depend on 
> >>> the
> >>> externalized connectors. I'm not sure what PyFlink does with it, but if
> >>> belongs to the connector code, that code should also be moved to the
> >>> individual connector repo. If it's just a generic test, we could consider
> >>> creating a generic test against released connector versions to determine
> >>> compatibility.
> >>>
> >>> I'm curious about the opinions of others as well.
> >>>
> >>> Best regards,
> >>>
> >>> Martijn
> >>>
> >>> On Mon, Jul 3, 2023 at 3:37 PM Ran Tao  wrote:
> >>>
>  I have an issue here that needs to upgrade commons-collections[1] (this
> >>> is
>  an example), but PR ci fails because flink-python test cases depend on
>  flink-sql-connector-kafka, but kafka-sql-connector is a small jar, does
> >>> not
>  include this dependency, so flink ci cause exception[2]. Current my
>  solution is [3]. But even if this PR is done, the upgrade of flink still
>  requires kafka-connector released.
> 
>  This issue leads to deeper problems. Although the connectors have been
>  externalized, many UTs of flink-python depend on these connectors, and a
>  basic agreement of externalized connectors is that other dependencies
>  cannot be introduced explicitly, which means the externalized connectors
>  use dependencies inherited from flink. In this way, when flink main
> 

Re: [DISCUSS] FLIP-325: Support configuring end-to-end allowed latency

2023-07-05 Thread Shammon FY
Hi,

Thanks for your reply, @Dong. I really agree with Piotr's points and I
would like to share some thoughts from my side.

About the latency for mini-batch mechanism in Flink SQL, I still think the
description in the FLIP is not right. If there are N operators and the
whole process time for data in the job is `t`, then the latency in
mini-batch will be `table.exec.mini-batch.allow-latency`+`t`, not `
table.exec.mini-batch.allow-latency`*N. I think this is one of the
foundations of this FLIP, and you may need to confirm it again.
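
For reference, a minimal sketch of how mini-batch is enabled via the existing
table options today (the values below are only illustrative):

SET 'table.exec.mini-batch.enabled' = 'true';
SET 'table.exec.mini-batch.allow-latency' = '5 s';
SET 'table.exec.mini-batch.size' = '5000';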

I think supporting similar mechanisms in the runtime and balancing latency
and throughput dynamically for all Flink jobs is a very good idea, and I
have some questions for that.

1. We encounter a situation where the workload is high when processing
snapshot data, and we need mini-batch in SQL for performance reasons. But the
workload is low when processing delta data, so we need to automatically adjust
the mini-batch SQL for them, or even cancel the mini-batch during delta
processing. I think this FLIP meets our needs, but I think we need a
general solution which covers all source types in flink, and the
`isBacklog` in the FLIP is only one strategy.
From the FLIP I think there should be two parts: dynamically triggering flush
events in the JM and dynamically triggering flush operations in the Operator. We need to
introduce much more general interfaces for them, such as
`DynamicFlushStrategy` in JM and `DynamicFlushOperation` in TM? As Piotr
mentioned above, we can collect many information from TM locally such as
backpressure, queue size and `Operator` can decide whether to buffer data
or process it immediately.  JM is also the same, it can decide to send
flush events on a regular basis or send them based on the collected metrics
information and other information, such as the isBacklog in the FLIP.

2. I really don't get enough benefits for `RecordAttribute` in the FLIP and
as Piotr mentioned above too, it will generate a large number of messages,
affecting performance. The FLIP mentions that it will be applied to Operators
and Sinks; I am trying to understand its role, so please correct me if I'm wrong.
a) It tells the Operator and Sink that current most of data they are
processing are from snapshot and are "insert" data? For the out of order in
flink, the Operator and Sink may receive "upsert" data from other sources.
b) Do Operators and Sink perform any very special operations in the above
situations? What are the benefits of this special operations for "most data
are insert"?
c) I think the operator and sink can collect the above information locally
when it receives each record without the need for `RecordAttribute` even
when we need some special operations.
d) Even if we do need a `RecordAttribute` events in Operator and Sink, I
think broadcast them from JM is a better choice.

3. For the flush event, I also have some questions. What type of operators
need to buffer data and flush them based on the flush events? In SQL
mini-batch mechanism, similar processing will be added for the aggregate
and join operators, while for operators such as map, it is not necessary.
How can we identify the different operators in the runtime layer (`Input` and
`TwoInputStreamOperator`)? I think buffering data in a Map/FlatMap/Filter
operator is not a good idea, since it keeps data from flowing.


Best,
Shammon FY


On Thu, Jul 6, 2023 at 1:54 AM Piotr Nowojski 
wrote:

> Hi,
>
> Thanks for this proposal, this is a very much needed thing that should be
> addressed in Flink.
>
> I think there is one thing that hasn't been discussed either here or in
> FLIP-309. Given that we have
> three dimensions:
> - e2e latency/checkpointing interval
> - enabling some kind of batching/buffering on the operator level
> - how much resources we want to allocate to the job
>
> How do we want Flink to adjust itself between those three? For example:
> a) Should we assume that given Job has a fixed amount of assigned
> resources and make it paramount that
>   Flink doesn't exceed those available resources? So in case of
> backpressure, we
>   should extend checkpointing intervals, emit records less frequently and
> in batches.
> b) Or should we assume that the amount of resources is flexible (up to a
> point?), and the desired e2e latency
>   is the paramount aspect? So in case of backpressure, we should still
> adhere to the configured e2e latency,
>   and wait for the user or autoscaler to scale up the job?
>
> In case of a), I think the concept of "isProcessingBacklog" is not needed,
> we could steer the behaviour only
> using the backpressure information.
>
> On the other hand, in case of b), "isProcessingBacklog" information might
> be helpful, to let Flink know that
> we can safely decrease the e2e latency/checkpoint interval even if there
> is no backpressure, to use fewer
> resources (and let the autoscaler scale down the job).
>
> Do we want to have both, or only one of those? Do a) and b) complement one
> another? If job is backpressured,
> we should 

Re: [DISCUSS] FLIP-309: Enable operators to trigger checkpoints dynamically

2023-07-05 Thread Dong Lin
Hi Piotr,

I am sorry if you feel unhappy or upset with us for not following/fixing
your proposal. It is not my intention to give you this feeling. After all,
we are all trying to make Flink better, to support more use-case with the
most maintainable code. I hope you understand that just like you, I have
also been doing my best to think through various design options and taking
time to evalute the pros/cons. Eventually, we probably still need to reach
consensus by clearly listing and comparing the objective pros/cons of
different proposals and identifying the best choice.

Regarding your concern (or frustration) that we are always finding issues
in your proposal, I would say it is normal (and probably necessary) for
developers to find pros/cons in each other's solutions, so that we can
eventually pick the right one. I will appreciate anyone who can correctly
pinpoint the concrete issue in my proposal so that I can improve it or
choose an alternative solution.

Regarding your concern that we are not spending enough effort to find
solutions and that the problem in your solution can be solved in a minute,
I would like to say that is not true. For each of your previous proposals,
I typically spent 1+ hours thinking through your proposal to understand
whether it works and why it does not work, and another 1+ hour to write
down the details and explain why it does not work. And I have had a variety
of offline discussions with my colleagues discussing various proposals
(including yours) with 6+ hours in total. Maybe I am not capable enough to
fix those issues in one minute or so. If you think your proposal can be
easily fixed in one minute or so, I would really appreciate it if you can
think through your proposal and fix it in the first place :)

For your information, I have had several long discussions with my
colleagues at Alibaba and also Becket on this FLIP. We have seriously
considered your proposals and discussed in detail what are the pros/cons
and whether we can improve these solutions. The initial version of this
FLIP (which allowed the source operator to specify checkpoint intervals)
did not get enough support due to concerns that it was not generic (i.e.
users would need to specify checkpoint intervals on a per-source basis). Only
after I updated the FLIP to use the job-level
execution.checkpointing.interval-during-backlog did they agree to give +1
to the FLIP. What I want to tell you is that your suggestions have been
taken seriously, and the quality of the FLIP has been taken seriously
by all those who have voted. As a result of taking your suggestion
seriously and trying to find improvements, we updated the FLIP to use
isProcessingBacklog.
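
To make the current proposal concrete, a user would only set two job-level
intervals, roughly as in this sketch (the second key is the option proposed
in the FLIP, and both values are only illustrative):

execution.checkpointing.interval: 30 s
execution.checkpointing.interval-during-backlog: 10 min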

I am wondering, do you think it will be useful to discuss face-to-face via
video conference call? It is not just between you and me. We can invite the
developers who are interested to join and help with the discussion. That
might improve communication efficiency and help us understand each other
better :)

I am writing this long email to hopefully get your understanding. I care
much more about the quality of the eventual solution rather than who
proposed the solution. Please bear with me and see my comments inline, with
an explanation of the pros/cons of these proposals.


On Wed, Jul 5, 2023 at 11:06 PM Piotr Nowojski 
wrote:

> Hi Guys,
>
> I would like to ask you again, to spend a bit more effort on trying to find
> solutions, not just pointing out problems. For 1.5 months,
> the discussion doesn't go in circle, but I'm suggesting a solution, you are
> trying to undermine it with some arguments, I'm coming
> back with a fix, often an extremely easy one, only for you to try to find
> yet another "issue". It doesn't bode well, if you are finding
> a "problem" that can be solved with a minute or so of thinking or even has
> already been solved.
>
> I have provided you so far with at least three distinct solutions that
> could address your exact target use-case. Two [1][2] generic
> enough to be probably good enough for the foreseeable future, one
> intermediate and not generic [3] but which wouldn't
> require @Public API changes or some custom hidden interfaces.


> All in all:
> - [1] with added metric hints like "isProcessingBacklog" solves your target
> use case pretty well. Downside is having to improve
>   how JM is collecting/aggregating metrics
>

Here is my analysis of this proposal compared to the current approach in
the FLIP-309.

pros:
- No need to add the public API
SplitEnumeratorContext#setIsProcessingBacklog.
cons:
- Need to add a public API that subclasses of SourceReader can use to
specify its IsProcessingBacklog metric value.
- Source Coordinator needs to periodically pull the isProcessingBacklog
metrics from all TMs throughout the job execution.

Here is why I think the cons outweigh the pros:
1) JM needs to collect/aggregate metrics with extra runtime overhead, which
is not necessary for the target use-case with the push-based approach in

Re: [DISCUSS] FLIP-329: Add operator attribute to specify support for object-reuse

2023-07-05 Thread Jing Ge
Hi Xuannan, Hi Dong,

Thanks for your clarification.

@Xuannan

A Jira ticket has been created for the doc update:
https://issues.apache.org/jira/browse/FLINK-32546

@Dong

I don't have a concrete example. I just thought about it from a conceptual
or pattern perspective. Since we have 1. a coarse-grained global switch (CGS
as abbreviation), i.e. pipeline.object-reuse, and 2. a fine-grained local
switch (FGS as abbreviation), i.e. the objectReuseCompliant variable for
specific operators/functions, there will be the following patterns with
appropriate combinations:

pattern 1: coarse-grained switch only. Local object reuse will be
controlled by the coarse-grained switch:
1.1 cgs == true -> local object reused enabled
1.2 cgs == true  -> local object reused enabled
1.3 cgs == false -> local object reused disabled, i.e. deep copy enabled
1.4 cgs == false -> local object reused disabled, i.e. deep copy enabled

afaiu, this is the starting point. I wrote 4 on purpose to make the
regression check easier. We can consider it as the combinations with
cgs(true/false) and fgs(true/false) while fgs is ignored.

Now we introduce fine-grained switch. There will be two patterns:

pattern 2: fine-grained switch over coarse-grained switch. Coarse-grained
switch will be ignored when the local fine-grained switch has different
value:
2.1 cgs == true and fgs == true -> local object reused enabled
2.2 cgs == true and fgs == false -> local object reused disabled, i.e. deep
copy enabled
2.3 cgs == false and fgs == true -> local object reused enabled
2.4 cgs == false and fgs == false -> local object reused disabled, i.e.
deep copy enabled

cgs is actually ignored.

Current FLIP is using a slightly different pattern:

pattern 3: fine-grained switch over coarse-grained switch only when
coarse-grained switch is off, i.e. cgs OR fgs:
3.1 cgs == true and fgs == true -> local object reused enabled
3.2 cgs == true and fgs == false -> local object reused enabled
3.3 cgs == false and fgs == true -> local object reused enabled
3.4 cgs == false and fgs == false -> local object reused disabled, i.e.
deep copy enabled

All of those patterns are rational and each has different focus. It depends
on the real requirement to choose one of them.

As we can see, if fgs is using 2VL, there is a regression between pattern 1
and pattern 2. You are absolutely right in this case. That's why I
suggested 3VL, i.e. fgs will have triple values: true, false, unknown(e.g.
null)

pattern 4: 3VL fgs with null as the init value (again, there are just two
combinations, I made it 4 on purpose):
4.1 cgs == true and fgs == null -> local object reused enabled
4.2 cgs == true and fgs == null -> local object reused enabled
4.3 cgs == false and fgs == null -> local object reused disabled, i.e. deep
copy enabled
4.4 cgs == false and fgs == null -> local object reused disabled, i.e. deep
copy enabled

Since the default value of fgs is null, pattern 4 is backward compatible
with pattern 1, which means no regression.

Now we will set value to fgs and follow the pattern 2:
4.5 cgs == true and fgs == true -> local object reused enabled
4.6 cgs == true and fgs == false -> local object reused disabled, i.e. deep
copy enabled
4.7 cgs == false and fgs == true -> local object reused enabled
4.8 cgs == false and fgs == false -> local object reused disabled, i.e.
deep copy enabled

Pattern 4 contains pattern 3 with the following combinations (force enabling
local object reuse):
4.5 cgs == true and fgs == true -> local object reused enabled
4.2 cgs == true and fgs == null -> local object reused enabled
4.7 cgs == false and fgs == true -> local object reused enabled
4.4 cgs == false and fgs == null -> local object reused disabled, i.e. deep
copy enabled

Comparing pattern 4 to pattern 3, the user will have one additional degree of
flexibility to control (force disabling) the local object reuse capability
because of 3VL, i.e. 4.2+4.6 vs. 3.2.

This is commonly used in hierarchical RBAC to enable more fine-grained
access control for sub-roles.
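
To make pattern 4 concrete, here is a minimal sketch of the resolution logic
(the class and method names are made up for illustration and are not the
FLIP's API):

final class ObjectReuseDecision {
    // null  -> follow the coarse-grained pipeline.object-reuse switch (4.1-4.4)
    // true  -> force-enable local object reuse (4.5 / 4.7)
    // false -> force-disable local object reuse, i.e. deep copy (4.6 / 4.8)
    static boolean objectReuseEnabled(
            boolean pipelineObjectReuse, Boolean operatorObjectReuseCompliant) {
        return operatorObjectReuseCompliant != null
                ? operatorObjectReuseCompliant
                : pipelineObjectReuse;
    }
}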

I hope I have been able to explain myself clearly. Looking forward to your
feedback.

Best regards,
Jing



On Wed, Jul 5, 2023 at 12:47 PM Dong Lin  wrote:

> Hi Jing,
>
> Thanks for the comments! Please find below my comments, which are based on
> the offline discussion with Xuannan.
>
> On Wed, Jul 5, 2023 at 1:36 AM Jing Ge  wrote:
>
>> Hi Xuannan, Hi Dong
>>
>> Thanks for the Proposal! After reading the FLIP, I'd like to ask some
>> questions:
>>
>> 1. Naming convention for boolean variables. It is recommended to follow
>> JavaBean [1], i.e. objectReuseCompliant as the variable name with
>> isObjectReuseCompliant() and setObjectReuseCompliant() as the methods' name.
>>
>>
> Good point. We have updated the FLIP as suggested.
>
>
>>
>> 2.
>>
>>-
>>
>>*If pipeline.object-reuse is set to true, records emitted by this
>>operator will be re-used.*
>>-
>>
>>*Otherwise, if getIsObjectReuseCompliant() returns true, records
>>emitted by this 

Re: [DISCUSS] FLIP-325: Support configuring end-to-end allowed latency

2023-07-05 Thread Piotr Nowojski
Hi,

Thanks for this proposal, this is a very much needed thing that should be
addressed in Flink.

I think there is one thing that hasn't been discussed either here or in
FLIP-309. Given that we have
three dimensions:
- e2e latency/checkpointing interval
- enabling some kind of batching/buffering on the operator level
- how much resources we want to allocate to the job

How do we want Flink to adjust itself between those three? For example:
a) Should we assume that given Job has a fixed amount of assigned resources
and make it paramount that
  Flink doesn't exceed those available resources? So in case of
backpressure, we
  should extend checkpointing intervals, emit records less frequently and
in batches.
b) Or should we assume that the amount of resources is flexible (up to a
point?), and the desired e2e latency
  is the paramount aspect? So in case of backpressure, we should still
adhere to the configured e2e latency,
  and wait for the user or autoscaler to scale up the job?

In case of a), I think the concept of "isProcessingBacklog" is not needed,
we could steer the behaviour only
using the backpressure information.

On the other hand, in case of b), "isProcessingBacklog" information might
be helpful, to let Flink know that
we can safely decrease the e2e latency/checkpoint interval even if there is
no backpressure, to use fewer
resources (and let the autoscaler scale down the job).

Do we want to have both, or only one of those? Do a) and b) complement one
another? If job is backpressured,
we should follow a) and expose to autoscaler/users information "Hey! I'm
barely keeping up! I need more resources!".
While, when there is no backpressure and latency doesn't matter
(isProcessingBacklog=true), we can limit the resource
usage.

And a couple of more concrete remarks about the current proposal.

1.

> I think the goal is to allow users to specify an end-to-end latency
budget for the job.

I fully agree with this, but in that case, why are you proposing to add
`execution.flush.interval`? That's
yet another parameter that would affect e2e latency, without actually
defining it. We already have things
like: execution.checkpointing.interval, execution.buffer-timeout. I'm
pretty sure very few Flink users would be
able to configure or understand all of them.

I think we should simplify configuration and try to define
"execution.end-to-end-latency" so the runtime
could derive other things from this new configuration.

2. How do you envision `#flush()` and `#snapshotState()` to be connected?
So far, `#snapshotState()`
was considered as a kind of `#flush()` signal. Do we need both? Shouldn't
`#flush()` be implicitly or
explicitly attached to the `#snapshotState()` call?

3. What about unaligned checkpoints if we have separate `#flush()`
event/signal?

4. How should this work in at-least-once mode (especially with sources
that are configured to be working
in at-least-once mode)?

5. How is this FLIP connected with FLIP-327? I think they are trying to
achieve basically the same thing:
optimise when data should be flushed/committed to balance between
throughput and latency.

6.

> Add RecordAttributesBuilder and RecordAttributes that extends
StreamElement to provide operator with essential
> information about the records they receive, such as whether the records
are already stale due to backlog.

Passing along `RecordAttribute` for every `StreamElement` would be an
extremely inefficient solution.

If at all, this should be a marker propagated through the JobGraph via
Events or sent from JM to TMs via an RPC
that would mark "backlog processing started/ended". Note that Events might
be costly, as they need to be
broadcasted. So with a job having 5 keyBy exchanges and parallelism of
1000, the number of events sent is
~4 000 000, while the number of RPCs would be only 5000.

In case we want to only check for the backpressure, we don't need any extra
signal. Operators/subtasks can
get that information very easily from the TMs runtime.

Best,
Piotrek

On Thu, Jun 29, 2023 at 5:19 PM Dong Lin wrote:

> Hi Shammon,
>
> Thanks for your comments. Please see my reply inline.
>
> On Thu, Jun 29, 2023 at 6:01 PM Shammon FY  wrote:
>
> > Hi Dong and Yunfeng,
> >
> > Thanks for bringing up this discussion.
> >
> > As described in the FLIP, the differences between `end-to-end latency`
> and
> > `table.exec.mini-batch.allow-latency` are: "It allows users to specify
> the
> > end-to-end latency, whereas table.exec.mini-batch.allow-latency applies
> to
> > each operator. If there are N operators on the path from source to sink,
> > the end-to-end latency could be up to
> table.exec.mini-batch.allow-latency *
> > N".
> >
> > If I understand correctly, `table.exec.mini-batch.allow-latency` is also
> > applied to the end-to-end latency for a job, maybe @Jack Wu can give more
> > information.
> >
>
> Based on what I can tell from the doc/code and offline discussion, I
> believe table.exec.mini-batch.allow-latency is not applied to 

[jira] [Created] (FLINK-32546) update Code Style Guide with Java properties naming convention

2023-07-05 Thread Jing Ge (Jira)
Jing Ge created FLINK-32546:
---

 Summary: update Code Style Guide with Java properties naming 
convention
 Key: FLINK-32546
 URL: https://issues.apache.org/jira/browse/FLINK-32546
 Project: Flink
  Issue Type: Improvement
  Components: Documentation
Reporter: Jing Ge


The class [properties|https://en.wikipedia.org/wiki/Property_(programming)]
must be accessible using _get_, _set_, _is_ (can be used for boolean
properties instead of get), _to_ and other methods (so-called [accessor
methods|https://en.wikipedia.org/wiki/Accessor] and [mutator
methods|https://en.wikipedia.org/wiki/Mutator_method]) according to a standard
[naming convention|https://en.wikipedia.org/wiki/Naming_conventions_(programming)].

 

[https://en.wikipedia.org/wiki/JavaBeans]

[https://flink.apache.org/how-to-contribute/code-style-and-quality-preamble/]
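
A minimal illustration of the convention (the class and property names below
are made up; the boolean property uses the "is" accessor):

public class OperatorAttributes {
    private boolean objectReuseCompliant;

    // accessor method for a boolean property
    public boolean isObjectReuseCompliant() {
        return objectReuseCompliant;
    }

    // mutator method
    public void setObjectReuseCompliant(boolean objectReuseCompliant) {
        this.objectReuseCompliant = objectReuseCompliant;
    }
}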



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISUCSS] Deprecate multiple APIs in 1.18

2023-07-05 Thread Jing Ge
Hi Alex,


> > 3. remove SinkFunction.
> Which steps do you imply for the 1.18 release and for the 2.0 release?
>

for 2.0 release. 1.18 will be released soon.

Best regards,
Jing


On Wed, Jul 5, 2023 at 1:08 PM Alexander Fedulov <
alexander.fedu...@gmail.com> wrote:

> @Jing
> Just to clarify, when you say:
>
> 3. remove SinkFunction.
> Which steps do you imply for the 1.18 release and for the 2.0 release?

@Xintong
> A side note - with the new Source API we lose the ability to control
> checkpointing from the source since there is no lock anymore. This
> functionality
> is currently used in a variety of tests for the Sinks - the tests that rely
> on tight
> synchronization between specific elements passed from the source  to the
> sink before
> allowing a checkpoint to complete (see FiniteTestSource [1]). Since FLIP-27
> Sources rely
> on decoupling via the mailbox, without exposing the lock, it is not
> immediately clear
> if it is possible to achieve the same functionality without major
> extensions in the
> runtime for such testing purposes. My hope initially was that only the
> legacy Sinks
> relied on this - this would have made it possible to drop
> SourceFunction+SinkFunction
> together, but, in fact, it also already became part of the new SinkV2
> testing IT suits
> [2]. Moreover, I know of at least one major connector that also relies on
> it for
> verifying committed sink metadata for a specific set of records (Iceberg)
> [3]. In my
> estimation this currently presents a major blocker for the SourceFunction
> removal.
>
> [1]
>
> https://github.com/apache/flink/blob/master/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/FiniteTestSource.java
> [2]
>
> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/StreamingExecutionFileSinkITCase.java#L132
> [3]
>
> https://github.com/apache/iceberg/blob/master/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/BoundedTestSource.java#L75C1-L85C2
>
> Best,
> Alex
>
> On Wed, 5 Jul 2023 at 10:47, Chesnay Schepler  wrote:
>
> > There's a whole bunch of metric APIs that would need to be deprecated.
> > That is of course if the metric FLIPs are being accepted.
> >
> > Which makes me wonder if we aren't doing things the wrong way around;
> > shouldn't the decision to deprecate an API be part of the FLIP
> discussion?
> >
> > On 05/07/2023 07:39, Xintong Song wrote:
> > > Thanks all for the discussion.
> > >
> > > It seems to me there's a consensus on marking the following as
> deprecated
> > > in 1.18:
> > > - DataSet API
> > > - SourceFunction
> > > - Queryable State
> > > - All Scala APIs
> > >
> > > More time is needed for deprecating SinkFunction.
> > >
> > > I'll leave this discussion open for a few more days. And if there's no
> > > objections, I'll create JIRA tickets accordingly.
> > >
> > > Best,
> > >
> > > Xintong
> > >
> > >
> > >
> > > On Wed, Jul 5, 2023 at 1:34 PM Xintong Song 
> > wrote:
> > >
> > >> Thanks for the input, Jing. I'd also be +1 for option 1.
> > >>
> > >> Best,
> > >>
> > >> Xintong
> > >>
> > >>
> > >>
> > >> On Mon, Jul 3, 2023 at 7:20 PM Jing Ge 
> > wrote:
> > >>
> > >>> Hi Xingtong,
> > >>>
> > >>> Option 1, secure plan would be:
> > >>>
> > >>> 1. graduate kafka, File, JDBC connectors to @Public
> > >>> 2. graduate SinkV2 to @Public
> > >>> 3. remove SinkFunction.
> > >>>
> > >>> Option 2, risky plan but at a fast pace:
> > >>>
> > >>> 1. graduate SinkV2 to @Public and expecting more maintenance effort
> > since
> > >>> there are many known and unsolved issues.
> > >>> 2. remove SinkFunction.
> > >>> 3. It depends on the connectors' contributors whether connectors can
> > >>> upgrade to Flink 2.0, since we moved forward with SinkV2 API without
> > >>> taking
> > >>> care of implementations in external connectors.
> > >>>
> > >>> I am ok with both of them and personally prefer option 1.
> > >>>
> > >>> Best regards,
> > >>> Jing
> > >>>
> > >>>
> > >>> On Fri, Jun 30, 2023 at 3:41 AM Xintong Song 
> > >>> wrote:
> > >>>
> >  I see. Thanks for the explanation. I may have not looked into this
> > >>> deeply
> >  enough, and would trust the decision from you and the community
> > members
> > >>> who
> >  participated in the discussion & vote.
> > 
> >  Best,
> > 
> >  Xintong
> > 
> > 
> > 
> >  On Thu, Jun 29, 2023 at 10:28 PM Alexander Fedulov <
> >  alexander.fedu...@gmail.com> wrote:
> > 
> > >> However, I'm not sure about 2.
> > > I am not aware of a bylaw that states the specific requirements in
> > >>> order
> >  to
> > > mark something as @Deprecated. My understanding from the discussion
> > >>> and
> >  the
> > > vote was that the community recognizes the necessity to make it
> > >>> explicit
> > > that
> > > the usage of the SourceFunction API is discouraged. This can
> actually
> > > stimulate
> > 

Re: [DISCUSS] FLIP-327: Support stream-batch unified operator to improve job throughput when processing backlog data

2023-07-05 Thread Piotr Nowojski
Hi Dong,

I have a couple of questions.

Could you explain why those properties

@Nullable private Boolean isOutputOnEOF = null;
@Nullable private Boolean isOutputOnCheckpoint = null;
@Nullable private Boolean isInternalSorterSupported = null;

must be `@Nullable`, instead of having the default value set to `false`?

Second question, have you thought about cases where someone is
either bootstrapping from a streaming source like Kafka
or simply trying to catch up after a long period of downtime in a purely
streaming job? Generally speaking a cases where
user doesn't care about latency in the catch up phase, regardless if the
source is bounded or unbounded, but wants to process
the data as fast as possible, and then switch dynamically to real time
processing?

Best,
Piotrek

On Sun, Jul 2, 2023 at 4:15 PM Dong Lin wrote:

> Hi all,
>
> I am opening this thread to discuss FLIP-327: Support stream-batch unified
> operator to improve job throughput when processing backlog data. The design
> doc can be found at
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-327%3A+Support+stream-batch+unified+operator+to+improve+job+throughput+when+processing+backlog+data
> .
>
> This FLIP enables a Flink job to initially operate in batch mode, achieving
> high throughput while processing records that do not require low processing
> latency. Subsequently, the job can seamlessly transition to stream mode for
> processing real-time records with low latency. Importantly, the same state
> can be utilized before and after this mode switch, making it particularly
> valuable when users wish to bootstrap the job's state using historical
> data.
>
> We would greatly appreciate any comments or feedback you may have on this
> proposal.
>
> Cheers,
> Dong
>


Re: [DISCUSS] FLIP-309: Enable operators to trigger checkpoints dynamically

2023-07-05 Thread Piotr Nowojski
Hi Guys,

I would like to ask you again to spend a bit more effort on trying to find
solutions, not just pointing out problems. For 1.5 months,
the discussion hasn't gone in circles so much as followed a pattern: I suggest a solution, you
try to undermine it with some arguments, I come
back with a fix, often an extremely easy one, only for you to try to find
yet another "issue". It doesn't bode well if you are finding
a "problem" that can be solved with a minute or so of thinking, or even has
already been solved.

I have provided you so far with at least three distinct solutions that
could address your exact target use-case. Two [1][2] generic
enough to be probably good enough for the foreseeable future, one
intermediate and not generic [3] but which wouldn't
require @Public API changes or some custom hidden interfaces.

All in all:
- [1] with added metric hints like "isProcessingBacklog" solves your target
use case pretty well. Downside is having to improve
  how JM is collecting/aggregating metrics
- [2] is basically an equivalent of [1], replacing metrics with events. It
also is a superset of your proposal
- [3] yes, it's hacky, but it's a solution that could be thrown away once
we implement [1] or [2]. The only real theoretical
  downside is that it cannot control the long checkpoint interval exactly (the short
checkpoint interval has to be a divisor of the long checkpoint
  interval), but I simply cannot imagine a practical use where that would
be a blocker for a user. Please..., someone wanting to set
  the short checkpoint interval to 3 minutes and the long one to 7 minutes, and that someone
cannot accept the long interval being 9 minutes?
  And that's even ignoring the fact that if someone has an issue with the 3
minutes checkpoint interval, I can hardly think that merely
  doubling the interval to 7 minutes would significantly solve any problem
for that user.

Dong a long time ago you wrote:
> Sure. Then let's decide the final solution first.

Have you thought about that? Maybe I'm wrong but I don't remember you
describing in any of your proposals how they could be
extended in the future, to cover more generic cases. Regardless if you
either don't believe in the generic solution or struggle to
grasp it, if you can come back with something that can be easily extended
in the future, up to a point where one could implement
something similar to this backpressure detecting algorithm that I mentioned
many times before, I would be happy to discuss and
support it.

Hang, about your points 1. and 2., do you think those problems are
insurmountable and blockers for that counter proposal?

> 1. It is hard to find the error checkpoint.

No it's not, please take a look at what I exactly proposed and maybe at the
code.

> 2. (...) The failed checkpoint may make them think the job is unhealthy.

Please read again what I wrote in [3]. I'm mentioning there a solution for
this exact "problem".

About the necessity of the config value, I'm still not convinced that's
needed from the start, but yes we can add some config option
if you think otherwise. This option, if named properly, could be re-used in
the future for different solutions, so that's fine by me.

Best,
Piotrek

[1] Introduced in my very first e-mail from 23 maj 2023, 16:26, and refined
later with point "2." in my e-mail from 16 June 2023, 17:58
[2] Section "2. ===" in my e-mail from 30 June 2023, 16:34
[3] Section "3. ===" in my e-mail from 30 June 2023, 16:34

All times in CEST.

On Wed, Jul 5, 2023 at 8:46 AM Hang Ruan wrote:

> Hi, Piotr & Dong.
>
> Thanks for the discussion.
>
> IMO, I do not think the provided counter proposal is a good idea. There are
> some concerns from my side.
>
> 1. It is hard to find the error checkpoint.
> If there are other errors causing the checkpoint failure, we have to check
> every failed checkpoint to find it.
>
> 2. It is more confused for the users.
> Some users only know the feature, but don't know how we implement it. The
> failed checkpoint may make them think the job is unhealthy.
>
> 3. Users should be able to set the checkpoint interval for the new backlog
> state.
> I think it is better to provide a setting for users to change the
> checkpoint interval at the new backlog state. The hard-code interval(5x /
> 10x) is not flexible enough.
>
> Best,
> Hang
>
> On Wed, Jul 5, 2023 at 7:33 AM Dong Lin wrote:
>
> > Hi Piotr,
> >
> > Any suggestion on how we can practically move forward to address the
> target
> > use-case?
> >
> > My understanding is that the current proposal does not have any
> > correctness/performance issues. And it allows the extension to support
> all
> > the extra use-case without having to throw away the proposed APIs.
> >
> > If you prefer to have a better solution with simpler APIs and yet same or
> > better correctness/performance for the target use-case, could you please
> > kindly explain its API design so that we can continue the discussion?
> >
> >
> > Best,
> > Dong
> >
> > On Mon, Jul 3, 2023 at 6:39 PM Dong Lin  

[jira] [Created] (FLINK-32545) Removes the expensive Row operations like join

2023-07-05 Thread Jiang Xin (Jira)
Jiang Xin created FLINK-32545:
-

 Summary: Removes the expensive Row operations like join
 Key: FLINK-32545
 URL: https://issues.apache.org/jira/browse/FLINK-32545
 Project: Flink
  Issue Type: Improvement
  Components: Library / Machine Learning
Reporter: Jiang Xin
 Fix For: ml-2.4.0


Currently, dozens of algorithms in Flink ML contain code like `Row.join(row, 
Row.of(...))` which is expensive. We should avoid creating Rows multiple times.
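
A sketch of the kind of change this implies (the class and method names are
illustrative only; RowKind/field-name handling is omitted):

import org.apache.flink.types.Row;

public final class RowAppendExample {

    // Current pattern: Row.of(...) plus Row.join(...) allocate two extra Rows
    // per input record.
    static Row appendWithJoin(Row row, Object prediction) {
        return Row.join(row, Row.of(prediction));
    }

    // Cheaper alternative: a single pre-sized Row, with fields copied directly.
    static Row appendDirect(Row row, Object prediction) {
        Row result = new Row(row.getArity() + 1);
        for (int i = 0; i < row.getArity(); i++) {
            result.setField(i, row.getField(i));
        }
        result.setField(row.getArity(), prediction);
        return result;
    }
}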



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] FLIP-322 Cooldown period for adaptive scheduler

2023-07-05 Thread Etienne Chauchot

Hi all,

Thanks David for your suggestions. Comments inline.

On 04/07/2023 at 13:35, David Morávek wrote:

waiting 2 min between 2 requirements push seems ok to me

This depends on the workload. Would you care if the cost of rescaling were
close to zero (which is for most out-of-the-box workloads)? In that case,
it would be desirable to rescale more frequently, for example, if TMs join
incrementally.

Creating a value that covers everything is impossible unless it's
self-tuning, so I'd prefer having a smooth experience for people trying
things out (just imagine doing a demo at the conference) and having them
opt-in for longer cooldowns.

The users still have the ability to lower the cooldown period for high
workloads but we could definitely set a default value to a lower number.
I agree to favor lower
numbers (for smooth rescale experience) and consider higher numbers (for
high workloads) as exceptions. But we still need to agree on a suitable
default for most cases: 30s?

One idea to keep the timeouts lower while getting more balance would be
restarting the cooldown period when new resources or requirements are
received. This would also bring the cooldown's behavior closer to the
resource-stabilization timeout. Would that make sense?



You mean, if slots are received during the cooldown period, instead of the
proposed behavior (A), do behavior (B)?


A. schedule a rescale at lastRescale + cooldown point in time

B. schedule a rescale at ** now ** + cooldown point in time

It looks fine to me. It is even better because it avoids having 2
rescales scheduled at the same time if 2 slot changes arrive during the
same cooldown period. A sketch of this behavior follows below.
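
As a minimal sketch of behavior (B), outside of Flink's actual scheduler
classes (the class and method names below are illustrative only):

import java.time.Duration;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

final class RescaleCooldown {
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();
    private final Duration cooldown;
    private final Runnable rescaleAction;
    private ScheduledFuture<?> pendingRescale;

    RescaleCooldown(Duration cooldown, Runnable rescaleAction) {
        this.cooldown = cooldown;
        this.rescaleAction = rescaleAction;
    }

    // Called whenever new slots or new resource requirements arrive; re-arms
    // the timer so the rescale fires at "now + cooldown", and at most one
    // rescale is ever pending.
    synchronized void onResourcesChanged() {
        if (pendingRescale != null) {
            pendingRescale.cancel(false);
        }
        pendingRescale =
                scheduler.schedule(rescaleAction, cooldown.toMillis(), TimeUnit.MILLISECONDS);
    }
}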



Etienne





Depends on how you implement it. If you ignore all of shouldRescale, yes,

but you shouldn't do that in the first place.



I agree, this is not what I planned to implement.




That sounds great; let's go ahead and outline this in the FLIP.

Best,
D.


On Tue, Jul 4, 2023 at 12:30 PM Etienne Chauchot
wrote:


Hi all,

Thanks David for your feedback. My comments are inline

On 04/07/2023 at 09:16, David Morávek wrote:

They will struggle if they add new resources and nothing happens for 5
minutes.

The same applies if they start playing with FLIP-291 APIs. I'm wondering if
the cooldown makes sense there since it was the user's deliberate choice to
push new requirements.


Sure, but remember that the initial rescale is always done immediately.
Only the time between 2 rescales is controlled by the cooldown period. I
don't see a user adding resources every 10s (your proposed default
value), and even with, let's say, 2 min, waiting 2 min between 2
requirement pushes seems ok to me.



Best,
D.

On Tue, Jul 4, 2023 at 9:11 AM David Morávek   wrote:


The FLIP reads sane to me. I'm unsure about the default values, though;

5

minutes of wait time between rescales feels rather strict, and we should
rethink it to provide a better out-of-the-box experience.

I'd focus on newcomers trying AS / Reactive Mode out. They will struggle
if they add new resources and nothing happens for 5 minutes. I'd suggest
defaulting to
*jobmanager.adaptive-scheduler.resource-stabilization-timeout* (which
defaults to 10s).


If users add resources, the re-scale will happen right away. It is only
for next additions that they will have to wait for the coolDown period
to end.

But anyway, we could lower the default value, I just took what Robert
suggested in the ticket.



I'm still struggling to grasp max internal (force rescale). Ignoring

`AdaptiveScheduler#shouldRescale()`

condition seems rather dangerous. Wouldn't a simple case where you add a
new TM and remove it before the max interval is reached (so there is
nothing to do) result in an unnecessary job restart?

With current behavior (on master) : adding the TM will result in
restarting if the number of slots added leads to job parallelism
increase of more than 2. Then removing it can have 2 consequences:
either it is removed before the resource-stabilisation timeout and there
will be no restart. Or it is removed after this timeout (the job is in
Running state) and it will entail another restart and parallelism decrease.

With the proposed behavior: what the scaling-interval.max will change is
only on the resource addition part: when the TM is added, if the time
since last rescale > scaling-interval.max, then a restart and
parallelism increase will be done even if it leads to a parallelism
increase < 2. The rest regarding TM removal does not change.

=> So, the real difference with the current behavior is ** if the slots
addition was too little ** : in the current behavior nothing happens. In
the new behavior nothing happens unless the addition arrives after
scaling-interval.max.


Best

Etienne


Best,
D.

On Thu, Jun 29, 2023 at 3:43 PM Etienne Chauchot
wrote:


Thanks Chesnay for your feedback. I have updated the FLIP. I'll start a
vote thread.

Best


Re: [DISUCSS] Deprecate multiple APIs in 1.18

2023-07-05 Thread Xintong Song
@Chesnay,

shouldn't the decision to deprecate an API be part of the FLIP discussion?
>

Exactly. I agree that deprecation of an old API should be part of the FLIP
where the new API is introduced. And I appreciate that many APIs that are
listed to be removed in release 2.0 are already deprecated when its
replacement is ready. E.g., SinkV1 is deprecated when SinkV2 is ready
(FLIP-191), TableSource / TableSink is deprecated when the new table source
& sink is ready (FLIP-95).

In this thread, I only brought up APIs which possibly should have been
deprecated already, i.e., APIs which already (or won't) have replacements.
Ideally, they should be deprecated when the corresponding FLIP is
completed, but are somehow not deprecated yet.
- DataSet, which should be subsumed by DataStream & Table according to
FLIP-131
- SourceFunction / SinkFunction, which seem to be replaced by the new
Source (FLIP-27) and SinkV2 (FLIP-191)
- Queryable State, which seem to be replaced by the State Processor API
(FLIP-43)
- All Scala APIs: the decision was made in FLIP-265, and this is more an
implementation issue in that we are expected to, but haven't really,
deprecated all user-facing Scala APIs

@Alex,
I may not have understood all the details, but based on what you described
I'd hesitate to block the deprecation / removal of SourceFunction on this.
- Typically, we should not block production code changes on testing
requirements.
- IIUC, the testing scenario you described is like blocking the source from
proceeding (emit data, finish, etc.) until a checkpoint is finished. I
believe there are other ways to achieve this. E.g., block the source on a
CompletableFuture, and complete the future when the completion of the
checkpoint is detected, via the REST API or by scanning the checkpoint
directory (a small sketch of this idea follows below).
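
A minimal sketch of that idea (not a Flink API; the class and method names
are made up): a FLIP-27 test SourceReader could return awaitCheckpoint() from
isAvailable() whenever it wants to pause, and call onCheckpointComplete()
from its CheckpointListener#notifyCheckpointComplete callback.

import java.util.concurrent.CompletableFuture;

final class CheckpointGate {
    private CompletableFuture<Void> nextCheckpoint = new CompletableFuture<>();

    // Future that completes once the next checkpoint has been confirmed.
    synchronized CompletableFuture<Void> awaitCheckpoint() {
        return nextCheckpoint;
    }

    // To be invoked when a checkpoint completion is detected.
    synchronized void onCheckpointComplete(long checkpointId) {
        CompletableFuture<Void> completed = nextCheckpoint;
        nextCheckpoint = new CompletableFuture<>();
        completed.complete(null);
    }
}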

Best,

Xintong



On Wed, Jul 5, 2023 at 7:08 PM Alexander Fedulov <
alexander.fedu...@gmail.com> wrote:

> @Jing
> Just to clarify, when you say:
>
> > 3. remove SinkFunction.
> Which steps do you imply for the 1.18 release and for the 2.0 release?
>
> @Xintong
> A side note - with the new Source API we lose the ability to control
> checkpointing from the source since there is no lock anymore. This
> functionality
> is currently used in a variety of tests for the Sinks - the tests that rely
> on tight
> synchronization between specific elements passed from the source  to the
> sink before
> allowing a checkpoint to complete (see FiniteTestSource [1]). Since FLIP-27
> Sources rely
> on decoupling via the mailbox, without exposing the lock, it is not
> immediately clear
> if it is possible to achieve the same functionality without major
> extensions in the
> runtime for such testing purposes. My hope initially was that only the
> legacy Sinks
> relied on this - this would have made it possible to drop
> SourceFunction+SinkFunction
> together, but, in fact, it also already became part of the new SinkV2
> testing IT suits
> [2]. Moreover, I know of at least one major connector that also relies on
> it for
> verifying committed sink metadata for a specific set of records (Iceberg)
> [3]. In my
> estimation this currently presents a major blocker for the SourceFunction
> removal.
>
> [1]
>
> https://github.com/apache/flink/blob/master/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/FiniteTestSource.java
> [2]
>
> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/StreamingExecutionFileSinkITCase.java#L132
> [3]
>
> https://github.com/apache/iceberg/blob/master/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/BoundedTestSource.java#L75C1-L85C2
>
> Best,
> Alex
>
> On Wed, 5 Jul 2023 at 10:47, Chesnay Schepler  wrote:
>
> > There's a whole bunch of metric APIs that would need to be deprecated.
> > That is of course if the metric FLIPs are being accepted.
> >
> > Which makes me wonder if we aren't doing things the wrong way around;
> > shouldn't the decision to deprecate an API be part of the FLIP
> discussion?
> >
> > On 05/07/2023 07:39, Xintong Song wrote:
> > > Thanks all for the discussion.
> > >
> > > It seems to me there's a consensus on marking the following as
> deprecated
> > > in 1.18:
> > > - DataSet API
> > > - SourceFunction
> > > - Queryable State
> > > - All Scala APIs
> > >
> > > More time is needed for deprecating SinkFunction.
> > >
> > > I'll leave this discussion open for a few more days. And if there's no
> > > objections, I'll create JIRA tickets accordingly.
> > >
> > > Best,
> > >
> > > Xintong
> > >
> > >
> > >
> > > On Wed, Jul 5, 2023 at 1:34 PM Xintong Song 
> > wrote:
> > >
> > >> Thanks for the input, Jing. I'd also be +1 for option 1.
> > >>
> > >> Best,
> > >>
> > >> Xintong
> > >>
> > >>
> > >>
> > >> On Mon, Jul 3, 2023 at 7:20 PM Jing Ge 
> > wrote:
> > >>
> > >>> Hi Xingtong,
> > >>>
> > >>> Option 1, secure plan would be:
> > >>>
> > >>> 1. graduate kafka, File, JDBC connectors to @Public
> 

Re: [DISUCSS] Deprecate multiple APIs in 1.18

2023-07-05 Thread Alexander Fedulov
@Jing
Just to clarify, when you say:

> 3. remove SinkFunction.
Which steps do you imply for the 1.18 release and for the 2.0 release?

@Xintong
A side note - with the new Source API we lose the ability to control
checkpointing from the source since there is no lock anymore. This
functionality
is currently used in a variety of tests for the Sinks - the tests that rely
on tight
synchronization between specific elements passed from the source  to the
sink before
allowing a checkpoint to complete (see FiniteTestSource [1]). Since FLIP-27
Sources rely
on decoupling via the mailbox, without exposing the lock, it is not
immediately clear
if it is possible to achieve the same functionality without major
extensions in the
runtime for such testing purposes. My hope initially was that only the
legacy Sinks
relied on this - this would have made it possible to drop
SourceFunction+SinkFunction
together, but, in fact, it also already became part of the new SinkV2
testing IT suites
[2]. Moreover, I know of at least one major connector that also relies on
it for
verifying committed sink metadata for a specific set of records (Iceberg)
[3]. In my
estimation this currently presents a major blocker for the SourceFunction
removal.

[1]
https://github.com/apache/flink/blob/master/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/FiniteTestSource.java
[2]
https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/StreamingExecutionFileSinkITCase.java#L132
[3]
https://github.com/apache/iceberg/blob/master/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/BoundedTestSource.java#L75C1-L85C2
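
To make the dependency concrete, here is a minimal sketch (not the actual
FiniteTestSource) of the pattern these tests rely on: emit the records under
the checkpoint lock, then keep the source alive until enough checkpoints have
completed, which is only possible because SourceFunction exposes the lock.
The class and field names below are made up for illustration.

import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.flink.api.common.state.CheckpointListener;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class CheckpointAwaitingSource<T> implements SourceFunction<T>, CheckpointListener {

    private final List<T> elements;
    private final AtomicInteger completedCheckpoints = new AtomicInteger();
    private volatile boolean running = true;

    public CheckpointAwaitingSource(List<T> elements) {
        this.elements = elements;
    }

    @Override
    public void run(SourceContext<T> ctx) throws Exception {
        // Emitting under the checkpoint lock guarantees that no checkpoint barrier
        // is injected in the middle of this batch of records.
        synchronized (ctx.getCheckpointLock()) {
            for (T element : elements) {
                ctx.collect(element);
            }
        }
        // Hold the source open until the emitted records are covered by completed
        // checkpoints - the part that has no direct FLIP-27 equivalent.
        while (running && completedCheckpoints.get() < 2) {
            Thread.sleep(10);
        }
    }

    @Override
    public void notifyCheckpointComplete(long checkpointId) {
        completedCheckpoints.incrementAndGet();
    }

    @Override
    public void cancel() {
        running = false;
    }
}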

Best,
Alex

On Wed, 5 Jul 2023 at 10:47, Chesnay Schepler  wrote:

> There's a whole bunch of metric APIs that would need to be deprecated.
> That is of course if the metric FLIPs are being accepted.
>
> Which makes me wonder if we aren't doing things the wrong way around;
> shouldn't the decision to deprecate an API be part of the FLIP discussion?
>
> On 05/07/2023 07:39, Xintong Song wrote:
> > Thanks all for the discussion.
> >
> > It seems to me there's a consensus on marking the following as deprecated
> > in 1.18:
> > - DataSet API
> > - SourceFunction
> > - Queryable State
> > - All Scala APIs
> >
> > More time is needed for deprecating SinkFunction.
> >
> > I'll leave this discussion open for a few more days. And if there's no
> > objections, I'll create JIRA tickets accordingly.
> >
> > Best,
> >
> > Xintong
> >
> >
> >
> > On Wed, Jul 5, 2023 at 1:34 PM Xintong Song 
> wrote:
> >
> >> Thanks for the input, Jing. I'd also be +1 for option 1.
> >>
> >> Best,
> >>
> >> Xintong
> >>
> >>
> >>
> >> On Mon, Jul 3, 2023 at 7:20 PM Jing Ge 
> wrote:
> >>
> >>> Hi Xingtong,
> >>>
> >>> Option 1, secure plan would be:
> >>>
> >>> 1. graduate kafka, File, JDBC connectors to @Public
> >>> 2. graduate SinkV2 to @Public
> >>> 3. remove SinkFunction.
> >>>
> >>> Option 2, risky plan but at a fast pace:
> >>>
> >>> 1. graduate SinkV2 to @Public and expecting more maintenance effort
> since
> >>> there are many known and unsolved issues.
> >>> 2. remove SinkFunction.
> >>> 3. It depends on the connectors' contributors whether connectors can
> >>> upgrade to Flink 2.0, since we moved forward with SinkV2 API without
> >>> taking
> >>> care of implementations in external connectors.
> >>>
> >>> I am ok with both of them and personally prefer option 1.
> >>>
> >>> Best regards,
> >>> Jing
> >>>
> >>>
> >>> On Fri, Jun 30, 2023 at 3:41 AM Xintong Song 
> >>> wrote:
> >>>
>  I see. Thanks for the explanation. I may have not looked into this
> >>> deeply
>  enough, and would trust the decision from you and the community
> members
> >>> who
>  participated in the discussion & vote.
> 
>  Best,
> 
>  Xintong
> 
> 
> 
>  On Thu, Jun 29, 2023 at 10:28 PM Alexander Fedulov <
>  alexander.fedu...@gmail.com> wrote:
> 
> >> However, I'm not sure about 2.
> > I am not aware of a bylaw that states the specific requirements in
> >>> order
>  to
> > mark something as @Deprecated. My understanding from the discussion
> >>> and
>  the
> > vote was that the community recognizes the necessity to make it
> >>> explicit
> > that
> > the usage of the SourceFunction API is discouraged. This can actually
> > stimulate
> > authors of connectors that rely on this very specific and
> non-baseline
> > functionality to contribute extensions to the new Source API
> >>> themselves
>  in
> > order to
> > close the gap. ExternallyInducedSource, for instance, was driven by
>  Pravega
> > to
> > begin with, since it was only needed for their purposes [1]. We are
> >>> not
> > removing
> > anything - until 2.0 everything will continue to work and we can work
> >>> on
> > resolving the limitations until then, I personally don't see a big

Re: [DISCUSS] FLIP-329: Add operator attribute to specify support for object-reuse

2023-07-05 Thread Dong Lin
Hi Jing,

Thanks for the comments! Please find below my comments, which are based on
the offline discussion with Xuannan.

On Wed, Jul 5, 2023 at 1:36 AM Jing Ge  wrote:

> Hi Xuannan, Hi Dong
>
> Thanks for the Proposal! After reading the FLIP, I'd like to ask some
> questions:
>
> 1. Naming convention for boolean variables. It is recommended to follow
> JavaBean [1], i.e. objectReuseCompliant as the variable name with
> isObjectReuseCompliant() and setObjectReuseCompliant() as the methods' name.
>
>
Good point. We have updated the FLIP as suggested.


>
> 2.
>
>-
>
>*If pipeline.object-reuse is set to true, records emitted by this
>operator will be re-used.*
>-
>
>*Otherwise, if getIsObjectReuseCompliant() returns true, records
>emitted by this operator will be re-used.*
>-
>
>*Otherwise, records emitted by this operator will be deep-copied
>before being given to the next operator in the chain.*
>
>
> If I understand you correctly, the hard-coded objectReuseCompliant
> should have higher priority over the configuration, so the checking logic
> should be:
>
>-
>
>*If getIsObjectReuseCompliant() returns true, records emitted by this
>operator will be re-used.*
>-
>
>*Otherwise, if pipeline.object-reuse is set to true, records emitted
>by this operator will be re-used.*
>-
>
>*Otherwise, records emitted by this operator will be deep-copied
>before being given to the next operator in the chain.*
>
>
> The results are the same but the checking logic is different; there
> are some additional thoughts, which lead us to the next question.
>

>

> 3. Current design lets specific operators enable object reuse and ignore
> the global config. There could be another thought, on the contrary: if an
> operator has hard coded the objectReuseCompliant as false, i.e. disable
> object reuse on purpose, records should not be reused even if the global
> config pipeline.object-reuse is set to true, which turns out that the
> objectReuseCompliant could be a triple-value logic: true: force object
> reusing; false: force deep-copying; unknown: depends on
> pipeline.object-reuse config.
>

With the current proposal, if pipeline.object-reused == true and
operatorA's objectReuseCompliant == false, Flink will not deep-copy
operatorA's output. I think you are suggesting changing the behavior such
that Flink should deep-copy the operatorA's output.

Could you explain what is the advantage of this approach compared to the
approach described in the FLIP?

My concern with this approach is that it can cause performance regression.
This is because an operator's objectReuseCompliant will be false by default
unless it is explicitly overridden. For those jobs which are currently
configured with pipeline.object-reuse = true, these jobs will likely start to have
lower performance (due to object deep-copy) after upgrading to the newer
Flink version.

Best,
Dong


>
> Best regards,
> Jing
>
>
> [1] https://en.wikipedia.org/wiki/JavaBeans
>
> On Mon, Jul 3, 2023 at 4:25 AM Xuannan Su  wrote:
>
>> Hi all,
>>
>> Dong(cc'ed) and I are opening this thread to discuss our proposal to
>> add operator attribute to allow operator to specify support for
>> object-reuse [1].
>>
>> Currently, the default configuration for pipeline.object-reuse is set
>> to false to avoid data corruption, which can result in suboptimal
>> performance. We propose adding APIs that operators can utilize to
>> inform the Flink runtime whether it is safe to reuse the emitted
>> records. This enhancement would enable Flink to maximize its
>> performance using the default configuration.
>>
>> Please refer to the FLIP document for more details about the proposed
>> design and implementation. We welcome any feedback and opinions on
>> this proposal.
>>
>> Best regards,
>>
>> Dong and Xuannan
>>
>> [1]
>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=255073749
>>
>


Re: [DISCUSS] Flink and Externalized connectors leads to block and circular dependency problems

2023-07-05 Thread Chesnay Schepler
Have there ever been thoughts about changing flink-python's connector
setup to use the Table API connectors underneath?


The wrapping of connectors is a bit of a maintenance nightmare and 
doesn't really work with external/custom connectors.


On 04/07/2023 13:35, Dian Fu wrote:

Thanks Ran Tao for proposing this discussion and Martijn for sharing
the thought.


  While flink-python now fails the CI, it shouldn't actually depend on the

externalized connectors. I'm not sure what PyFlink does with it, but if it
belongs to the connector code,

For each DataStream connector, there is a corresponding Python wrapper
and also some test cases in PyFlink. In theory, we should move that
wrapper into each connector repository. In the past, we have not done
that when externalizing the connectors since it may introduce some
burden when releasing since it means that we have to publish each
connector to PyPI separately.

To resolve this problem, I guess we can move the connector support in
PyFlink into the external connector repository.

Regards,
Dian


On Mon, Jul 3, 2023 at 11:08 PM Ran Tao  wrote:

@Martijn
thanks for clear explanations.

If we follow the line you specified (connectors shouldn't rely on
dependencies that may or may not be available in Flink itself), it seems
that we should declare each dependency we need (such as commons-io or
commons-collections) explicitly in the connector pom and bundle it in the
sql-connector uber jar.

Then there is only one thing left: we need to make the flink-python tests
not depend on the released flink-connector artifacts.
Maybe we should check it out and decouple it as you suggested.

Best Regards,
Ran Tao
https://github.com/chucheng92


Martijn Visser  于2023年7月3日周一 22:06写道:


Hi Ran Tao,

Thanks for opening this topic. I think there's a couple of things at hand:
1. Connectors shouldn't rely on dependencies that may or may not be
available in Flink itself, like we've seen with flink-shaded. That avoids a
tight coupling between Flink and connectors, which is exactly what we try
to avoid.
2. When following that line, that would also be applicable for things like
commons-collections and commons-io. If a connector wants to use them, it
should make sure that it bundles those artifacts itself.
3. While flink-python now fails the CI, it shouldn't actually depend on the
externalized connectors. I'm not sure what PyFlink does with it, but if it
belongs to the connector code, that code should also be moved to the
individual connector repo. If it's just a generic test, we could consider
creating a generic test against released connector versions to determine
compatibility.

I'm curious about the opinions of others as well.

Best regards,

Martijn

On Mon, Jul 3, 2023 at 3:37 PM Ran Tao  wrote:


I have an issue here that needs to upgrade commons-collections [1] (this is
an example), but the PR CI fails because flink-python test cases depend on
flink-sql-connector-kafka, and kafka-sql-connector is a small jar that does
not include this dependency, so the Flink CI throws an exception [2]. My
current solution is [3]. But even if this PR is done, the upgrade in Flink
still requires the kafka-connector to be released.

This issue leads to deeper problems. Although the connectors have been
externalized, many UTs of flink-python depend on these connectors, and a
basic agreement of externalized connectors is that other dependencies
cannot be introduced explicitly, which means the externalized connectors
use dependencies inherited from Flink. In this way, when Flink main
upgrades some dependencies, it is easy to fail when executing flink-python
test cases, because Flink no longer has this class and the connector does
not contain it. It's a circular problem.

Unless the connector self-consistently includes all dependencies, which is
uncontrollable (only a few connectors include all jars in the shade phase).

In short, the current flink-python module's dependencies on the connectors
lead to an incomplete process of externalization and decoupling, which will
lead to circular dependencies when Flink upgrades or changes some
dependencies.

I don't know if I made it clear. I hope to get everyone's opinions on what
better solutions we should adopt for similar problems in the future.

[1] https://issues.apache.org/jira/browse/FLINK-30274
[2]
https://user-images.githubusercontent.com/11287509/250120404-d12b60f4-7ff3-457e-a2c4-8cd415bb5ca2.png
https://user-images.githubusercontent.com/11287509/250120522-6b096a4f-83f0-4287-b7ad-d46b9371de4c.png

[3] https://github.com/apache/flink-connector-kafka/pull/38

Best Regards,
Ran Tao
https://github.com/chucheng92





[jira] [Created] (FLINK-32544) PythonFunctionFactoryTest fails on Java 17

2023-07-05 Thread Chesnay Schepler (Jira)
Chesnay Schepler created FLINK-32544:


 Summary: PythonFunctionFactoryTest fails on Java 17
 Key: FLINK-32544
 URL: https://issues.apache.org/jira/browse/FLINK-32544
 Project: Flink
  Issue Type: Sub-task
  Components: API / Python, Legacy Components / Flink on Tez
Affects Versions: 1.18.0
Reporter: Chesnay Schepler


https://dev.azure.com/chesnay/flink/_build/results?buildId=3676=logs=fba17979-6d2e-591d-72f1-97cf42797c11=727942b6-6137-54f7-1ef9-e66e706ea068

{code}
Jul 05 10:17:23 Exception in thread "main" 
java.lang.reflect.InaccessibleObjectException: Unable to make field private 
static java.util.IdentityHashMap java.lang.ApplicationShutdownHooks.hooks 
accessible: module java.base does not "opens java.lang" to unnamed module 
@1880a322
Jul 05 10:17:23 at 
java.base/java.lang.reflect.AccessibleObject.checkCanSetAccessible(AccessibleObject.java:354)
Jul 05 10:17:23 at 
java.base/java.lang.reflect.AccessibleObject.checkCanSetAccessible(AccessibleObject.java:297)
Jul 05 10:17:23 at 
java.base/java.lang.reflect.Field.checkCanSetAccessible(Field.java:178)
Jul 05 10:17:23 at 
java.base/java.lang.reflect.Field.setAccessible(Field.java:172)
Jul 05 10:17:23 at 
org.apache.flink.client.python.PythonFunctionFactoryTest.closeStartedPythonProcess(PythonFunctionFactoryTest.java:115)
Jul 05 10:17:23 at 
org.apache.flink.client.python.PythonFunctionFactoryTest.cleanEnvironment(PythonFunctionFactoryTest.java:79)
Jul 05 10:17:23 at 
org.apache.flink.client.python.PythonFunctionFactoryTest.main(PythonFunctionFactoryTest.java:52)
{code}

Side-notes:
* maybe re-evaluate if the test could be run through maven now
* The shutdown hooks business is quite sketchy, and AFAICT would be unnecessary 
if the test were an ITCase



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [ANNOUNCE] Apache Flink has won the 2023 SIGMOD Systems Award

2023-07-05 Thread Shammon FY
Congratulations!

Best,
Shammon FY

On Wed, Jul 5, 2023 at 2:38 PM Paul Lam  wrote:

> Congrats and cheers!
>
> Best,
> Paul Lam
>
> > 2023年7月4日 18:04,Benchao Li  写道:
> >
> > Congratulations!
> >
> > Feng Jin  于2023年7月4日周二 16:17写道:
> >
> >> Congratulations!
> >>
> >> Best,
> >> Feng Jin
> >>
> >>
> >>
> >> On Tue, Jul 4, 2023 at 4:13 PM Yuxin Tan 
> wrote:
> >>
> >>> Congratulations!
> >>>
> >>> Best,
> >>> Yuxin
> >>>
> >>>
> >>> Dunn Bangui  于2023年7月4日周二 16:04写道:
> >>>
>  Congratulations!
> 
>  Best,
>  Bangui Dunn
> 
>  Yangze Guo  于2023年7月4日周二 15:59写道:
> 
> > Congrats everyone!
> >
> > Best,
> > Yangze Guo
> >
> > On Tue, Jul 4, 2023 at 3:53 PM Rui Fan <1996fan...@gmail.com> wrote:
> >>
> >> Congratulations!
> >>
> >> Best,
> >> Rui Fan
> >>
> >> On Tue, Jul 4, 2023 at 2:08 PM Zhu Zhu  wrote:
> >>
> >>> Congratulations everyone!
> >>>
> >>> Thanks,
> >>> Zhu
> >>>
> >>> Hang Ruan  于2023年7月4日周二 14:06写道:
> 
>  Congratulations!
> 
>  Best,
>  Hang
> 
>  Jingsong Li  于2023年7月4日周二 13:47写道:
> 
> > Congratulations!
> >
> > Thank you! All of the Flink community!
> >
> > Best,
> > Jingsong
> >
> > On Tue, Jul 4, 2023 at 1:24 PM tison 
>  wrote:
> >>
> >> Congrats and with honor :D
> >>
> >> Best,
> >> tison.
> >>
> >>
> >> Mang Zhang  于2023年7月4日周二 11:08写道:
> >>
> >>> Congratulations!--
> >>>
> >>> Best regards,
> >>> Mang Zhang
> >>>
> >>>
> >>>
> >>>
> >>>
> >>> 在 2023-07-04 01:53:46,"liu ron"  写道:
>  Congrats everyone
> 
>  Best,
>  Ron
> 
>  Jark Wu  于2023年7月3日周一 22:48写道:
> 
> > Congrats everyone!
> >
> > Best,
> > Jark
> >
> >> 2023年7月3日 22:37,Yuval Itzchakov 
> >>> 写道:
> >>
> >> Congrats team!
> >>
> >> On Mon, Jul 3, 2023, 17:28 Jing Ge via user <
> > u...@flink.apache.org
> > > wrote:
> >>> Congratulations!
> >>>
> >>> Best regards,
> >>> Jing
> >>>
> >>>
> >>> On Mon, Jul 3, 2023 at 3:21 PM yuxia <
> > luoyu...@alumni.sjtu.edu.cn
> > > wrote:
>  Congratulations!
> 
>  Best regards,
>  Yuxia
> 
>  发件人: "Pushpa Ramakrishnan" <
> > pushpa.ramakrish...@icloud.com
> >  > pushpa.ramakrish...@icloud.com>>
>  收件人: "Xintong Song"  >>  > tonysong...@gmail.com>>
>  抄送: "dev"  >>> dev@flink.apache.org>>,
> > "User"   u...@flink.apache.org
> >>>
>  发送时间: 星期一, 2023年 7 月 03日 下午 8:36:30
>  主题: Re: [ANNOUNCE] Apache Flink has won the 2023
>  SIGMOD
> >>> Systems
> >>> Award
> 
>  Congratulations \uD83E\uDD73
> 
>  On 03-Jul-2023, at 3:30 PM, Xintong Song <
> >>> tonysong...@gmail.com
> > > wrote:
> 
>  
>  Dear Community,
> 
>  I'm pleased to share this good news with everyone.
> >>> As
> > some
> >>> of
> > you
> >>> may
> > have already heard, Apache Flink has won the 2023
> >> SIGMOD
> > Systems
> > Award
> >>> [1].
> 
>  "Apache Flink greatly expanded the use of stream
> > data-processing."
> >>> --
> > SIGMOD Awards Committee
> 
>  SIGMOD is one of the most influential data
> >>> management
> >>> research
> > conferences in the world. The Systems Award is awarded
> >>> to
>  an
> > individual
> >>> or
> > set of individuals to recognize the development of a
> > software or
> >>> hardware
> > system whose technical contributions have had
> >>> significant
> >>> impact on
> > the
> > theory or practice of large-scale data management
> >>> systems.
> >>> Winning
> > of
> >>> the
> > award indicates the high recognition of Flink's
> > technological
> >>> advancement
> > and industry influence from academia.
> 
>  As an open-source project, Flink wouldn't have
> >> come
> > this far
> > 

Re: [DISCUSS] FLIP-314: Support Customized Job Lineage Listener

2023-07-05 Thread Shammon FY
Hi devs,

Thanks for all the feedback.

I have discussed with @QingSheng Ren off-line to confirm some questionable
points in the FLIP. Thanks for his valuable inputs and I have updated the
FLIP according to our discussion.

Looking forward to your feedback, thanks,

Best,
Shammon FY


On Wed, Jul 5, 2023 at 5:26 PM Shammon FY  wrote:

> Hi Jing,
>
> Thanks for your feedback.
>
> > 1. TableColumnLineageRelation#sinkColumn() should return
> TableColumnLineageEntity instead of String, right?
>
> The `sinkColumn()` will return `String` which is the column name in the
> sink connector. I found the name of `TableColumnLineageEntity` may
> cause ambiguity and I have renamed it to `TableColumnSourceLineageEntity`.
> In my mind the `TableColumnLineageRelation` represents the lineage for each
> sink column; each column may be computed from multiple sources and columns.
> I use `TableColumnSourceLineageEntity` to manage each source and its
> columns for the sink column, so `TableColumnLineageRelation` has a sink
> column name and `TableColumnSourceLineageEntity` list.
>
> > 2. Since LineageRelation already contains all information to build the
> lineage between sources and sink, do we still need to set the LineageEntity
> in the source?
>
> The lineage interface of `DataStream` is very flexible. We have added
> `setLineageEntity` to the source to limit and verify user behavior,
> ensuring that users have not added non-existent sources as lineage.
>
> > 3. About the "Entity" and "Relation" naming, I was confused too, like
> Qingsheng mentioned. How about LineageVertex, LineageEdge, and LineageEdges
> which contains multiple LineageEdge?
>
> We referred to `Atlas` for the name of lineage, it uses `Entity` and
> `Relation` to represent the lineage relationship and another metadata
> service `Datahub` uses `DataSet` to represent the entity. I think `Entity`
> and `Relation` are nicer for lineage, what do you think of it?
>
> Best,
> Shammon FY
>
>
> On Thu, Jun 29, 2023 at 4:21 AM Jing Ge 
> wrote:
>
>> Hi Shammon,
>>
>> Thanks for your proposal. After reading the FLIP, I'd like to ask
>> some questions to make sure we are on the same page. Thanks!
>>
>> 1. TableColumnLineageRelation#sinkColumn() should return
>> TableColumnLineageEntity instead of String, right?
>>
>> 2. Since LineageRelation already contains all information to build the
>> lineage between sources and sink, do we still need to set the
>> LineageEntity
>> in the source?
>>
>> 3. About the "Entity" and "Relation" naming, I was confused too, like
>> Qingsheng mentioned. How about LineageVertex, LineageEdge, and
>> LineageEdges
>> which contains multiple LineageEdge? E.g. multiple sources join into one
>> sink, or, edges of columns from one or different tables, etc.
>>
>> Best regards,
>> Jing
>>
>> On Sun, Jun 25, 2023 at 2:06 PM Shammon FY  wrote:
>>
>> > Hi yuxia and Yun,
>> >
>> > Thanks for your input.
>> >
>> > For yuxia:
>> > > 1: What kinds of JobStatus will the `JobExecutionStatusEven`
>> including?
>> >
>> > At present, we only need to notify the listener when a job goes to
>> > termination, but I think it makes sense to add generic `oldStatus` and
>> > `newStatus` in the listener and users can update the job state in their
>> > service as needed.
>> >
>> > > 2: I'm really confused about the `config()` included in
>> `LineageEntity`,
>> > where is it from and what is it for ?
>> >
>> > The `config` in `LineageEntity` is used for users to get options for
>> source
>> > and sink connectors. As the examples in the FLIP, users can add
>> > server/group/topic information in the config for kafka and create
>> lineage
>> > entities for `DataStream` jobs, then the listeners can get this
>> information
>> > to identify the same connector in different jobs. Otherwise, the
>> `config`
>> > in `TableLineageEntity` will be the same as `getOptions` in
>> > `CatalogBaseTable`.
>> >
>> > > 3: Regardless whether `inputChangelogMode` in
>> `TableSinkLineageEntity` is
>> > needed or not, since `TableSinkLineageEntity` contains
>> > `inputChangelogMode`, why `TableSourceLineageEntity` don't contain
>> > changelogmode?
>> >
>> > At present, we do not actually use the changelog mode. It can be
>> deleted,
>> > and I have updated FLIP.
>> >
>> > > Btw, since there're a lot interfaces proposed, I think it'll be
>> better to
>> > give an example about how to implement a listener in this FLIP to make
>> us
>> > know better about the interfaces.
>> >
>> > I have added the example in the FLIP and the related interfaces and
>> > examples are in branch [1].
>> >
>> > For Yun:
>> > > I have one more question on the lookup-join dim tables, it seems this
> > > FLIP does not touch them, and will they become part of the
>> > List sources() or adding another interface?
>> >
>> > You're right, currently lookup join dim tables were not considered in
>> the
>> > 'proposed changed' section of this FLIP. But the interface for lineage
>> is
>> > universal and we can give 

Re: [DISCUSS] FLIP-314: Support Customized Job Lineage Listener

2023-07-05 Thread Shammon FY
Hi Jing,

Thanks for your feedback.

> 1. TableColumnLineageRelation#sinkColumn() should return
TableColumnLineageEntity instead of String, right?

The `sinkColumn()` will return `String` which is the column name in the
sink connector. I found the name of `TableColumnLineageEntity` may
cause ambiguity and I have renamed it to `TableColumnSourceLineageEntity`.
In my mind the `TableColumnLineageRelation` represents the lineage for each
sink column; each column may be computed from multiple sources and columns.
I use `TableColumnSourceLineageEntity` to manage each source and its
columns for the sink column, so `TableColumnLineageRelation` has a sink
column name and `TableColumnSourceLineageEntity` list.
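
For illustration, the shape being described could look roughly like the sketch
below (only sinkColumn() comes from the FLIP; the other names and signatures
are placeholders, not the final interfaces):

import java.util.List;

/** Placeholder for the FLIP's source entity (table identifier, options, ...). */
interface TableSourceLineageEntity {}

/** Lineage of one sink column: the sources and source columns it is computed from. */
interface TableColumnLineageRelation {
    /** Name of the column in the sink connector. */
    String sinkColumn();

    /** One entry per source that contributes to this sink column. */
    List<TableColumnSourceLineageEntity> sourceLineages();
}

/** One source and the columns of that source feeding the sink column. */
interface TableColumnSourceLineageEntity {
    TableSourceLineageEntity source();

    List<String> columns();
}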

> 2. Since LineageRelation already contains all information to build the
lineage between sources and sink, do we still need to set the LineageEntity
in the source?

The lineage interface of `DataStream` is very flexible. We have added
`setLineageEntity` to the source to limit and verify user behavior,
ensuring that users have not added non-existent sources as lineage.

> 3. About the "Entity" and "Relation" naming, I was confused too, like
Qingsheng mentioned. How about LineageVertex, LineageEdge, and LineageEdges
which contains multiple LineageEdge?

We referred to `Atlas` for the name of lineage, it uses `Entity` and
`Relation` to represent the lineage relationship and another metadata
service `Datahub` uses `DataSet` to represent the entity. I think `Entity`
and `Relation` are nicer for lineage, what do you think of it?

Best,
Shammon FY


On Thu, Jun 29, 2023 at 4:21 AM Jing Ge  wrote:

> Hi Shammon,
>
> Thanks for your proposal. After reading the FLIP, I'd like to ask
> some questions to make sure we are on the same page. Thanks!
>
> 1. TableColumnLineageRelation#sinkColumn() should return
> TableColumnLineageEntity instead of String, right?
>
> 2. Since LineageRelation already contains all information to build the
> lineage between sources and sink, do we still need to set the LineageEntity
> in the source?
>
> 3. About the "Entity" and "Relation" naming, I was confused too, like
> Qingsheng mentioned. How about LineageVertex, LineageEdge, and LineageEdges
> which contains multiple LineageEdge? E.g. multiple sources join into one
> sink, or, edges of columns from one or different tables, etc.
>
> Best regards,
> Jing
>
> On Sun, Jun 25, 2023 at 2:06 PM Shammon FY  wrote:
>
> > Hi yuxia and Yun,
> >
> > Thanks for your input.
> >
> > For yuxia:
> > > 1: What kinds of JobStatus will the `JobExecutionStatusEven` including?
> >
> > At present, we only need to notify the listener when a job goes to
> > termination, but I think it makes sense to add generic `oldStatus` and
> > `newStatus` in the listener and users can update the job state in their
> > service as needed.
> >
> > > 2: I'm really confused about the `config()` included in
> `LineageEntity`,
> > where is it from and what is it for ?
> >
> > The `config` in `LineageEntity` is used for users to get options for
> source
> > and sink connectors. As the examples in the FLIP, users can add
> > server/group/topic information in the config for kafka and create lineage
> > entities for `DataStream` jobs, then the listeners can get this
> information
> > to identify the same connector in different jobs. Otherwise, the `config`
> > in `TableLineageEntity` will be the same as `getOptions` in
> > `CatalogBaseTable`.
> >
> > > 3: Regardless whether `inputChangelogMode` in `TableSinkLineageEntity`
> is
> > needed or not, since `TableSinkLineageEntity` contains
> > `inputChangelogMode`, why `TableSourceLineageEntity` don't contain
> > changelogmode?
> >
> > At present, we do not actually use the changelog mode. It can be deleted,
> > and I have updated FLIP.
> >
> > > Btw, since there're a lot interfaces proposed, I think it'll be better
> to
> > give an example about how to implement a listener in this FLIP to make us
> > know better about the interfaces.
> >
> > I have added the example in the FLIP and the related interfaces and
> > examples are in branch [1].
> >
> > For Yun:
> > > I have one more question on the lookup-join dim tables, it seems this
> > FLIP does not touch them, and will they become part of the
> > List sources() or adding another interface?
> >
> > You're right, currently lookup join dim tables were not considered in the
> > 'proposed changed' section of this FLIP. But the interface for lineage is
> > universal and we can give `TableLookupSourceLineageEntity` which
> implements
> > `TableSourceLineageEntity` in the future without modifying the public
> > interface.
> >
> > > By the way, if you want to focus on job lineage instead of data column
> > lineage in this FLIP, why we must introduce so many column-lineage
> related
> > interface here?
> >
> > The lineage information in SQL jobs includes table lineage and column
> > lineage. Although SQL jobs currently do not support column lineage, we
> > would like to support this in the next 

[jira] [Created] (FLINK-32543) The actual behavior of restart-strategy.xxx.delay is inconsistent with the document

2023-07-05 Thread Weijie Guo (Jira)
Weijie Guo created FLINK-32543:
--

 Summary: The actual behavior of  restart-strategy.xxx.delay is 
inconsistent with the document
 Key: FLINK-32543
 URL: https://issues.apache.org/jira/browse/FLINK-32543
 Project: Flink
  Issue Type: Improvement
Reporter: Weijie Guo
Assignee: Weijie Guo






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-32542) 'connector'='starrocks': how to manually specify the StarRocks __op hidden column to implement physical deletion

2023-07-05 Thread LaraJiang (Jira)
LaraJiang created FLINK-32542:
-

 Summary: 'connector'='starrocks': how to manually specify the StarRocks __op
hidden column to implement physical deletion
 Key: FLINK-32542
 URL: https://issues.apache.org/jira/browse/FLINK-32542
 Project: Flink
  Issue Type: Improvement
Reporter: LaraJiang






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] Deprecate multiple APIs in 1.18

2023-07-05 Thread Chesnay Schepler

There's a whole bunch of metric APIs that would need to be deprecated.
That is of course if the metric FLIPs are being accepted.

Which makes me wonder if we aren't doing things the wrong way around; 
shouldn't the decision to deprecate an API be part of the FLIP discussion?


On 05/07/2023 07:39, Xintong Song wrote:

Thanks all for the discussion.

It seems to me there's a consensus on marking the following as deprecated
in 1.18:
- DataSet API
- SourceFunction
- Queryable State
- All Scala APIs

More time is needed for deprecating SinkFunction.

I'll leave this discussion open for a few more days. And if there's no
objections, I'll create JIRA tickets accordingly.

Best,

Xintong



On Wed, Jul 5, 2023 at 1:34 PM Xintong Song  wrote:


Thanks for the input, Jing. I'd also be +1 for option 1.

Best,

Xintong



On Mon, Jul 3, 2023 at 7:20 PM Jing Ge  wrote:


Hi Xingtong,

Option 1, secure plan would be:

1. graduate kafka, File, JDBC connectors to @Public
2. graduate SinkV2 to @Public
3. remove SinkFunction.

Option 2, risky plan but at a fast pace:

1. graduate SinkV2 to @Public and expecting more maintenance effort since
there are many known and unsolved issues.
2. remove SinkFunction.
3. It depends on the connectors' contributors whether connectors can
upgrade to Flink 2.0, since we moved forward with SinkV2 API without
taking
care of implementations in external connectors.

I am ok with both of them and personally prefer option 1.

Best regards,
Jing


On Fri, Jun 30, 2023 at 3:41 AM Xintong Song 
wrote:


I see. Thanks for the explanation. I may have not looked into this

deeply

enough, and would trust the decision from you and the community members

who

participated in the discussion & vote.

Best,

Xintong



On Thu, Jun 29, 2023 at 10:28 PM Alexander Fedulov <
alexander.fedu...@gmail.com> wrote:


However, I'm not sure about 2.

I am not aware of a bylaw that states the specific requirements in

order

to

mark something as @Deprecated. My understanding from the discussion

and

the

vote was that the community recognizes the necessity to make it

explicit

that
the usage of the SourceFunction API is discouraged. This can actually
stimulate
authors of connectors that rely on this very specific and non-baseline
functionality to contribute extensions to the new Source API

themselves

in

order to
close the gap. ExternallyInducedSource, for instance, was driven by

Pravega

to
begin with, since it was only needed for their purposes [1]. We are

not

removing
anything - until 2.0 everything will continue to work and we can work

on

resolving the limitations until then, I personally don't see a big

issue

here.


Do you think it is feasible to resolve them by the feature freeze

date

of

1.18?
No, these are rather complex additions that would probably require

FLIP(s).

[1]



https://flink.apache.org/2022/01/20/pravega-flink-connector-101/#checkpoint-integration

On Thu, 29 Jun 2023 at 14:25, Xintong Song 

wrote:

Thanks for the explanation, Alex.

Not blocking the deprecation on 1 & 3 makes sense to me. However,

I'm

not

sure about 2.

It sounds to me that, without FLINK-28051 & FLINK-28054, some of the
connectors cannot migrate to the new Source API, or at least further
investigation is needed to understand the situation. If this is the

case,

we probably should not deprecate the API until these issues are

resolved.

Do you think it is feasible to resolve them by the feature freeze

date

of

1.18?

Best,

Xintong



On Thu, Jun 29, 2023 at 8:02 PM Alexander Fedulov <
alexander.fedu...@gmail.com> wrote:


@Xintong
The original discussion [1] and vote [2] converged on the idea

that

it

is

better
to make it clear to the users that they should stop using

SourceFunction

since it
is going away. The longer we do not have this indication, the more

user

implementations will be based on it and the more pain will be

induced

when

we
finally drop it. Users now have an alternative API that they

should

use

and

which
is fully functional, from that perspective nothing blocks marking

it

@Deprecated.
As for the remaining work items - there are primarily three kinds:

1. Where Flink internally uses SourceFunction, without exposing

this

fact

to the
outside world:
- FLINK-28050 [3]
- FLINK-28229 [4]
- FLINK-28048 [5]

2. Very specific edge cases that might not be covered by the

Source

API

as

is:
- FLINK-28054 [6]
- FLINK-28051 [7]

3. Usability improvements - something that was easily doable with
SourceFunction,
but requires deep knowledge of the new, significantly more

complex,

Source API
to achieve:
- FLINK-28056 [8]

In my mind, none of those are blockers for proceeding with adding

the

@Deprecated
annotation:
(1) is a simple case of encapsulation, internals should not

concern

the

API

users
(2) is really only relevant for "exotic" use cases. Does not mean

we

should

not
consider those, but since it is irrelevant for 99.9% of the

users, 

[jira] [Created] (FLINK-32541) Fix the buffer leaking in buffer accumulators when a failover occurs

2023-07-05 Thread Yuxin Tan (Jira)
Yuxin Tan created FLINK-32541:
-

 Summary: Fix the buffer leaking in buffer accumulators when a 
failover occurs
 Key: FLINK-32541
 URL: https://issues.apache.org/jira/browse/FLINK-32541
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Network
Affects Versions: 1.18.0
Reporter: Yuxin Tan
Assignee: Yuxin Tan


When a failover occurs, the buffers in the sort/hash accumulators should be 
released correctly to avoid buffers leaking. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] FLIP-329: Add operator attribute to specify support for object-reuse

2023-07-05 Thread Xuannan Su
Hi Jing Ge,

Thank you for your valuable comments!

1. I agree with your suggestion regarding following the JavaBean
convention. It would be beneficial to incorporate this convention into
our Code Style Guide [1]. By doing so, we can ensure consistency and
make it easier for developers to adhere to the standard.

2. Yes, you are correct that the results remain the same. From my
understanding, this can be considered an implementation detail.

3. By the current design, when objectReuseCompliant = false, it
actually signifies an unknown state, and the decision to use object
reuse depends on the global configuration, pipeline.object-reuse. And
objectReuseCompliant = false is the default value of a StreamOperator.
On the other hand, when objectReuseCompliant = true, it indicates that
object reuse can be employed. And I don't think we require a value
that specifically enforces deep copying since we will enable object
reuse for all operators when pipeline.object-reuse = true to maintain
the current behavior.
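
To make the tri-state behavior concrete, here is a minimal illustrative sketch
(not code from the FLIP) of the per-operator decision under the current
proposal, where false effectively means "unknown":

final class ObjectReuseDecision {

    /** Returns true if the operator's emitted records may be reused downstream. */
    static boolean reuseRecords(boolean pipelineObjectReuse, boolean objectReuseCompliant) {
        if (pipelineObjectReuse) {
            // pipeline.object-reuse = true keeps today's behavior: reuse for all operators.
            return true;
        }
        // Otherwise the operator attribute decides; the default false ("unknown")
        // falls back to deep-copying the emitted records.
        return objectReuseCompliant;
    }

    private ObjectReuseDecision() {}
}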

Once again, thank you for your input, and let me know if there's
anything else I can assist you with.

Best regards,
Xuannan

[1] https://flink.apache.org/how-to-contribute/code-style-and-quality-preamble/

On Wed, Jul 5, 2023 at 1:37 AM Jing Ge  wrote:
>
> Hi Xuannan, Hi Dong
>
> Thanks for the Proposal! After reading the FLIP, I'd like to ask some
> questions:
>
> 1. Naming convention for boolean variables. It is recommended to follow
> JavaBean [1], i.e. objectReuseCompliant as the variable name with
> isObjectReuseCompliant() and setObjectReuseCompliant() as the methods' name.
>
>
> 2.
>
>-
>
>*If pipeline.object-reuse is set to true, records emitted by this
>operator will be re-used.*
>-
>
>*Otherwise, if getIsObjectReuseCompliant() returns true, records emitted
>by this operator will be re-used.*
>-
>
>*Otherwise, records emitted by this operator will be deep-copied before
>being given to the next operator in the chain.*
>
>
> If I understand you correctly, the hard-coded objectReuseCompliant
> should have higher priority over the configuration, so the checking logic
> should be:
>
>-
>
>*If getIsObjectReuseCompliant() returns true, records emitted by this
>operator will be re-used.*
>-
>
>*Otherwise, if pipeline.object-reuse is set to true, records emitted by
>this operator will be re-used.*
>-
>
>*Otherwise, records emitted by this operator will be deep-copied before
>being given to the next operator in the chain.*
>
>
> The results are the same but the checking logic is different; there
> are some additional thoughts, which lead us to the next question.
>
> 3. Current design lets specific operators enable object reuse and ignore
> the global config. There could be another thought, on the contrary: if an
> operator has hard coded the objectReuseCompliant as false, i.e. disable
> object reuse on purpose, records should not be reused even if the global
> config pipeline.object-reuse is set to true, which turns out that the
> objectReuseCompliant could be a triple-value logic: true: force object
> reusing; false: force deep-copying; unknown: depends on
> pipeline.object-reuse config.
>
>
> Best regards,
> Jing
>
>
> [1] https://en.wikipedia.org/wiki/JavaBeans
>
> On Mon, Jul 3, 2023 at 4:25 AM Xuannan Su  wrote:
>
> > Hi all,
> >
> > Dong(cc'ed) and I are opening this thread to discuss our proposal to
> > add operator attribute to allow operator to specify support for
> > object-reuse [1].
> >
> > Currently, the default configuration for pipeline.object-reuse is set
> > to false to avoid data corruption, which can result in suboptimal
> > performance. We propose adding APIs that operators can utilize to
> > inform the Flink runtime whether it is safe to reuse the emitted
> > records. This enhancement would enable Flink to maximize its
> > performance using the default configuration.
> >
> > Please refer to the FLIP document for more details about the proposed
> > design and implementation. We welcome any feedback and opinions on
> > this proposal.
> >
> > Best regards,
> >
> > Dong and Xuannan
> >
> > [1]
> > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=255073749
> >


[jira] [Created] (FLINK-32540) The issue of not distributing the last batch of data

2023-07-05 Thread Jira
原来你是小幸运001 created FLINK-32540:
--

 Summary: The issue of not distributing the last batch of data
 Key: FLINK-32540
 URL: https://issues.apache.org/jira/browse/FLINK-32540
 Project: Flink
  Issue Type: Bug
 Environment: The above code was executed in IntelliJ IDEA with Flink
version 1.16; this issue also exists in 1.14. Other versions have not been
tried.

 
Reporter: 原来你是小幸运001


I copied the source code of the flat map and wanted to implement my own flat
map. Part of the logic is to emit the last piece of data at the end of the
Flink job, so I executed collector.collect in the close method, but the data
was not emitted and the downstream operator cannot receive it.
{code:java}
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.Utils;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.TimestampedCollector;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.util.Collector;

/**
 * @author LaiYongBIn
 * @date 2023/7/5 10:09
 * @Description Do SomeThing
 */
public class Test {
    public static void main(String[] args) throws Exception {
        ParameterTool parameter = ParameterTool.fromArgs(args);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> stream0 =
                env.addSource(new SourceFunction<String>() {
                    @Override
                    public void run(SourceContext<String> sourceContext) throws Exception {
                        sourceContext.collect("TEST");

                        System.out.println("cancel");
                    }

                    @Override
                    public void cancel() {
                    }
                })
                .setParallelism(1);

        MyFlatMapFun flatMapFunc = new MyFlatMapFun();
        TypeInformation<String> outType = TypeExtractor.getFlatMapReturnTypes(
                env.clean(flatMapFunc), stream0.getType(), Utils.getCallLocationName(), true);

        DataStream<String> flatMap = stream0
                .transform("Flat Map", outType, new MyStreamOperator(env.clean(flatMapFunc)))
                .setParallelism(1);

        flatMap.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String s, Collector<String> collector) throws Exception {
                System.out.println("Obtain upstream data is:" + s);
            }
        });

        env.execute();
    }
}


class MyStreamOperator extends AbstractUdfStreamOperator<String, FlatMapFunction<String, String>>
        implements OneInputStreamOperator<String, String> {

    private transient TimestampedCollector<String> collector;

    public MyStreamOperator(FlatMapFunction<String, String> userFunction) {
        super(userFunction);
    }

    @Override
    public void open() throws Exception {
        collector = new TimestampedCollector<>(output);
    }

    @Override
    public void close() throws Exception {
        // Distribute data during close
        collector.collect("close message");
    }

    @Override
    public void processElement(StreamRecord<String> streamRecord) throws Exception {
        // do nothing
    }
}


class MyFlatMapFun implements FlatMapFunction<String, String> {

    @Override
    public void flatMap(String s, Collector<String> collector) throws Exception {
        // do nothing
    }
}
{code}
Then I found out there was a finish method, and I tried to execute
collector.collect in the finish method, and the data was successfully distributed.
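
For illustration, a minimal sketch of that working variant (same MyStreamOperator as in the listing above, with the emission moved from close() to finish()):

{code:java}
@Override
public void finish() throws Exception {
    // finish() runs while the operator can still forward records downstream,
    // so this record actually reaches the next operator.
    collector.collect("close message");
    super.finish();
}

@Override
public void close() throws Exception {
    // Nothing left to emit here.
    super.close();
}
{code}
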
{code:java}
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.Utils;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.TimestampedCollector;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import 

Re: [DISCUSS] FLIP-309: Enable operators to trigger checkpoints dynamically

2023-07-05 Thread Hang Ruan
Hi, Piotr & Dong.

Thanks for the discussion.

IMO, I do not think the provided counter proposal is a good idea. There are
some concerns from my side.

1. It is hard to find the error checkpoint.
If there are other errors causing the checkpoint failure, we have to check
every failed checkpoint to find it.

2. It is more confusing for the users.
Some users only know the feature, but don't know how we implement it. The
failed checkpoints may make them think the job is unhealthy.

3. Users should be able to set the checkpoint interval for the new backlog
state.
I think it is better to provide a setting for users to change the
checkpoint interval in the new backlog state. The hard-coded interval (5x /
10x) is not flexible enough.

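For illustration only, this is roughly how the two intervals discussed in this
thread could be configured; the backlog option name is taken from this FLIP
discussion and is not an existing Flink option:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class BacklogCheckpointIntervalExample {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Short interval: bounds the latency of two-phase commit sinks.
        conf.setString("execution.checkpointing.interval", "3 min");
        // Longer interval while sources report backlog: bounds re-processing after failover.
        conf.setString("execution.checkpointing.interval-during-backlog", "7 min");

        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment(conf);
        // ... build and execute the job ...
    }
}
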
Best,
Hang

Dong Lin  于2023年7月5日周三 07:33写道:

> Hi Piotr,
>
> Any suggestion on how we can practically move forward to address the target
> use-case?
>
> My understanding is that the current proposal does not have any
> correctness/performance issues. And it allows the extension to support all
> the extra use-case without having to throw away the proposed APIs.
>
> If you prefer to have a better solution with simpler APIs and yet same or
> better correctness/performance for the target use-case, could you please
> kindly explain its API design so that we can continue the discussion?
>
>
> Best,
> Dong
>
> On Mon, Jul 3, 2023 at 6:39 PM Dong Lin  wrote:
>
> > Hi Piotr,
> >
> > Please see my comments inline.
> >
> > On Mon, Jul 3, 2023 at 5:19 PM Piotr Nowojski 
> > wrote:
> >
> >> Hi Dong,
> >>
> >> Starting from the end:
> >>
> >> > It seems that the only benefit of this approach is to avoid"
> >> > adding SplitEnumeratorContext#setIsProcessingBacklog."
> >>
> >> Yes, that's the major benefit of this counter-proposal.
> >>
> >> > In the target use-case, user still want to do checkpoint (though at a"
> >> > larger interval) when there is backlog. And HybridSource need to know
> >> the"
> >> > expected checkpoint interval during backlog in order to determine
> >> whether"
> >> > it should keep throwing CheckpointException. Thus, we still need to
> add"
> >> > execution.checkpointing.interval-during-backlog for user to specify
> >> this"
> >> > information."
> >> >
> >> > The downside of this approach is that it is hard to enforce the"
> >> > semantics specified by
> execution.checkpointing.interval-during-backlog.
> >> For"
> >> > example, suppose execution.checkpointing.interval =3 minute and"
> >> > execution.checkpointing.interval-during-backlog = 7 minutes. During
> the"
> >> > backlog phase, checkpoint coordinator will still trigger the
> checkpoint"
> >> > once every 3 minutes. HybridSource will need to reject 2 out of the 3"
> >> > checkpoint invocation, and the effective checkpoint interval will be
> 9"
> >> > minutes."
> >>
> >> Does it really matter what's the exact value of the longer interval? Can
> >> not we
> >> hard-code it to be 5x or 10x of the base checkpoint interval? If there
> is
> >> a
> >> notice
> >> able overhead from the base interval slowing down records processing
> rate,
> >> reducing this interval by a factor of 5x or 10x, would fix performance
> >> issue for
> >> vast majority of users. So a source could just skip 4 out of 5 or 9 out
> of
> >> 10
> >> checkpoints.
> >>
> >
> > Yes, I think the exact value of the longer interval matters.
> >
> > The main reason we need two intervals is for jobs which have two-phase
> > commit sink. The short interval typically represents the interval that a
> > user can accept for the two-phase commit sink to buffer data (since it
> can
> > only emit data when checkpoint is triggered). And the long interval
> > typically represents the maximum amount of duplicate work (in terms of
> > time) that a job need to re-do after failover.
> >
> > Since there is no intrinsic relationship between the data buffer interval
> > (related to processing latency) and the failover boundary, I don't think
> we
> > can hardcode it to be 5x or 10x of the base checkpoint interval.
> >
> >
> >> Alternatively we could introduce a config option like:
> >>
> >> execution.checkpointing.long-interval
> >>
> >> that might be re-used in the future, with more fancy algorithms, but I
> >> don't see
> >> much value in doing that.
> >
> >
> >> > Overall, I think the solution is a bit hacky. I think it is preferred
> >> to"
> >> > throw exception only when there is indeed error. If we don't need to
> >> check"
> >> > a checkpoint, it is preferred to not trigger the checkpoint in the
> >> first"
> >> > place. And I think adding
> SplitEnumeratorContext#setIsProcessingBacklog
> >> is"
> >> > probably not that much of a big deal."
> >>
> >> Yes it's hacky, but at least it doesn't require extending the Public API
> >> for a
> >> quite limited solution, that only targets one or two sources that are
> >> rarely used.
> >>
> >
> > I am not sure it is fair to say MySQL CDC source is "rarely used".
> > ververica/flink-cdc-connectors GitHub repo has 4K+ stars. Also, 

Re: [ANNOUNCE] Apache Flink has won the 2023 SIGMOD Systems Award

2023-07-05 Thread Paul Lam
Congrats and cheers!

Best,
Paul Lam

> 2023年7月4日 18:04,Benchao Li  写道:
> 
> Congratulations!
> 
> Feng Jin  于2023年7月4日周二 16:17写道:
> 
>> Congratulations!
>> 
>> Best,
>> Feng Jin
>> 
>> 
>> 
>> On Tue, Jul 4, 2023 at 4:13 PM Yuxin Tan  wrote:
>> 
>>> Congratulations!
>>> 
>>> Best,
>>> Yuxin
>>> 
>>> 
>>> Dunn Bangui  于2023年7月4日周二 16:04写道:
>>> 
 Congratulations!
 
 Best,
 Bangui Dunn
 
 Yangze Guo  于2023年7月4日周二 15:59写道:
 
> Congrats everyone!
> 
> Best,
> Yangze Guo
> 
> On Tue, Jul 4, 2023 at 3:53 PM Rui Fan <1996fan...@gmail.com> wrote:
>> 
>> Congratulations!
>> 
>> Best,
>> Rui Fan
>> 
>> On Tue, Jul 4, 2023 at 2:08 PM Zhu Zhu  wrote:
>> 
>>> Congratulations everyone!
>>> 
>>> Thanks,
>>> Zhu
>>> 
>>> Hang Ruan  于2023年7月4日周二 14:06写道:
 
 Congratulations!
 
 Best,
 Hang
 
 Jingsong Li  于2023年7月4日周二 13:47写道:
 
> Congratulations!
> 
> Thank you! All of the Flink community!
> 
> Best,
> Jingsong
> 
> On Tue, Jul 4, 2023 at 1:24 PM tison 
 wrote:
>> 
>> Congrats and with honor :D
>> 
>> Best,
>> tison.
>> 
>> 
>> Mang Zhang  于2023年7月4日周二 11:08写道:
>> 
>>> Congratulations!--
>>> 
>>> Best regards,
>>> Mang Zhang
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 在 2023-07-04 01:53:46,"liu ron"  写道:
 Congrats everyone
 
 Best,
 Ron
 
 Jark Wu  于2023年7月3日周一 22:48写道:
 
> Congrats everyone!
> 
> Best,
> Jark
> 
>> 2023年7月3日 22:37,Yuval Itzchakov 
>>> 写道:
>> 
>> Congrats team!
>> 
>> On Mon, Jul 3, 2023, 17:28 Jing Ge via user <
> u...@flink.apache.org
> > wrote:
>>> Congratulations!
>>> 
>>> Best regards,
>>> Jing
>>> 
>>> 
>>> On Mon, Jul 3, 2023 at 3:21 PM yuxia <
> luoyu...@alumni.sjtu.edu.cn
> > wrote:
 Congratulations!
 
 Best regards,
 Yuxia
 
 发件人: "Pushpa Ramakrishnan" <
> pushpa.ramakrish...@icloud.com
>  pushpa.ramakrish...@icloud.com>>
 收件人: "Xintong Song" >  tonysong...@gmail.com>>
 抄送: "dev" >> dev@flink.apache.org>>,
> "User" >>> u...@flink.apache.org
>>> 
 发送时间: 星期一, 2023年 7 月 03日 下午 8:36:30
 主题: Re: [ANNOUNCE] Apache Flink has won the 2023
 SIGMOD
>>> Systems
>>> Award
 
 Congratulations \uD83E\uDD73
 
 On 03-Jul-2023, at 3:30 PM, Xintong Song <
>>> tonysong...@gmail.com
> > wrote:
 
 
 Dear Community,
 
 I'm pleased to share this good news with everyone.
>>> As
> some
>>> of
> you
>>> may
> have already heard, Apache Flink has won the 2023
>> SIGMOD
> Systems
> Award
>>> [1].
 
 "Apache Flink greatly expanded the use of stream
> data-processing."
>>> --
> SIGMOD Awards Committee
 
 SIGMOD is one of the most influential data
>>> management
>>> research
> conferences in the world. The Systems Award is awarded
>>> to
 an
> individual
>>> or
> set of individuals to recognize the development of a
> software or
>>> hardware
> system whose technical contributions have had
>>> significant
>>> impact on
> the
> theory or practice of large-scale data management
>>> systems.
>>> Winning
> of
>>> the
> award indicates the high recognition of Flink's
> technological
>>> advancement
> and industry influence from academia.
 
 As an open-source project, Flink wouldn't have
>> come
> this far
> without
> the wide, active and supportive community behind it.
>>> Kudos
> to
>>> all
> of us
>>> who
> helped make this happen, including the over 1,400
> contributors
>>> and
> many
> others who contributed in ways beyond code.
 
 Best,
 Xintong (on behalf of the Flink PMC)