Re: [VOTE] FLIP-321: introduce an API deprecation process

2023-07-06 Thread John Roesler
+1 (non-binding)

Thanks for the FLIP!
-John

On Mon, Jul 3, 2023, at 22:21, Jingsong Li wrote:
> +1 binding
>
> On Tue, Jul 4, 2023 at 10:40 AM Zhu Zhu  wrote:
>>
>> +1 (binding)
>>
>> Thanks,
>> Zhu
>>
>> > ConradJam wrote on Mon, Jul 3, 2023 at 22:39:
>> >
>> > +1 (no-binding)
>> >
>> > Matthias Pohl wrote on Mon, Jul 3, 2023 at 22:33:
>> >
>> > > Thanks, Becket
>> > >
>> > > +1 (binding)
>> > >
>> > > On Mon, Jul 3, 2023 at 10:44 AM Jing Ge 
>> > > wrote:
>> > >
>> > > > +1(binding)
>> > > >
>> > > > On Mon, Jul 3, 2023 at 10:19 AM Stefan Richter
>> > > >  wrote:
>> > > >
>> > > > > +1 (binding)
>> > > > >
>> > > > >
>> > > > > > On 3. Jul 2023, at 10:08, Martijn Visser 
>> > > > > wrote:
>> > > > > >
>> > > > > > +1 (binding)
>> > > > > >
>> > > > > >
>> > > > > >
>> > > > > > On Mon, Jul 3, 2023 at 10:03 AM Xintong Song wrote:
>> > > > > >
>> > > > > >> +1 (binding)
>> > > > > >>
>> > > > > >> Best,
>> > > > > >>
>> > > > > >> Xintong
>> > > > > >>
>> > > > > >>
>> > > > > >>
>> > > > > >> On Sat, Jul 1, 2023 at 11:26 PM Dong Lin 
>> > > wrote:
>> > > > > >>
>> > > > > >>> Thanks for the FLIP.
>> > > > > >>>
>> > > > > >>> +1 (binding)
>> > > > > >>>
>> > > > > >>> On Fri, Jun 30, 2023 at 5:39 PM Becket Qin 
>> > > > > wrote:
>> > > > > >>>
>> > > > >  Hi folks,
>> > > > > 
>> > > > >  I'd like to start the VOTE for FLIP-321[1] which proposes to
>> > > > introduce
>> > > > > >> an
>> > > > >  API deprecation process to Flink. The discussion thread for the
>> > > FLIP
>> > > > > >> can
>> > > > > >>> be
>> > > > >  found here[2].
>> > > > > 
>> > > > >  The vote will be open until at least July 4, following the
>> > > consensus
>> > > > > >>> voting
>> > > > >  process.
>> > > > > 
>> > > > >  Thanks,
>> > > > > 
>> > > > >  Jiangjie (Becket) Qin
>> > > > > 
>> > > > >  [1]
>> > > > > 
>> > > > > 
>> > > > > >>>
>> > > > > >>
>> > > > >
>> > > >
>> > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-321%3A+Introduce+an+API+deprecation+process
>> > > > >  [2]
>> > > > >
>> > > >
>> > > https://lists.apache.org/thread/vmhzv8fcw2b33pqxp43486owrxbkd5x9
>> > > > >
>> > > > >
>> > > >
>> > >


Re: [DISCUSS] FLIP-321: Introduce an API deprecation process

2023-06-20 Thread John Roesler
ecated 2 minor releases before the major version bump.
>2. We strictly follow the process in this FLIP, and will quickly bump
>the major version from 2.x to 3.0 once the migration period for DataStream
>API is reached.
> 
> I'm personally in favor of option 1. As the migration-period rule is newly
> proposed, I think it's fair to make exceptions for cases where we already
> missed the best chance for planning the deprecation. Moreover, I do believe
> having a quick major version bump does confuse users. Yes, we can explain
> to users that bumping from 2.x to 3.0 does not cost anything other than the
> removal of a deprecated API. But having to explain this itself is an
> indicator that it might be confusing for users.
> 
> 
> Becket, on the other hand, prefers option 2. From my understanding, his
> major point is that a quick major version bump causes barely any actual
> loss to users, while in option 1 not providing the migration period or
> providing a shorter one is an actual loss to users. (@Becket, please correct
> me if I'm mistaken.)
> 
> 
> And we'd like to hear more feedback from the community.
> 
> 
> Best,
> 
> Xintong
> 
> 
> 
> On Tue, Jun 20, 2023 at 2:55 AM John Roesler  wrote:
> 
> > Hi Becket and Xintong,
> >
> > I hope you don't mind if I chime in a little.
> >
> > Once an API is marked Public, we're committing to support it until it's
> > deprecated, and once it's deprecated, to leave it in place for at least two
> > minor releases and then only remove it in the following major release.
> >
> > As a user, I would be dismayed to discover that the project maintainers
> > demoted a Public API to PublicEvolving or Experimental in order to violate
> > the spirit of the deprecation period for Public APIs. Likewise, I'd view a
> > rapid sequence of minor releases for the purpose of dropping deprecated
> > APIs in the next major release as another violation of the spirit of our
> > guarantees. I'm reminded of this xkcd: https://xkcd.com/1494/
> >
> > I'm glad we're gaming out these situations before we institute this FLIP,
> > and I hope we all converge on a clear understanding of what we're
> > guaranteeing. From where I'm sitting, the essence of this FLIP is
> > specifically to prevent us from doing whatever we want and instead to
> > commit to providing a suitable notice period to users who rely on Public
> > APIs.
> >
> > It honestly shouldn't matter what the maintenance overhead is (or, rather,
> > evaluating the maintenance overhead should be a consideration before we
> > promote an API to Public to begin with). My experience in other projects is
> > that regretting a feature so much you want to drop it asap is extremely
> > rare, and that living with the pain for a little while until the
> > deprecation period expires will probably make us better engineers.
> >
> > With that all said, one thing that is not a violation is to propose a new
> > major release in order to drop especially burdensome deprecated APIs that
> > have already enjoyed their full deprecation period. Those really bad "case
> > A" features will be very rare.
> >
> > Looking over the FLIP again, I see a reference to the release
> > documentation, but not to the build tooling support for the Deprecated
> > annotation, so it might help to share this scenario: The way this will all
> > come together in practice is that, by giving good stability guarantees, we
> > will encourage our users to bump minor versions when they are released,
> > which means that they will see compiler warnings immediately if any of the
> > APIs they've used are deprecated. In fact, I'd recommend for them to use
> > the `-Werror` javac flag to be sure that they don't miss those warnings.
> > Given our release cadence, this means that they will have about six months
> > to migrate off of any deprecated APIs without being forced to avoid
> > updates. And they really need to be able to adopt further minor updates in
> > the series, since they may contain further improvements that actually
> > facilitate migrating off of the deprecated APIs.
> >
> > The compiler warnings are a far more reliable mechanism to advertise
> > deprecation than release notes, since users' own builds will notify them
> > right away about only the deprecations that are relevant to them, but it
> > does rely on users feeling ok to bump up minor versions in a timely
> > fashion. If we consistently burn them by not observing the stability
> > guarantees
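
To make the compiler-warning mechanism described above concrete, here is a
minimal Java sketch. The class and method names are invented for illustration;
only the @Deprecated annotation and the javac flags are real.

    // --- library code, version N (names invented for illustration) ---
    public class SinkApi {
        /** @deprecated since N; use {@link #writeV2(String)} instead. */
        @Deprecated
        public void write(String record) {
            writeV2(record);
        }

        public void writeV2(String record) {
            // new implementation
        }
    }

    // --- user code, in a separate file ---
    class UserJob {
        public static void main(String[] args) {
            // javac: "write(String) in SinkApi has been deprecated"
            new SinkApi().write("hello");
        }
    }

Compiling with `javac -Xlint:deprecation -Werror SinkApi.java UserJob.java`
turns that warning into a hard build failure, which is the effect recommended
above: a user who bumps a minor version cannot miss a deprecation.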

Re: [DISCUSS] FLIP-321: Introduce an API deprecation process

2023-06-19 Thread John Roesler
 all the release notes of Guava, Apache
> > > Commons, Apache Avro, ProtoBuf, etc, when you upgrade a version of them?
> > > One would probably try bumping the dependency version and see if there is
> > > an exception and solve them case by case. And if it is a minor version
> > > bump, one probably does not expect an exception at all. Another example
> > is
> > > that in Flink we still have so many usages of deprecated methods all over
> > > the place, and I strongly doubt everyone knows when the source code of
> > > these methods was deprecated and when they should be removed.
> > >
> > > So, the most important principle of an API is to be simple and intuitive.
> > > The versioning semantic is a simple and universally accepted API stability
> > > standard. If we ourselves as Flink developers are relying on this for our
> > > own dependencies, I don't think we can expect more from our users.
> > >
> > > Another possible alternative: whenever there's a deprecated Public API
> > that
> > > > reaches a major version bump before the migration period, and we also
> > > don't
> > > > want to carry it for all the next major release series, we may consider
> > > > releasing more minor releases for the previous major version after the
> > > > bump. E.g., a Public API is deprecated in 1.19, and then we bump to
> > 2.0,
> > > > we can release one more 1.20 after 2.0. That should provide users
> > another
> > > > choice rather than upgrading to 2.0, while satisfying the
> > 2-minor-release
> > > > migration period.
> > >
> > > This is another option, but I think it is very likely more expensive than
> > > simply bumping the major version. And I imagine there will be questions
> > > like "why this feature is in 1.20, but does not exist in 2.0"?
> > >
> > > I think my major point is, we should not carry APIs deprecated in a
> > > > previous major version along all the next major version series. I'd
> > like
> > > to
> > > > try giving users more commitments, i.e. the migration period, as long
> > as
> > > it
> > > > does not prevent us from making breaking changes. If it doesn't work,
> > I'd
> > > > be in favor of not providing the migration period, but fallback to only
> > > > guarantee the compatibility within the major version.
> > >
> > > The part I don't understand is if we are willing to have a migration
> > > period, and do a minor version bump to remove an API, what do we lose to
> > do
> > > a major version bump instead, so we don't break the common versioning
> > > semantic?
> > >
> > > Thanks,
> > >
> > > Jiangjie (Becket) Qin
> > >
> > >
> > > On Mon, Jun 19, 2023 at 3:20 PM Xintong Song 
> > > wrote:
> > >
> > > > >
> > > > > As an end user who only uses Public APIs, if I don't change my code
> > at
> > > > > all, my expectation is the following:
> > > > > 1. Upgrading from 1.x to 2.x may have issues.
> > > > > 2. If I can upgrade from 1.x to 2.x without an issue, I am fine with
> > > all
> > > > > the 2.x versions.
> > > > > Actually I think there are some dependency version resolution
> > policies
> > > > out
> > > > > there which picks the highest minor version when the dependencies
> > pull
> > > in
> > > > > multiple minor versions of the same jar, which may be broken if we
> > > remove
> > > > > the API in minor releases.
> > > > >
> > > >
> > > > I see your point, that users would feel surprised if they find things
> > no
> > > > longer work when upgrading to another 2.x minor release. However, I'd
> > > like
> > > > to point out that PublicEvolving APIs would have a similar problem
> > > > anyway. So the question is, how do we catch users' attention and make
> > > sure
> > > > they are aware that the Public APIs in 1.x may no longer be Public in
> > > 2.0.
> > > > There're various ways to do that, e.g., release notes, warnings in
> > logs,
> > > > etc.
> > > >
> > > > Another possible alternative: whenever there's a deprecated Public API
> > > that
> > > > reaches a major version bump before the migration period, and we also

Re: [VOTE] FLIP-308: Support Time Travel

2023-06-19 Thread John Roesler
Thanks for this FLIP, Feng!

I've been following along, and I'm +1 (non-binding).

Thanks,
-John

On 2023/06/19 15:50:48 Leonard Xu wrote:
> +1(binding)
> 
> Best,
> Leonard
> 
> > On Jun 19, 2023, at 8:53 PM, Jing Ge  wrote:
> > 
> > +1(binding)
> > 
> > On Mon, Jun 19, 2023 at 1:57 PM Benchao Li  wrote:
> > 
> >> +1 (binding)
> >> 
> >> Lincoln Lee wrote on Mon, Jun 19, 2023 at 19:40:
> >> 
> >>> +1 (binding)
> >>> 
> >>> Best,
> >>> Lincoln Lee
> >>> 
> >>> 
> >>> yuxia wrote on Mon, Jun 19, 2023 at 19:30:
> >>> 
>  +1 (binding)
>  Thanks Feng driving it.
>  
>  Best regards,
>  Yuxia
>  
>  - Original Message -
>  From: "Feng Jin"
>  To: "dev"
>  Sent: Monday, June 19, 2023, 7:22:06 PM
>  Subject: [VOTE] FLIP-308: Support Time Travel
>  
>  Hi everyone
>  
>  Thanks for all the feedback about the FLIP-308: Support Time Travel[1].
>  [2] is the discussion thread.
>  
>  
>  I'd like to start a vote for it. The vote will be open for at least 72
>  hours (excluding weekends, until June 22, 12:00 AM GMT) unless there is an
>  objection or an insufficient number of votes.
>  
>  
>  [1]
>  
>  
> >>> 
> >> https://cwiki.apache.org/confluence/display/FLINK/FLIP-308%3A+Support+Time+Travel
>  [2] https://lists.apache.org/thread/rpozdlf7469jmc7q7vc0s08pjnmscz00
>  
>  
>  Best,
>  Feng
>  
> >>> 
> >> 
> >> 
> >> --
> >> 
> >> Best,
> >> Benchao Li
> >> 
> 
> 


Re: [VOTE] FLIP-287: Extend Sink#InitContext to expose TypeSerializer, ObjectReuse and JobID

2023-06-19 Thread John Roesler
Thanks for the FLIP, Joao!

I'm +1 (non-binding)

-John

On 2023/06/19 15:54:49 Leonard Xu wrote:
> +1(binding)
> 
> Best,
> Leonard
> 
> > On Jun 19, 2023, at 8:12 PM, Martijn Visser  
> > wrote:
> > 
> > +1 (binding)
> > 
> > On Mon, Jun 19, 2023 at 4:58 AM Yuepeng Pan  wrote:
> > 
> >> +1 (non-binding)
> >> 
> >> Thanks,
> >> Roc
> >> 
> >> 
> >> 
> >> 
> >> 
> >> 
> >> On 2023-06-19 10:55:05, "Zhu Zhu" wrote:
> >>> +1 (binding)
> >>> 
> >>> Thanks,
> >>> Zhu
> >>> 
> >>> Tzu-Li (Gordon) Tai wrote on Sat, Jun 17, 2023 at 11:32:
>  
>  +1 (binding)
>  
>  On Fri, Jun 16, 2023, 09:53 Jing Ge  wrote:
>  
> > +1(binding)
> > 
> > Best Regards,
> > Jing
> > 
> > On Fri, Jun 16, 2023 at 10:10 AM Lijie Wang
> > wrote:
> > 
> >> +1 (binding)
> >> 
> >> Thanks for driving it, Joao.
> >> 
> >> Best,
> >> Lijie
> >> 
> >>> Joao Boto wrote on Fri, Jun 16, 2023 at 15:53:
> >> 
> >>> Hi all,
> >>> 
> >>> Thank you to everyone for the feedback on FLIP-287[1]. Based on
> >> the
> >>> discussion thread [2], we have come to a consensus on the design
> >> and
> > are
> >>> ready to take a vote to contribute this to Flink.
> >>> 
> >>> I'd like to start a vote for it. The vote will be open for at
> >> least 72
> >>> hours (excluding weekends), unless there is an objection or an
> > insufficient
> >>> number of votes.
> >>> 
> >>> [1]
> >>> 
> >> 
> > 
> >> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=240880853
> >>> [2]
> >> https://lists.apache.org/thread/wb3myhqsdz81h08ygwx057mkn1hc3s8f
> >>> 
> >>> 
> >>> Best,
> >>> Joao Boto
> >>> 
> >> 
> > 
> >> 
> 
> 


Re: [DISCUSS] FLIP-321: Introduce an API deprecation process

2023-06-18 Thread John Roesler
Hi Becket,

Thanks for the reply! I’d like to continue the conversation about compatibility 
outside of this FLIP thread, but for now, I can accept your decision. It’s 
certainly an improvement. 

Thanks again,
John

On Sun, Jun 18, 2023, at 21:42, Becket Qin wrote:
> Hi John,
>
> Completely agree with all you said.
>
> Can we consider only dropping deprecated APIs in major releases across the
>> board? I understand that Experimental and PublicEvolving APIs are by
>> definition less stable, but it seems like this should be reflected in the
>> required deprecation period alone. I.e. that we must keep them around for
>> at least zero or one minor release, not that we can drop them in a minor or
>> patch release.
>
> Personally speaking, I would love to do this, for exactly the reason you
> mentioned. However, I did not propose this due to the following reasons:
>
> 1. I am hesitating a little bit about changing the accepted FLIPs too soon.
> 2. More importantly, to avoid slowing down our development. At this point,
> Flink still lacks some design / routines to support good API evolvability /
> extensibility. Just like you said, it takes some time to be good at this.
> In this case, my concern is that only removing Experimental /
> PublicEvolving APIs in major version changes may result in too much
> overhead and dramatically slow down the development of Flink. So, I was
> thinking that we can start with the current status. Hopefully after we are
> more comfortable with the maintenance overhead of deprecated APIs, we can
> then have a stronger guarantee for Experimental / PublicEvolving APIs.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
>
>
> On Sun, Jun 18, 2023 at 6:44 AM John Roesler  wrote:
>
>> Hi Becket,
>>
>> Thanks for this FLIP! Having a deprecation process is really important. I
>> understand some people’s concerns about the additional burden for project
>> maintainers, but my personal experience with Kafka has been that it’s very
>> liveable and that it’s well worth the benefit to users. In fact, users
>> being able to confidently upgrade is also a benefit to maintainers, as we
>> will get fewer questions from people stuck on very old versions.
>>
>> One question:
>> Can we consider only dropping deprecated APIs in major releases across the
>> board? I understand that Experimental and PublicEvolving APIs are by
>> definition less stable, but it seems like this should be reflected in the
>> required deprecation period alone. I.e. that we must keep them around for
>> at least zero or one minor release, not that we can drop them in a minor or
>> patch release.
>>
>> The advantage of forbidding the removal of any API in minor or patch
>> releases is that users will get a strong guarantee that they can bump the
>> minor or patch version and still be able to compile, or even just re-link
>> and know that they won’t face “NoSuchMethodError” failures at run time. This is a
>> binary guarantee: if we allow removing  even Experimental APIs outside of
>> major releases, users can no longer confidently upgrade.
>>
>> Aside from that, I’d share my 2 cents on a couple of points:
>> * I’d use the official Deprecated annotation instead of introducing our
>> own flavor (Retired, etc), since Deprecated is well integrated into build
>> tools and IDEs.
>> * I wouldn’t worry about a demotion process in this FLIP; it seems
>> orthogonal, and something that should probably be taken case-by-case
>> anyway.
>> * Aside from deprecation and removal, there have been some discussions
>> about how to evolve APIs and behavior in compatible ways. This is somewhat
>> of an art, and if folks haven’t wrestled with it before, it’ll take some
>> time to become good at it. I feel like this topic should also be orthogonal
>> to this FLIP, but FWIW, my suggestion would be to adopt a simple policy not
>> to break existing user programs, and leave the “how” up to implementers and
>> reviewers.
>>
>> Thanks again,
>> John
>>
>> On Sat, Jun 17, 2023, at 11:03, Jing Ge wrote:
>> > Hi All,
>> >
>> > The @Public -> @PublicEvolving proposed by Xintong is a great idea.
>> > Especially, after he suggest @PublicRetired, i.e. @PublicEvolving --(2
>> > minor release)--> @Public --> @deprecated --(1 major
>> > release)--> @PublicRetired. It will provide a lot of flexibility without
>> > breaking any rules we had. @Public APIs are allowed to change between
>> major
>> > releases. Changing annotations is acceptable and provides additional
>> > tolerance i.e. user-friendliness, since the APIs themselves are not changed.

Re: [DISCUSS] FLIP-321: Introduce an API deprecation process

2023-06-17 Thread John Roesler
Hi Becket,

Thanks for this FLIP! Having a deprecation process is really important. I 
understand some people’s concerns about the additional burden for project 
maintainers, but my personal experience with Kafka has been that it’s very 
liveable and that it’s well worth the benefit to users. In fact, users being 
able to confidently upgrade is also a benefit to maintainers, as we will get 
fewer questions from people stuck on very old versions.

One question:
Can we consider only dropping deprecated APIs in major releases across the 
board? I understand that Experimental and PublicEvolving APIs are by definition 
less stable, but it seems like this should be reflected in the required 
deprecation period alone. I.e. that we must keep them around for at least zero 
or one minor release, not that we can drop them in a minor or patch release. 

The advantage of forbidding the removal of any API in minor or patch releases 
is that users will get a strong guarantee that they can bump the minor or patch 
version and still be able to compile, or even just re-link and know that they 
won’t face “NoSuchMethodError” failures at run time. This is a binary guarantee: if 
we allow removing  even Experimental APIs outside of major releases, users can 
no longer confidently upgrade.

Aside from that, I’d share my 2 cents on a couple of points:
* I’d use the official Deprecated annotation instead of introducing our own 
flavor (Retired, etc), since Deprecated is well integrated into build tools and 
IDEs.
* I wouldn’t worry about a demotion process in this FLIP; it seems orthogonal, 
and something that should probably be taken case-by-case anyway. 
* Aside from deprecation and removal, there have been some discussions about 
how to evolve APIs and behavior in compatible ways. This is somewhat of an art, 
and if folks haven’t wrestled with it before, it’ll take some time to become 
good at it. I feel like this topic should also be orthogonal to this FLIP, but 
FWIW, my suggestion would be to adopt a simple policy not to break existing 
user programs, and leave the “how” up to implementers and reviewers. 

Thanks again,
John

On Sat, Jun 17, 2023, at 11:03, Jing Ge wrote:
> Hi All,
>
> The @Public -> @PublicEvolving proposed by Xintong is a great idea.
> Especially, after he suggest @PublicRetired, i.e. @PublicEvolving --(2
> minor release)--> @Public --> @deprecated --(1 major
> release)--> @PublicRetired. It will provide a lot of flexibility without
> breaking any rules we had. @Public APIs are allowed to change between major
> releases. Changing annotations is acceptable and provides additional
> tolerance i.e. user-friendliness, since the APIs themselves are not changed.
>
> I had similar thoughts when I was facing those issues. I want to move one
> step further and suggest introducing one more annotation @Retired.
>
> Unlike @PublicRetired, which is a compromise of downgrading @Public to
> @PublicEvolving. As I mentioned earlier in my reply, the Java standard
> @deprecated should be used in the early stage of the deprecation process
> and doesn't really meet our requirement. Since Java does not allow us to
> extend annotation, I think it would be feasible to have the new @Retired to
> help us monitor and manage the deprecation process, house cleaning, etc.
>
> Some ideas could be(open for discussion):
>
> @Retired:
>
> 1. There must be a replacement with functionality compatibility before APIs
> can be marked as @Retired, i.e. DISCUSS and VOTE processes on the ML are
> mandatory (a FLIP is recommended).
> 2. APIs marked as @Retired will be removed after exactly 1 minor release
> (using ArchUnit to enforce it; whether that is possible needs further checking).
> Devs who marked them as @Retired are responsible for removing them.
> 3. Both @Public -> @Retired and @PublicEvolving -> @Retired are
> recommended. @Experimental -> @Retired and @Internal -> @Retired could also
> be used if it can increase user-friendliness or dev-friendliness, but not
> mandatory.
> 4. Some variables will be defined in @Retired to support the deprecation
> process management. Further extension is possible, since the annotation is
> built by us.
>
>
> Best regards,
> Jing
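
To make the proposal above concrete, here is a hypothetical sketch of what such
an annotation could look like. Nothing like this exists in Flink today; every
element name below is invented for illustration, following the rules proposed
above.

    import java.lang.annotation.Documented;
    import java.lang.annotation.ElementType;
    import java.lang.annotation.Retention;
    import java.lang.annotation.RetentionPolicy;
    import java.lang.annotation.Target;

    /** Hypothetical sketch of the proposed @Retired annotation. */
    @Documented
    @Retention(RetentionPolicy.RUNTIME) // visible to ArchUnit-style build rules
    @Target({ElementType.TYPE, ElementType.METHOD,
             ElementType.CONSTRUCTOR, ElementType.FIELD})
    public @interface Retired {
        /** Fully qualified name of the functionally compatible replacement (rule 1). */
        String replacedBy();

        /** The release in which the API was marked @Retired. */
        String since();

        /** The release in which the API must be removed (rule 2). */
        String removeIn();
    }

A build rule could then scan for @Retired elements whose removeIn release is at
or below the version being built and fail the build, which is the kind of
enforcement point 2 asks ArchUnit to provide.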
>
> On Fri, Jun 16, 2023 at 10:31 AM Becket Qin  wrote:
>
>> Hi Xintong,
>>
>> Thanks for the explanation. Please see the replies inline below.
>>
>> I agree. And from my understanding, demoting a Public API is also a kind of
>> > such change, just like removing one, which can only happen with major
>> > version bumps. I'm not proposing to allow demoting Public APIs anytime,
>> but
>> > only in the case major version bumps happen before reaching the
>> > 2-minor-release migration period. Actually, demoting would be a weaker
>> > change compared to removing the API immediately upon major version bumps,
>> > in order to keep the commitment about the 2-minor-release migration
>> period.
>> > If the concern is that `@Public` -> `@PublicEvolving` sounds against
>> > conventions, we may introduce a n

Re: [DISCUSS] Planning Flink 2.0

2023-04-28 Thread John Roesler
Hi all,

Great discussion!

For what it's worth, my experience has been that Semantic Versioning is really 
the best way to think about major releases. It can occasionally be nice to have 
a big "marketing release" to celebrate a major improvement, but coupling the 
concept of big improvements to major version numbers makes it hard to maintain 
a reasonable cadence of breaking changes. I'd be +1 on the idea of doing more 
frequent major releases, and making them mostly about compatibility breaks vs. 
improvements.

At the end of the day, the nicest thing is usually to introduce new features 
and improvements incrementally and compatibly, while also deprecating 
unfortunate features as soon as we regret them. Once features are deprecated, 
it's nice to drop them within a year or two, which implies that you need a 
major release every year or two. That shouldn't frighten us, since major 
releases don't guarantee non-compatibility. I.e., we generally wouldn't break 
anything that hasn't already been deprecated for a year or two, as opposed to 
just dropping everything that's marked as deprecated.

Users would only get broken if they are using features that have been 
deprecated for years and many releases. Building this expectation creates a 
valuable incentive to migrate off of deprecated functionality and onto the 
replacements, which helps to validate the migration path while there is still a 
safe fallback. It's also a significant benefit to the project's ability to keep 
its API and internals clean, which ultimately benefits users and maintainers 
alike.

Independent of planning 2.0, I think it's a great idea to have a higher-level 
discussion of the roadmap. Doing so will help everyone pull in the same 
direction, and it'll especially help newer contributors think about how they 
can contribute in valuable ways.

Thanks!
-John

On Fri, Apr 28, 2023, at 06:59, Jing Ge wrote:
> Hi,
>
> As far as I am concerned, it would be great to build two top level
> categories for Flink 2.0 release.
>
> 1. Future - mainly new big features or architecture improvements to achieve
> visions like streamhouse. This makes the 2.0 release be the 2.0 instead of
> 1.x release, i.e. the real intention of 2.0 release with a significant
> upgrade.
> 2. History - clean up tech debts, take care of histories, or whatever you
> want to name it. The main goal of this category is to take the 2.0 release
> opportunity (since we have strong intention to do it mentioned above) to
> perform breaking changes, i.e. remove deprecated APIs and even modules,
> upgrade APIs without thinking more about backwards compatibilities, etc.
> This is kind of a buy-one-get-one benefit. In order to "get-one"(History),
> we should, first of all, "buy-one"(Future).
>
> Best regards,
> Jing
>
> On Fri, Apr 28, 2023 at 9:57 AM Xintong Song  wrote:
>
>> Thanks all for the positive feedback.
>>
>> So far, it seems to me that the differences of opinions are mainly focused
>> on whether we should include non-breaking features in the 2.0 release.
>>
>> There seems to be no objections to:
>> 1. Starting to plan for the 2.0 release, with a rough timeline towards mid
>> next year
>> 2. Becket, Jark, Martijn and Xintong as the release managers
>> 3. Initiating a project roadmap discussion
>>
>> I'll leave this discussion open for a bit longer. Also, next week is public
>> holidays in China (I don't know if it's also in other countries). After the
>> holidays and if there's no objections, we'll assemble the release
>> management team as discussed, and try to figure out a proper way for the
>> roadmap discussion next.
>>
>> Best,
>>
>> Xintong
>>
>>
>>
>> On Fri, Apr 28, 2023 at 3:43 PM Xintong Song 
>> wrote:
>>
>> > @Weike,
>> >
>> > Thanks for the suggestion. I think it makes sense to provide a longer
>> > supporting period for the last 1.x release.
>> >
>> > @David,
>> >
>> > I can see the benefit of focusing on breaking changes and API clean-ups
>> > only, so the community can be more focused and possibly deliver the 2.0
>> > release earlier. However, I can also understand that it might be
>> > disappointing for some people (admittedly, a little bit for me as well)
>> if
>> > the 2.0 release contains only clean-ups but no other significant
>> user-aware
>> > improvements. My personal opinion would be not to prevent people from
>> > trying to get new features into this release, but would not block the
>> > release of such features unless breaking changes are involved.
>> >
>> > @Martijn,
>> >
>> > Thanks for sharing your ideas. Glad to see that things you've listed have
>> > a lot in common with what we put in our list. I believe that's a good
>> > signal that we share similar opinions on what is good and important for
>> the
>> > project and the release.
>> >
>> > @Sai,
>> >
>> > Welcome to the community. And thanks for offering helps.
>> >
>> > At the moment, this discussion is only happening in this mailing list. We
>> > may consider setting up online meetings or

Re: [DISCUSS] EXACTLY_ONCE delivery semantics for upsert-kafka connector

2023-04-11 Thread John Roesler
Hi Jark,

I hope you don’t mind if I chime in.

You have a good point that the sequence of upserts will eventually converge to 
the correct value under the at-least-once delivery guarantee, but it can still 
be important to avoid passing on uncommitted results. Some thoughts, numbered 
for reference:

1. Most generally, if some result R is written to the sink topic, but then the 
job fails before a checkpoint, rolls back, and reprocesses, producing R’, then 
it is incorrect to call R an “upsert”. In fact, as far as the system is 
concerned, R never happened at all (because it was part of a rolled-back batch 
of processing).

2. Readers may reasonably wish to impose some meaning on the sequence of 
upserts itself, so including aborted results can lead to wrong semantics 
downstream. Eg: “how many times has ‘x’ been updated today”?

3. Note that processing may not be deterministic over failures, and, building 
on (2), readers may have an expectation that every record in the topic 
corresponds to a real value that was associated with that key at some point. 
Eg, if we start with x=1, checkpoint, then produce x=99, crash, restart and 
produce x=2. Under at-least-once, the history of x is [1, 99, 2], while 
exactly-once would give the correct history of [1, 2]. If we set up an alert for 
whenever the value of x is greater than 10, then at-least-once will erroneously 
alert us, while exactly-once does not (see the sketch after point 4 below). 

4. Sending results for failed processing can also cause operational problems: 
if you’re processing a high volume of data, and you get into a crash loop, you 
can create a flood of repeated results. I’ve seen this case cause real world 
pain for people, and it’s nice to have a way to avoid it.
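
The alert scenario in point 3 can be written out as a runnable sketch; the two
histories and the threshold are taken directly from the example above.

    import java.util.List;

    public class AlertExample {
        // Alert if the value of x ever exceeds 10.
        static boolean wouldAlert(List<Integer> historyOfX) {
            return historyOfX.stream().anyMatch(x -> x > 10);
        }

        public static void main(String[] args) {
            List<Integer> atLeastOnce = List.of(1, 99, 2); // 99 came from a rolled-back batch
            List<Integer> exactlyOnce = List.of(1, 2);
            System.out.println(wouldAlert(atLeastOnce)); // true  -> spurious alert
            System.out.println(wouldAlert(exactlyOnce)); // false -> correct
        }
    }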

I hope some of these examples show why a user might reasonably want to 
configure the connector with the exactly-once guarantee. 

Thanks!
-John

On Sat, Apr 8, 2023, at 10:03, Jark Wu wrote:
> Hi Alexander,
>
> Yes, Kafka’s exactly-once semantics are used to avoid duplicated records in
> case of producer retries
> or failovers. But as I explained above, it can’t avoid intentionally
> duplicated records. Actually, I would
> like to call them "upsert records" instead of "duplicates", that's why the
> connector is named "upsert-kafka",
> to make Kafka work like a database that supports updating and deleting by
> key.
>
> For example, there is a SQL query:
>
> SELECT URL, COUNT(*) page_views
> FROM access_logs
> GROUP BY URL;
>
> This is a continuous query[1] that continuously emits a new <URL, page_views> record once a new URL
> access entry is received. The same URLs in the log may be far away and be
> processed in different checkpoints.
>
> It's easy to make upsert-kafka to support exactly-once delivery guarantee,
> but as we discussed above,
> it's unnecessary to support it and we intend to expose as few
> configurations to users as possible.
>
>
> Best,
> Jark
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/group-agg/
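
As an illustration of why a sequence of upserts converges for readers no matter
how often a key is re-emitted, here is a minimal sketch of how a consumer
materializes an upsert topic: the last write per key wins. The URLs and counts
are invented.

    import java.util.HashMap;
    import java.util.Map;

    public class UpsertMaterializer {
        public static void main(String[] args) {
            Map<String, Long> pageViews = new HashMap<>();
            // The continuous query above may emit the same URL many times,
            // possibly across different checkpoints:
            String[][] upserts = {
                {"/home", "1"}, {"/docs", "1"}, {"/home", "2"}, {"/home", "3"}
            };
            for (String[] u : upserts) {
                pageViews.put(u[0], Long.valueOf(u[1])); // last write per key wins
            }
            System.out.println(pageViews); // {/docs=1, /home=3}
        }
    }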
>
>
>
> On Sat, 8 Apr 2023 at 02:42, Alexander Sorokoumov
>  wrote:
>
>> Hi Jark,
>>
>> To my knowledge, Kafka's EXACTLY_ONCE transactions together with idempotent
>> producers prevent duplicated records[1], at least in the cases when
>> upstream does not produce them intentionally and across checkpoints.
>>
>> Could you please elaborate or point me to the docs that explain the reason
>> for duplicated records upstream and across checkpoints? I am relatively new
>> to Flink and not aware of it. According to the kafka connector
>> documentation, it does support exactly once semantics by configuring '
>> sink.delivery-guarantee'='exactly-once'[2]. It is not clear to me why we
>> can't make upsert-kafka configurable in the same way to support this
>> delivery guarantee.
>>
>> Thank you,
>> Alexander
>>
>> 1.
>>
>> https://www.confluent.io/blog/exactly-once-semantics-are-possible-heres-how-apache-kafka-does-it/
>> 2.
>>
>> https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/connectors/table/kafka/#consistency-guarantees
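
For reference, a sketch of the exactly-once configuration Alexander cites for
the regular kafka connector, expressed through the Java Table API. The table
name, topic, and format are placeholders; the option names follow the linked
documentation, but exact requirements (such as the transactional id prefix and
enabled checkpointing) may vary by Flink version.

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class ExactlyOnceSinkSketch {
        public static void main(String[] args) {
            TableEnvironment tEnv =
                    TableEnvironment.create(EnvironmentSettings.inStreamingMode());
            // Exactly-once delivery also requires checkpointing to be enabled.
            tEnv.executeSql(
                    "CREATE TABLE page_views_sink (" +
                    "  url STRING," +
                    "  page_views BIGINT" +
                    ") WITH (" +
                    "  'connector' = 'kafka'," +
                    "  'topic' = 'page-views'," +
                    "  'properties.bootstrap.servers' = 'localhost:9092'," +
                    "  'format' = 'json'," +
                    "  'sink.delivery-guarantee' = 'exactly-once'," +
                    "  'sink.transactional-id-prefix' = 'page-views-tx'" +
                    ")");
        }
    }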
>>
>>
>> On Fri, Apr 7, 2023 at 3:44 AM Jark Wu  wrote:
>>
>> > Hi Alexander,
>> >
>> > I’m not sure I fully understand the reasons. I left my comments inline.
>> >
>> > > 1. There might be other non-Flink topic consumers that would rather not
>> > have duplicated records.
>> >
>> > Exactly once can’t avoid producing duplicated records. Because the
>> upstream
>> > produces duplicated records intentionally and across checkpoints. Exactly
>> > once
>> > can’t recognize duplicated records and drop duplications.  That means
>> > duplicated
>> > records are written into topics even if exactly-once mode is enabled.
>> >
>> >
>> > > 2. Multiple upsert-kafka producers might cause keys to roll back to
>> > previous values.
>> >
>> > Sorry, I don’t understand how exactly once can prevent this rollback
>> > behavior.
>> > Even in your example with EXACTLY_ONCE enabled, the x will go to a5, and
>> > b5,
>> > then back a5 if jobs perform checkpoints after produci

Re: [Vote] FLIP-298: Unifying the Implementation of SlotManager

2023-03-09 Thread John Roesler
Thanks, Weihua!

I’m +1 (non-binding)

-John

On Thu, Mar 9, 2023, at 21:02, Shammon FY wrote:
> Thanks weihua, +1 (non-binding)
>
> Best,
> Shammon
>
> On Fri, Mar 10, 2023 at 10:32 AM Xintong Song  wrote:
>
>> +1 (binding)
>>
>> Best,
>>
>> Xintong
>>
>>
>>
>> On Thu, Mar 9, 2023 at 1:28 PM Weihua Hu  wrote:
>>
>> > Hi Everyone,
>> >
>> > I would like to start the vote on FLIP-298: Unifying the Implementation
>> > of SlotManager [1]. The FLIP was discussed in this thread [2].
>> >
>> > This FLIP aims to unify the implementation of SlotManager in
>> > order to reduce maintenance costs.
>> >
>> > The vote will last for at least 72 hours (03/14, 15:00 UTC+8)
>> > unless there is an objection or insufficient votes. Thank you all.
>> >
>> > [1]
>> >
>> >
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-298%3A+Unifying+the+Implementation+of+SlotManager
>> > [2] https://lists.apache.org/thread/ocssfxglpc8z7cto3k8p44mrjxwr67r9
>> >
>> > Best,
>> > Weihua
>> >
>>


Re: [DISCUSS] FLIP-298: Unifying the Implementation of SlotManager

2023-03-01 Thread John Roesler
Regarding cutting out functionalities out of slot manager, I think Yangze
>> and I have tried our best to shape the FineGrainedSlotManager into
>> reasonable components. I personally don't have other ideas to further
>> disassemble the component, but I'm open to such suggestions. However, from
>> the stability perspective, I'd be in favor of not introducing significant
>> changes to the FineGrainedSlotManager while switching it to the default.
>> Because the current implementation has already been verified (or at least
>> partially verified because Alibaba does not cover all the Flink use cases),
>> and introducing more changes also means more chances of breaking things.
>>
>>
>> Best,
>>
>> Xintong
>>
>>
>>
>> On Wed, Mar 1, 2023 at 11:12 AM Shammon FY  wrote:
>>
>> > Hi
>> >
>> > Thanks for starting this work weihua, I think unifying
>> > DeclarativeSlotManager and FineGrainedSlotManager is valuable.
>> >
>> > I agree with @Matthias and @John that we need a way to ensure that
>> > DeclarativeSlotManager's capabilities are fully covered by
>> > FineGrainedSlotManager
>> >
>> > 1. For their functional differences, can you give some detailed tests to
>> > verify that the new FineGrainedSlotManager has these capabilities? This
>> can
>> > effectively verify the new functions
>> >
>> > 2. I'm worried that many functions are not independent and it is
>> difficult
>> > to migrate step-by-step. You can list the relationship between them in
>> > detail.
>> >
>> > 3. As John mentioned, give a smoke test for FineGrainedSlotManager is a
>> > good idea. Or you can add some test information to the
>> > DeclarativeSlotManager to determine how many tests have used it. In this
>> > way, we can gradually construct test cases for FineGrainedSlotManager
>> > during the development process.
>> >
>> >
>> > Best,
>> > Shammon
>> >
>> >
>> > On Tue, Feb 28, 2023 at 10:22 PM John Roesler 
>> wrote:
>> >
>> > > Thanks for the FLIP, Weihua!
>> > >
>> > > I’ve read the FLIP, and it sounds good to me. We need to avoid
>> > > proliferating alternative implementations wherever possible. I have
>> just
>> > a
>> > > couple of comments:
>> > >
>> > > 1. I share Matthias’s concern about ensuring the behavior is really the
>> > > same. One suggestion I’ve used for this kind of thing is, as a smoke
>> > test,
>> > > to update the DeclarativeSlotManager to just delegate to the
>> > > FineGrainedSlotManager. If the full test suite still passes, you can be
>> > > pretty sure the new default is really ok. It would not be a good idea
>> to
>> > > actually keep that in for the release, since it would remove the option
>> > to
>> > > fall back in case of bugs. Either way, we need to make sure all test
>> > > scenarios are present for the FGSM.
>> > >
>> > > 4. In addition to changing the default, would it make sense to log a
>> > > deprecation warning on initialization if the DeclarativeSlotManager is
>> > used?
>> > >
>> > > Thanks again,
>> > > John
>> > >
>> > > On Tue, Feb 28, 2023, at 07:20, Matthias Pohl wrote:
>> > > > Hi Weihua,
>> > > > Thanks for your proposal. From a conceptual point: AFAIU, the
>> > > > DeclarativeSlotManager covers a subset (i.e. only evenly sized slots)
>> > of
>> > > > what the FineGrainedSlotManager should be able to achieve (variable
>> > slot
>> > > > size per task manager). Is this the right assumption/understanding?
>> In
>> > > this
>> > > > sense, merging both implementations into a single one sounds good. A
>> > few
>> > > > more general comments, though:
>> > > >
>> > > > 1. Did you do a proper test coverage analysis? That's not mentioned
>> in
>> > > the
>> > > > current version of the FLIP. I'm bringing this up because we ran into
>> > the
>> > > > same issue when fixing the flaws that popped up after introducing the
>> > > > multi-component leader election (see FLIP-285 [1]). There is a risk
>> > that
>> > > by
>> > > > removing the legacy code we decrease test coverage because certain
>> > >

Re: [DISCUSS] FLIP-298: Unifying the Implementation of SlotManager

2023-02-28 Thread John Roesler
Thanks for the FLIP, Weihua!

I’ve read the FLIP, and it sounds good to me. We need to avoid proliferating 
alternative implementations wherever possible. I have just a couple of comments:

1. I share Matthias’s concern about ensuring the behavior is really the same. 
One suggestion I’ve used for this kind of thing is, as a smoke test, to update 
the DeclarativeSlotManager to just delegate to the FineGrainedSlotManager (see the sketch after point 4 below). If 
the full test suite still passes, you can be pretty sure the new default is 
really ok. It would not be a good idea to actually keep that in for the 
release, since it would remove the option to fall back in case of bugs. Either 
way, we need to make sure all test scenarios are present for the FGSM. 

4. In addition to changing the default, would it make sense to log a 
deprecation warning on initialization if the DeclarativeSlotManager is used?
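
Here is a minimal sketch of the delegation smoke test from point 1. The
interfaces are drastically trimmed and the method names are illustrative; the
real SlotManager API in flink-runtime is much larger.

    // Hypothetical, trimmed-down shapes for illustration only.
    interface SlotManager {
        void processResourceRequirements(String requirements);
        void close();
    }

    class FineGrainedSlotManager implements SlotManager {
        public void processResourceRequirements(String requirements) { /* new logic */ }
        public void close() { /* release resources */ }
    }

    // Smoke-test shape: the legacy class keeps its name (and thus its test
    // suite), but every call is forwarded to the new implementation. If the
    // legacy tests still pass, the new default likely covers the old behavior.
    class DeclarativeSlotManager implements SlotManager {
        private final SlotManager delegate = new FineGrainedSlotManager();

        public void processResourceRequirements(String requirements) {
            delegate.processResourceRequirements(requirements);
        }

        public void close() {
            delegate.close();
        }
    }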

Thanks again,
John

On Tue, Feb 28, 2023, at 07:20, Matthias Pohl wrote:
> Hi Weihua,
> Thanks for your proposal. From a conceptual point: AFAIU, the
> DeclarativeSlotManager covers a subset (i.e. only evenly sized slots) of
> what the FineGrainedSlotManager should be able to achieve (variable slot
> size per task manager). Is this the right assumption/understanding? In this
> sense, merging both implementations into a single one sounds good. A few
> more general comments, though:
>
> 1. Did you do a proper test coverage analysis? That's not mentioned in the
> current version of the FLIP. I'm bringing this up because we ran into the
> same issue when fixing the flaws that popped up after introducing the
> multi-component leader election (see FLIP-285 [1]). There is a risk that by
> removing the legacy code we decrease test coverage because certain
> test cases that were covered for the legacy classes might not be
> necessarily covered in the new implementation, yet (see FLINK-30338 [2]
> which covers this issue for the leader election case). Ideally, we don't
> want to remove test cases accidentally because they were only implemented
> for the DeclarativeSlotManager but missed for the FineGrainedSlotManager.
>
> 2. DeclarativeSlotManager and FineGrainedSlotManager feel quite big in
> terms of lines of code. Without knowing whether it's actually a reasonable
> thing to do: Instead of just adding more features to the
> FineGrainedSlotManager, have you thought of cutting out functionality into
> smaller sub-components along this refactoring? Such a step-by-step approach
> might improve the overall codebase and might make reviewing the refactoring
> easier. I did a first pass over the code and struggled to identify code
> blocks that could be moved out of the SlotManager implementation(s).
> Therefore, I might be wrong with this proposal. I haven't worked on this
> codebase in that detail that it would allow me to come up with a judgement
> call. I wanted to bring it up, anyway, because I'm curious whether that
> could be an option. There's a comment created by Chesnay (CC'd) in the
> JavaDoc of TaskExecutorManager [3] indicating something similar. I'm
> wondering whether he can add some insights here.
>
> 3. For me personally, having a more detailed summary comparing the
> subcomponents of both SlotManager implementations with where
> their functionality matches and where they differ might help understand the
> consequences of the changes proposed in FLIP-298.
>
> Best,
> Matthias
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-285%3A+Refactoring+LeaderElection+to+make+Flink+support+multi-component+leader+election+out-of-the-box
> [2] https://issues.apache.org/jira/browse/FLINK-30338
> [3]
> https://github.com/apache/flink/blob/f611ea8cb5deddb42429df2c99f0c68d7382e9bd/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskExecutorManager.java#L66-L68
>
> On Tue, Feb 28, 2023 at 6:14 AM Matt Wang  wrote:
>
>> This is a good proposal for me, it will make the code of the SlotManager
>> more clear.
>>
>>
>>
>> --
>>
>> Best,
>> Matt Wang
>>
>>
>>  Replied Message 
>> | From | David Morávek |
>> | Date | 02/27/2023 22:45 |
>> | To |  |
>> | Subject | Re: [DISCUSS] FLIP-298: Unifying the Implementation of
>> SlotManager |
>> Hi Weihua, I still need to dig into the details, but the overall sentiment
>> of this change sounds reasonable.
>>
>> Best,
>> D.
>>
>> On Mon, Feb 27, 2023 at 2:26 PM Zhanghao Chen 
>> wrote:
>>
>> Thanks for driving this topic. I think this FLIP could help clean up the
>> codebase to make it easier to maintain. +1 on it.
>>
>> Best,
>> Zhanghao Chen
>> 
>> From: Weihua Hu 
>> Sent: Monday, February 27, 2023 20:40
>> To: dev 
>> Subject: [DISCUSS] FLIP-298: Unifying the Implementation of SlotManager
>>
>> Hi everyone,
>>
>> I would like to begin a discussion on FLIP-298: Unifying the Implementation
>> of SlotManager[1]. There are currently two types of SlotManager in Flink:
>> DeclarativeSlotManager and FineGrainedSlotManager. FineGrainedSlotMana

Re: [VOTE] FLIP-291: Externalized Declarative Resource Management

2023-02-28 Thread John Roesler
Thanks for the FLIP, David!

I’m +1 (non-binding)

-John

On Tue, Feb 28, 2023, at 07:46, David Morávek wrote:
> Hi Everyone,
>
> I want to start the vote on FLIP-291: Externalized Declarative Resource
> Management [1]. The FLIP was discussed in this thread [2].
>
> The goal of the FLIP is to enable external declaration of the resource
> requirements of a running job.
>
> The vote will last for at least 72 hours (Friday, 3rd of March, 15:00 CET)
> unless
> there is an objection or insufficient votes.
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-291%3A+Externalized+Declarative+Resource+Management
> [2] https://lists.apache.org/thread/b8fnj127jsl5ljg6p4w3c4wvq30cnybh
>
> Best,
> D.


Re: [DISCUSS] FLIP-291: Externalized Declarative Resource Management

2023-02-28 Thread John Roesler
Thanks for the answer, David!

It sounds like there is a race condition, but it’s a known issue not specific 
to this FLIP, and the failure case isn’t too bad. I’m satisfied with that.

Thanks,
John

On Thu, Feb 23, 2023, at 10:39, David Morávek wrote:
> Hi Everyone,
>
> @John
>
> This is a problem that we've spent some time trying to crack; in the end,
> we've decided to go against doing any upgrades to JobGraphStore from
> JobMaster to avoid having multiple writers that are guarded by different
> leader election lock (Dispatcher and JobMaster might live in a different
> process). The contract we've decided to choose instead is leveraging the
> idempotency of the endpoint and having the user of the API retry in case
> we're unable to persist new requirements in the JobGraphStore [1]. We
> eventually need to move JobGraphStore out of the dispatcher, but that's way
> out of the scope of this FLIP. The solution is a deliberate trade-off. The
> worst scenario is that the Dispatcher fails over in between retries, which
> would simply rescale the job to meet the previous resource requirements
> (more extended unavailability of underlying HA storage would have worse
> consequences than this). Does that answer your question?
>
> @Matthias
>
> Good catch! I'm fixing it now, thanks!
>
> [1]
> https://github.com/dmvk/flink/commit/5e7edcb77d8522c367bc6977f80173b14dc03ce9#diff-a4b690fb2c4975d25b05eb4161617af0d704a85ff7b1cad19d3c817c12f1e29cR1151
>
> Best,
> D.
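
A sketch of the client-side contract David describes: because the PUT is
idempotent, the caller can safely re-send the same request until it is
acknowledged. The endpoint path and the body shape are assumptions based on the
FLIP, not a confirmed API.

    import java.net.URI;
    import java.net.http.HttpClient;
    import java.net.http.HttpRequest;
    import java.net.http.HttpResponse;

    public class ResourceRequirementsClient {
        public static void main(String[] args) throws Exception {
            HttpClient client = HttpClient.newHttpClient();
            String jobId = "00000000000000000000000000000000"; // placeholder
            String body = "{\"vertex-id\": {\"parallelism\": "
                    + "{\"lowerBound\": 1, \"upperBound\": 8}}}";
            HttpRequest request = HttpRequest.newBuilder()
                    .uri(URI.create("http://localhost:8081/jobs/" + jobId
                            + "/resource-requirements"))
                    .header("Content-Type", "application/json")
                    .PUT(HttpRequest.BodyPublishers.ofString(body))
                    .build();

            for (int attempt = 1; attempt <= 3; attempt++) {
                HttpResponse<String> response =
                        client.send(request, HttpResponse.BodyHandlers.ofString());
                if (response.statusCode() == 200) {
                    break; // acknowledged: requirements were persisted
                }
                Thread.sleep(1_000L * attempt); // back off, re-send the same PUT
            }
        }
    }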
>
> On Tue, Feb 21, 2023 at 12:24 AM John Roesler  wrote:
>
>> Thanks for the FLIP, David!
>>
>> I just had one small question. IIUC, the REST API PUT request will go
>> through the new DispatcherGateway method to be handled. Then, after
>> validation, the dispatcher would call the new JobMasterGateway method to
>> actually update the job.
>>
>> Which component will write the updated JobGraph? I just wanted to make
>> sure it’s the JobMaster because it it were the dispatcher, there could be a
>> race condition with the async JobMaster method.
>>
>> Thanks!
>> -John
>>
>> On Mon, Feb 20, 2023, at 07:34, Matthias Pohl wrote:
>> > Thanks for your clarifications, David. I don't have any additional major
>> > points to add. One thing about the FLIP: The RPC layer API for updating
>> the
>> > JRR returns a future with a JRR? I don't see value in returning a JRR
>> here
>> > since it's an idempotent operation? Wouldn't it be enough to return
>> > CompletableFuture here? Or am I missing something?
>> >
>> > Matthias
>> >
>> > On Mon, Feb 20, 2023 at 1:48 PM Maximilian Michels 
>> wrote:
>> >
>> >> Thanks David! If we could get the pre-allocation working as part of
>> >> the FLIP, that would be great.
>> >>
>> >> Concerning the downscale case, I agree this is a special case for the
>> >> (single-job) application mode where we could re-allocate slots in a
>> >> way that could leave entire task managers unoccupied which we would
>> >> then be able to release. The goal essentially is to reduce slot
>> >> fragmentation on scale down by packing the slots efficiently. The
>> >> easiest way to add this optimization when running in application mode
>> >> would be to drop as many task managers during the restart such that
>> >> NUM_REQUIRED_SLOTS >= NUM_AVAILABLE_SLOTS stays true. We can look into
>> >> this independently of the FLIP.
>> >>
>> >> Feel free to start the vote.
>> >>
>> >> -Max
>> >>
>> >> On Mon, Feb 20, 2023 at 9:10 AM David Morávek  wrote:
>> >> >
>> >> > Hi everyone,
>> >> >
>> >> > Thanks for the feedback! I've updated the FLIP to use idempotent PUT
>> API
>> >> instead of PATCH and to properly handle lower bound settings, to support
>> >> the "pre-allocation" of the resources.
>> >> >
>> >> > @Max
>> >> >
>> >> > > How hard would it be to address this issue in the FLIP?
>> >> >
>> >> > I've included this in the FLIP. It might not be too hard to implement
>> >> this in the end.
>> >> >
>> >> > > B) drop as many superfluous task managers as needed
>> >> >
>> >> > I've intentionally left this part out for now because this ultimately
>> >> needs to be the responsibility of the Resource Manager. After all, in
>> the

Re: [DISCUSS] FLIP-297: Improve Auxiliary Sql Statements

2023-02-24 Thread John Roesler
Hello Ran,

Thanks for the FLIP!

Do you mind if we revisit the topic of doing this by adding an Information 
schema? The SHOW approach requires modifying the parser/language for every gap 
we identify. On the flip side, an Information schema is much easier to discover 
and remember how to use, and the ability to run queries on it is quite valuable 
for admins. It’s also better for Flink maintainers, because the information 
tables’ schemas can be evolved over time just like regular tables, whereas 
every change to a SHOW statement would be a breaking change. 
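
To illustrate the difference, a sketch via the Java Table API. The SHOW
statement is real; the information-schema query is hypothetical, with an
invented table name and columns, since no such schema exists in Flink today.

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class CatalogIntrospection {
        public static void main(String[] args) {
            TableEnvironment tEnv =
                    TableEnvironment.create(EnvironmentSettings.inStreamingMode());

            // Today: a dedicated statement; every new filter or column means
            // extending the parser and the SQL grammar.
            tEnv.executeSql("SHOW TABLES").print();

            // Hypothetical: with an information schema, the same metadata would
            // be an ordinary table that can be queried and filtered, and whose
            // schema can evolve compatibly. This does NOT run on current Flink;
            // the names are invented:
            //   SELECT table_name
            //   FROM INFORMATION_SCHEMA.`TABLES`
            //   WHERE table_schema = 'default_database';
        }
    }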

I understand that it may seem like a big effort, but we’re proposing quite a 
big extension to the space of SHOW statement, so it seems appropriate to take 
the opportunity and migrate to a better framework rather than incrementally 
building on (and tying us even more firmly to) the existing approach.

Thanks for your consideration,
John

On Fri, Feb 24, 2023, at 05:58, Sergey Nuyanzin wrote:
> thanks for explanation
>
>>But it's not clear to me what exactly
>> you want to display? Is it the name of the plugin?
>
> I was thinking about name, type (source/sink) and may be version (not sure
> if it's possible right now)
>
> On Fri, Feb 24, 2023 at 12:46 PM Ran Tao  wrote:
>
>> Hi, Sergey. thanks. first step we can support filtering for show operations
>> in this FLIP try to align other engines.
>> If we want to support describe other objects, of course we need to design
>> how to get these metadatas or informations and printAsStyle. (So it maybe
>> need another FLIP for more details).
>>
>> > Does it make sense to add support for connectors e.g. show
>> > {sink|source|all} connectors?
>> I think we can support it, currently flink do support some operations only
>> for flink itself such as showJobs. But it's not clear to me what exactly
>> you want to display? Is it the name of the plugin?
>> Just Like:
>> Kafka
>> Hudi
>> Files
>>
>> Best Regards,
>> Ran Tao
>>
>>
>> Sergey Nuyanzin wrote on Fri, Feb 24, 2023 at 19:11:
>>
>> > Thanks for driving the FLIP
>> >
>> > I have a couple of questions
>> > Am I right that INFORMATION_SCHEMA mentioned by Timo[1]  is out of scope
>> of
>> > this FLIP?
>> > I noticed there are some support of it in Spark[2]/Hive[3]/Snowflake[4]
>> and
>> > others
>> >
>> > Does it make sense to add support for connectors e.g. show
>> > {sink|source|all} connectors?
>> >
>> > [1] https://lists.apache.org/thread/2g108qlfwbhb56wnoc5qj0yq29dvq1vv
>> > [2] https://issues.apache.org/jira/browse/SPARK-16452
>> > [3] https://issues.apache.org/jira/browse/HIVE-1010
>> > [4] https://docs.snowflake.com/en/sql-reference/info-schema
>> >
>> >
>> > On Fri, Feb 24, 2023 at 4:19 AM Jark Wu  wrote:
>> >
>> > > Hi Jing,
>> > >
>> > > > we'd better reduce the dependency chain and follow the Law of
>> > > Demeter(LoD, clean code).
>> > > > Adding a new method in TableEnvironment is therefore better than
>> > calling
>> > > an API chain
>> > >
>> > > I think I don't fully agree that LoD is a good practice. Actually, I
>> > would
>> > > prefer to keep the API clean and concise.
>> > > This is also the Java Collection Framework design principle [1]: "High
>> > > power-to-weight ratio". Otherwise,
>> > > it will explode the API interfaces with different combinations of
>> > methods.
>> > > Currently, TableEnvironment
>> > > already provides 60+ methods.
>> > >
>> > > IMO, with the increasing methods of accessing and manipulating
>> metadata,
>> > > they can be extracted to
>> > > a separate interface, where we can add richer methods. This work can be
>> > > aligned with the
>> > > CatalogManager interface (FLIP-295) [2].
>> > >
>> > > Best,
>> > > Jark
>> > >
>> > > [1]:
>> > >
>> > >
>> >
>> https://stackoverflow.com/questions/7568819/why-no-tail-or-head-method-in-list-to-get-last-or-first-element
>> > > [2]: https://lists.apache.org/thread/9bnjblgd9wvrl75lkm84oo654c4lqv70
>> > >
>> > >
>> > > On Fri, 24 Feb 2023 at 10:38, Aitozi  wrote:
>> > >
>> > > > Hi,
>> > > > Thanks for the nice proposal, Ran.
>> > > > Regarding this api usage, I have some discussion with @twalthr
>> > before
>> > > > as here <
>> > > > https://github.com/apache/flink/pull/15137#issuecomment-1356124138>
>> > > > Personally, I think leaking the Catalog to the user side is not
>> > suitable,
>> > > > since there are some read/write operations in the Catalog, the
>> > > > TableEnvironment#getCatalog will expose all of them to the user side.
>> > So
>> > > I
>> > > > learned to add a new api in TableEnvironment to reduce reliance on
>> the
>> > > > current TableEnvironment#getCatalog.
>> > > >
>> > > > Thanks,
>> > > > Aitozi
>> > > >
>> > > >
> >> > > > Ran Tao wrote on Thu, Feb 23, 2023 at 23:44:
>> > > >
>> > > > > Hi, JingSong, Jing.
>> > > > >
>> > > > > thank for sharing your opinions.
>> > > > >
>> > > > > What you say makes sense, both approaches have pros and cons.
>> > > > >
>> > > > > If it is a modification of `TableEnvironment`, such as
>> > > > > listDatabases(catalog). It is actually co

Re: [DISCUSS] FLIP-291: Externalized Declarative Resource Management

2023-02-20 Thread John Roesler
Thanks for the FLIP, David!

I just had one small question. IIUC, the REST API PUT request will go through 
the new DispatcherGateway method to be handled. Then, after validation, the 
dispatcher would call the new JobMasterGateway method to actually update the 
job. 

Which component will write the updated JobGraph? I just wanted to make sure 
it’s the JobMaster because it it were the dispatcher, there could be a race 
condition with the async JobMaster method. 

Thanks!
-John 

On Mon, Feb 20, 2023, at 07:34, Matthias Pohl wrote:
> Thanks for your clarifications, David. I don't have any additional major
> points to add. One thing about the FLIP: The RPC layer API for updating the
> JRR returns a future with a JRR? I don't see value in returning a JRR here
> since it's an idempotent operation? Wouldn't it be enough to return
> CompletableFuture here? Or am I missing something?
>
> Matthias
>
> On Mon, Feb 20, 2023 at 1:48 PM Maximilian Michels  wrote:
>
>> Thanks David! If we could get the pre-allocation working as part of
>> the FLIP, that would be great.
>>
>> Concerning the downscale case, I agree this is a special case for the
>> (single-job) application mode where we could re-allocate slots in a
>> way that could leave entire task managers unoccupied which we would
>> then be able to release. The goal essentially is to reduce slot
>> fragmentation on scale down by packing the slots efficiently. The
>> easiest way to add this optimization when running in application mode
>> would be to drop as many task managers during the restart such that
>> NUM_REQUIRED_SLOTS >= NUM_AVAILABLE_SLOTS stays true. We can look into
>> this independently of the FLIP.
>>
>> Feel free to start the vote.
>>
>> -Max
>>
>> On Mon, Feb 20, 2023 at 9:10 AM David Morávek  wrote:
>> >
>> > Hi everyone,
>> >
>> > Thanks for the feedback! I've updated the FLIP to use idempotent PUT API
>> instead of PATCH and to properly handle lower bound settings, to support
>> the "pre-allocation" of the resources.
>> >
>> > @Max
>> >
>> > > How hard would it be to address this issue in the FLIP?
>> >
>> > I've included this in the FLIP. It might not be too hard to implement
>> this in the end.
>> >
>> > > B) drop as many superfluous task managers as needed
>> >
>> > I've intentionally left this part out for now because this ultimately
>> needs to be the responsibility of the Resource Manager. After all, in the
>> Session Cluster scenario, the Scheduler doesn't have the bigger picture of
>> other tasks of other jobs running on those TMs. This will most likely be a
>> topic for another FLIP.
>> >
>> > WDYT? If there are no other questions or concerns, I'd like to start the
>> vote on Wednesday.
>> >
>> > Best,
>> > D.
>> >
>> > On Wed, Feb 15, 2023 at 3:34 PM Maximilian Michels 
>> wrote:
>> >>
>> >> I missed that the FLIP states:
>> >>
>> >> > Currently, even though we’d expose the lower bound for clarity and
>> API completeness, we won’t allow setting it to any other value than one
>> until we have full support throughout the stack.
>> >>
>> >> How hard would it be to address this issue in the FLIP?
>> >>
>> >> There is not much value in offering a lower bound that either won't be
>> >> respected or throws an error when it is set. If we had support for a
>> >> lower bound, we could enforce a resource contract externally via
>> >> setting lowerBound == upperBound. That ties back to the Rescale API
>> >> discussion we had. I want to better understand what the major concerns
>> >> would be around allowing this.
>> >>
>> >> Just to outline how I imagine the logic to work (see the sketch right
>> >> after this list):
>> >>
>> >> A) The resource constraints are already met => Nothing changes
>> >> B) More resources available than required => Cancel the job, drop as
>> >> many superfluous task managers as needed, restart the job
>> >> C) Less resources available than required => Acquire new task
>> >> managers, wait for them to register, cancel and restart the job
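>> >>
>> >> Or the same logic as a purely hypothetical code sketch (all names are
>> >> made up):
>> >>
>> >>   int required = requestedSlots();   // from the new requirements
>> >>   int available = availableSlots();  // registered with the cluster
>> >>   if (available == required) {
>> >>       // A) constraints already met => nothing changes
>> >>   } else if (available > required) {
>> >>       // B) cancel the job, drop (available - required) slots' worth
>> >>       //    of task managers, restart the job
>> >>   } else {
>> >>       // C) acquire (required - available) slots' worth of task
>> >>       //    managers, wait for them to register, cancel and restart
>> >>   }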
>> >>
>> >> I'm open to helping out with the implementation.
>> >>
>> >> -Max
>> >>
>> >> On Mon, Feb 13, 2023 at 7:45 PM Maximilian Michels 
>> wrote:
>> >> >
>> >> > Based on further discussion I had with Chesnay on this PR [1], I think
>> >> > jobs would currently go into a restarting state after the resource
>> >> > requirements have changed. This wouldn't achieve what we had in mind,
>> >> > i.e. sticking to the old resource requirements until enough slots are
>> >> > available to fulfil the new resource requirements. So this may not be
>> >> > 100% what we need but it could be extended to do what we want.
>> >> >
>> >> > -Max
>> >> >
>> >> > [1] https://github.com/apache/flink/pull/21908#discussion_r1104792362
>> >> >
>> >> > On Mon, Feb 13, 2023 at 7:16 PM Maximilian Michels 
>> wrote:
>> >> > >
>> >> > > Hi David,
>> >> > >
>> >> > > This is awesome! Great writeup and demo. This is pretty much what we
>> >> > > need for the autoscaler as part of the Flink Kubernetes operator
>> [1].
>> >> > > Scaling Flink jobs effectively is hard

Re: [ANNOUNCE] New Apache Flink Committer - Anton Kalashnikov

2023-02-20 Thread John Roesler
Congratulations, Anton!
-John

On Mon, Feb 20, 2023, at 08:18, Piotr Nowojski wrote:
> Hi, everyone
>
> On behalf of the PMC, I'm very happy to announce Anton Kalashnikov as a new
> Flink Committer.
>
> Anton has been very active for almost two years already, authored and
> reviewed many PRs over this time. He is active in Flink's runtime,
> being the main author of improvements like Buffer Debloating (FLIP-183)
> [1], solved many bugs, and fixed many test instabilities, generally
> helping with the maintenance of runtime components.
>
> Please join me in congratulating Anton Kalashnikov for becoming a Flink
> Committer!
>
> Best,
> Piotr Nowojski (on behalf of the Flink PMC)
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-183%3A+Dynamic+buffer+size+adjustment


Re: [DISCUSS] FLIP-276: Data Consistency of Streaming and Batch ETL in Flink and Table Store

2023-01-27 Thread John Roesler
Hello Shammon and all,

Thanks for this FLIP! I've been working toward this kind of global consistency 
across large scale data infrastructure for a long time, and it's fantastic to 
see a high-profile effort like this come into play.

I have been lurking in the discussion for a while and delaying my response 
while I collected my thoughts. However, I've realized at some point, delaying 
more is not as useful as just asking a few questions, so I'm sorry if some of 
this seems beside the point. I'll number these to not collide with prior 
discussion points:

10. Have you considered proposing a general consistency mechanism instead of 
restricting it to TableStore+ETL graphs? For example, it seems to me to be 
possible and valuable to define instead the contract that sources/sinks need to 
implement in order to participate in globally consistent snapshots.
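
As a strawman for what such a contract could look like (every name below is 
made up; this sketches the idea rather than proposing a concrete API):

import java.util.concurrent.CompletableFuture;

interface ConsistentSnapshotParticipant {
    // asynchronously capture this source's/sink's state for a global snapshot
    CompletableFuture<Void> snapshot(long globalSnapshotId);

    // make the snapshot visible once all participants have succeeded
    void commit(long globalSnapshotId);

    // discard a snapshot that failed elsewhere in the topology
    void abort(long globalSnapshotId);
}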

11. It seems like this design is assuming that the "ETL Topology" under the 
envelope of the consistency model is a well-ordered set of jobs, but I suspect 
this is not the case for many organizations. It may be aspirational, but I 
think the gold-standard here would be to provide an entire organization with a 
consistency model spanning a loosely coupled ecosystem of jobs and data flows 
spanning teams and systems that are organizationally far apart.

I realize that may be kind of abstract. Here's some examples of what's on my 
mind here: 

11a. Engineering may operate one Flink cluster, and some other org, like 
Finance may operate another. In most cases, those are separate domains that 
don't typically get mixed together in jobs, but some people, like the CEO, 
would still benefit from being able to make a consistent query that spans 
arbitrary contexts within the business. How well can a feature like this 
transcend a single Flink infrastructure? Does it make sense to consider a model 
in which snapshots from different domains can be composable?

11b. Some groups may have a relatively stable set of long-running jobs, while 
others (like data science, skunkworks, etc) may adopt a more experimental, 
iterative approach with lots of jobs entering and exiting the ecosystem over 
time. It's still valuable to have them participate in the consistency model, 
but it seems like the consistency system will have to deal with more chaos than 
I see in the design. For example, how can this feature tolerate things like 
zombie jobs (which are registered in the system, but fail to check in for a 
long time, and then come back later)?

12. I didn't see any statements about patterns like cycles in the ETL Topology. 
I'm aware that there are fundamental constraints on how well cyclic topologies 
can be supported by a distributed snapshot algorithm. However, there are a 
range of approaches/compromises that we can apply to cyclic topologies. At the 
very least, we can state that we will detect cycles and produce a warning, etc.

13. I'm not sure how heavily you're weighting the query syntax part of the 
proposal, so please feel free to defer this point. It looked to me like the 
proposal assumes people want to query either the latest consistent snapshot or 
the latest inconsistent state. However, it seems like there's a significant 
opportunity to maintain a manifest of historical snapshots and allow people to 
query as of old points in time. That can be valuable for individuals answering 
data questions, building products, and crucially supporting auditability use 
cases. To that latter point, it seems nice to provide not only a mechanism to 
query arbitrary snapshots, but also to define a TTL/GC model that allows users 
to keep hourly snapshots for N hours, daily snapshots for N days, weekly 
snapshots for N weeks, and the same for monthly, quarterly, and yearly 
snapshots.
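
To make that last idea concrete, here's the kind of declarative retention 
policy I'm imagining (entirely made-up names, purely an illustration):

import java.time.Duration;

// keep fine-grained snapshots briefly and coarse-grained ones longer
record SnapshotRetentionPolicy(
        Duration keepHourly,    // e.g. Duration.ofDays(1)
        Duration keepDaily,     // e.g. Duration.ofDays(30)
        Duration keepWeekly,    // e.g. Duration.ofDays(180)
        Duration keepMonthly) { // e.g. Duration.ofDays(365)
}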

Ok, that's all I have for now :) I'd also like to understand some lower-level 
details, but I wanted to get these high-level questions off my chest.

Thanks again for the FLIP!
-John

On 2023/01/13 11:43:28 Shammon FY wrote:
> Hi Piotr,
> 
> I discussed `Timestamp Barrier` and `Aligned Checkpoint` for data
> consistency in the FLIP with @jinsong lee; we think there are indeed many
> defects in using `Aligned Checkpoint` to support data consistency, as you
> mentioned.
> 
> According to our historical discussion, I think we have reached an
> agreement on an important point: we finally need `Timestamp Barrier
> Mechanism` to support data consistency. But in our (@jinsong lee's and my)
> opinion, the total design and implementation based on 'Timestamp
> Barrier' will be too complex, and it's also too big for one FLIP.
> 
> So we'd like to use FLIP-276[1] as an overview design of data consistency
> in Flink Streaming and Batch ETL based on `Timestamp Barrier`. @jinsong and
> I hope that we can reach an agreement on the overall design in FLIP-276
> first, and then on the basis of FLIP-276 we can create other FLIPs with
> detailed design according to modules and drive them. Finally, we can
> supp

Re: [DISCUSS] Enabling dynamic partition discovery by default in Kafka source

2023-01-13 Thread John Roesler
Thanks for this proposal, Qingsheng!

If you want to be a little conservative with the default, 5 minutes might be 
better than 30 seconds.

The equivalent config in Kafka seems to be metadata.max.age.ms 
(https://kafka.apache.org/documentation/#consumerconfigs_metadata.max.age.ms), 
which has a default value of 5 minutes.
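
For reference, this is roughly how a user has to opt in today via the 
DataStream API (a sketch with placeholder broker/topic values; 300000 ms is 
the 5 minutes suggested above, and I believe the property key below is the 
one the Kafka connector documents):

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;

KafkaSource<String> source =
        KafkaSource.<String>builder()
                .setBootstrapServers("broker:9092")   // placeholder
                .setTopics("input-topic")             // placeholder
                // discover new partitions every 5 minutes
                .setProperty("partition.discovery.interval.ms", "300000")
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();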

Other than that, I’m in favor. I agree, this should be on by default.

Thanks again,
John

On Fri, Jan 13, 2023, at 08:26, Leonard Xu wrote:
> Thanks Qingsheng for driving this. Enabling dynamic partition 
> discovery would be very useful for scenarios where Kafka topics 
> scale out their partitions.
>
> +1 for the change.
>
> CC: Becket 
>
>
> Best,
> Leonard 
>
>
>
>> On Jan 13, 2023, at 3:15 PM, Jark Wu  wrote:
>> 
>> +1 for the change. I think this is beneficial for users and is compatible.
>> 
>> Best,
>> Jark
>> 
>> On Fri, 13 Jan 2023 at 14:22, 何军  wrote:
>> 
 
>>> +1 for this idea, we have enabled Kafka dynamic partition discovery in
>>> all jobs.
 
 
>>>


Re: [DISCUSS] Allow source readers extending SourceReaderBase to override numRecordsIn report logic

2023-01-07 Thread John Roesler
Thanks for the discussion, Dong. 

To answer your question: I was unclear if the desire was to deprecate the 
metric itself, to be replaced with some other metric, or whether we just wanted 
to deprecate the way the superclass manages it. It’s clear now that we want to 
keep the metric and only change the superclass logic.

I think we’re on the same page now. 

Thanks!
-John

On Sat, Jan 7, 2023, at 07:21, Dong Lin wrote:
> Hi John,
>
> Not sure if I understand the difference between "deprecate that metric" and
> "deprecate the private counter mechanism". I think what we want is to
> update SourceReaderBase's implementation so that it no longer explicitly
> increments this metric. But we still need to expose this metric to user.
> And methods such as RecordEmitter#emitRecord (which can be invoked by
> SourceReaderBase#pollNext(...)) can potentially increment this metric.
>
> I like your approach of adding an extra SourceReaderBase constructor. That
> appears simpler than adding a deprecated config. We can mark the existing
> SourceReaderBase constructor as @deprecated. SourceReaderBase#pollNext(...)
> will not increment the counter if a subclass uses the newly added
> constructor.
>
> Cheers,
> Dong
>
>
> On Sat, Jan 7, 2023 at 4:47 AM John Roesler  wrote:
>
>> Thanks for the replies, Dong and Wencong!
>>
>> That’s a good point about the overhead of the extra method.
>>
>> Is the desire to actually deprecate that metric in a user-facing way, or
>> just to deprecate the private counter mechanism?
>>
>> It seems like if the desire is to deprecate the existing private counter,
>> we can accomplish it by deprecating the current constructor and offering
>> another that is documented not to track the metric. This seems better than
>> the config option, since the concern is purely about the division of
>> responsibilities between the sub- and super-class.
>>
>> Another option, which might be better if we wish to keep a uniformly named
>> metric, would be to simply make the counter protected. That would be better
>> if we’re generally happy with the metric and counter, but a few special
>> source connectors need to emit records in other ways.
>>
>> And finally, if we really want to get rid of the metric itself, then I
>> agree, a config is the way to do it.
>>
>> Thanks,
>> John
>>
>> On Fri, Jan 6, 2023, at 00:55, Dong Lin wrote:
>> > Hi John and Wencong,
>> >
>> > Thanks for the reply!
>> >
>> > It is nice that option-2 can address the problem without affecting the
>> > existing source connectors as far as functionality is concerned. One
>> > potential concern with this approach is that it might increase the Flink
>> > runtime overhead by adding one more virtual function call to the
>> > per-record runtime call stack.
>> >
>> > Since Java's default MaxInlineLevel is 12-18, I believe it is easy for an
>> > operator chain of 5+ operators to exceed this limit. In that case,
>> > option-2 would incur one more virtual table lookup to produce each
>> > record.
>> > It is not clear how much this overhead would show up for jobs with a
>> chain
>> > of lightweight operators. I have recently been working on FLINK-30531
>> > <https://issues.apache.org/jira/browse/FLINK-30531> to reduce runtime
>> > overhead, which might be related to this discussion.
>> >
>> > In comparison to option-2, the option-3 provided in my earlier email
>> would
>> > not add this extra overhead. I think it might be worthwhile to invest in
>> > the long-term performance (and simpler runtime infra) and pay for the
>> > short-term cost of deprecating this metric in SourceOperatorBase. What do
>> > you think?
>> >
>> > Regards,
>> > Dong
>> >
>> >
>> > On Thu, Jan 5, 2023 at 10:10 PM Wencong Liu 
>> wrote:
>> >
>> >> Hi, All
>> >>
>> >>
>> >> Thanks for the reply!
>> >>
>> >>
>> >> I think both John and Dong's opinions are reasonable. John's Suggestion
>> 2
>> >> is a good implementation.
>> >> It does not affect the existing source connectors, but also provides
>> >> support
>> >> for custom counter in the future implementation.
>> >>
>> >>
>> >> WDYT?
>> >>
>> >>
>> >> Best,
>> >>
>> >> Wencong Liu
>>


Re: [DISCUSS] Allow source readers extending SourceReaderBase to override numRecordsIn report logic

2023-01-06 Thread John Roesler
Thanks for the replies, Dong and Wencong!

That’s a good point about the overhead of the extra method.

Is the desire to actually deprecate that metric in a user-facing way, or just 
to deprecate the private counter mechanism?

It seems like if the desire is to deprecate the existing private counter, we 
can accomplish it by deprecating the current constructor and offering another 
that is documented not to track the metric. This seems better than the config 
option, since the concern is purely about the division of responsibilities 
between the sub- and super-class. 

Another option, which might be better if we wish to keep a uniformly named 
metric, would be to simply make the counter protected. That would be better if 
we’re generally happy with the metric and counter, but a few special source 
connectors need to emit records in other ways. 

And finally, if we really want to get rid of the metric itself, then I agree, a 
config is the way to do it. 

Thanks,
John

On Fri, Jan 6, 2023, at 00:55, Dong Lin wrote:
> Hi John and Wencong,
>
> Thanks for the reply!
>
> It is nice that option-2 can address the problem without affecting the
> existing source connectors as far as functionality is concerned. One
> potential concern with this approach is that it might increase the Flink
> runtime overhead by adding one more virtual function call to the
> per-record runtime call stack.
>
> Since Java's default MaxInlineLevel is 12-18, I believe it is easy for an
> operator chain of 5+ operators to exceed this limit. In that case,
> option-2 would incur one more virtual table lookup to produce each record.
> It is not clear how much this overhead would show up for jobs with a chain
> of lightweight operators. I have recently been working on FLINK-30531
> <https://issues.apache.org/jira/browse/FLINK-30531> to reduce runtime
> overhead, which might be related to this discussion.
>
> In comparison to option-2, the option-3 provided in my earlier email would
> not add this extra overhead. I think it might be worthwhile to invest in
> the long-term performance (and simpler runtime infra) and pay for the
> short-term cost of deprecating this metric in SourceOperatorBase. What do
> you think?
>
> Regards,
> Dong
>
>
> On Thu, Jan 5, 2023 at 10:10 PM Wencong Liu  wrote:
>
>> Hi, All
>>
>>
>> Thanks for the reply!
>>
>>
>> I think both John and Dong's opinions are reasonable. John's Suggestion 2
>> is a good implementation.
>> It does not affect the existing source connectors, but also provides
>> support
>> for custom counter in the future implementation.
>>
>>
>> WDYT?
>>
>>
>> Best,
>>
>> Wencong Liu


Re: [DISCUSS] Allow source readers extending SourceReaderBase to override numRecordsIn report logic

2023-01-04 Thread John Roesler
Hi Wencong,

Thanks for the proposal! I agree that we should fix this disconnect between the 
metric and what is actually happening in those source connectors.

My instincts agree with Dong's. Adding a configuration option in order to tune 
the relationship between a superclass and a subclass seems not quite right.

The best way to customize superclass behavior is to override a method, and the 
second best approach is to add a constructor with a new parameter.

Override Suggestion 1:
It looks like if the source connector overrides `pollNext`, you can prevent the 
undesired `numRecordsInCounter.inc(1)` call. If you simply want to disable the 
way the base class emits (and counts) records and do it another way,  this 
might be your best bet.

If you still want to invoke the counter on your own, you could propose to make 
it `protected` as well.

Override Suggestion 2:
If you want to ensure that the metric is always incremented when a record is 
emitted, you can propose to add a new protected method like 
`SourceReaderBase#emitRecord`, which increments the counter and then calls the 
recordEmitter. `pollNext` would be updated to call this method, and any other 
subclass logic could also call the method.
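
In code, roughly (a sketch only, written against SourceReaderBase's type 
parameters <E, T, SplitT, SplitStateT>; the final signature would need to 
match whatever `pollNext` currently hands to the RecordEmitter):

// inside SourceReaderBase
protected void emitRecord(E element, SourceOutput<T> output, SplitStateT state)
        throws Exception {
    numRecordsInCounter.inc(1);
    recordEmitter.emitRecord(element, output, state);
}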

Constructor Suggestion:
This option isn't as good, but you could add a new constructor that takes a 
boolean flag to disable the default counting logic, similar to the config you 
propose. The advantage is that no one besides the base class and subclasses 
have to be aware of this flag, and the behavior is configured at compile time 
instead of runtime. The disadvantage is that the base class will need new 
conditional checks and branches, which increases its complexity for all source 
connectors, not just the ones that want to change the default behavior.

Looking at all these options, I would personally prefer Override Suggestion 2. 
I hope this helps!

Thanks,
-John

On 2023/01/04 12:58:13 Dong Lin wrote:
> Hi Wencong,
> 
> Thanks for kicking off the discussion! I think it would be nice to address
> this problem.
> 
> Is the config supposed to be publicly visible only to source connector
> developers but not to end users? It might be a bit unusual to have a
> subclass use a config to disable a public feature in the superclass.
> 
> One alternative solution might be to deprecate/remove numRecordsInCounter
> from SourceReaderBase entirely and ask developers to maintain the counter
> as appropriate. For KafkaSource and FileSource, the counter can be
> incremented in KafkaSourceRecordEmitter#emitRecord and
> FileSourceRecordEmitter#emitRecord respectively. We would still need to add
> a config in the short term to allow a deprecation period. And the config
> itself will be marked as @deprecated.
> 
> The downside of this alternative solution is that we need to deprecate the
> counting feature in SourceReaderBase. The upside is that we won't need to
> add a public config and the implementation might be easier to maintain in
> the long term.
> 
> What do you think?
> 
> Thanks,
> Dong
> 
> 
> On Tue, Jan 3, 2023 at 11:50 AM Wencong Liu  wrote:
> 
> > Hi devs,
> >
> >
> > I'd like to start a discussion about FLINK-30234: SourceReaderBase should
> > provide an option to disable numRecordsIn metric registration [1].
> >
> >
> > As FLINK-30234 describes, the numRecordsIn metric is pre-registered
> > for all sources in SourceReaderBase currently. Considering different
> > implementation of source reader, the definition of "record" might differ
> > from the one we use in SourceReaderBase, hence numRecordsIn might be
> > inaccurate.
> >
> >
> > We could introduce a public option in SourceReaderOptions used in
> > SourceReaderBase:
> >
> >
> > source.reader.metric.num_records_in.override: false
> >
> >
> > By default, the source reader will use the numRecordsIn metric in
> > SourceReaderBase. If a source reader wants to report the metric by itself,
> > it can set source.reader.metric.num_records_in.override to true, which
> > disables the registration of numRecordsIn in SourceReaderBase and lets the
> > actual implementation report the metric instead.
> >
> >
> > Any thoughts on this?
> >
> >
> > [1]
> > https://issues.apache.org/jira/browse/FLINK-30234?jql=project%20%3D%20FLINK
> >
> >
> > Best, Wencong Liu
> 


Re: [DISCUSS] FLIP-278: Hybrid Source Connector

2022-12-18 Thread John Roesler
Hello all,

Thanks for the FLIP, Ran!

The HybridSource is a really cool feature, and I was glad to see a proposal to 
expose it in the Table and SQL APIs.

My main question is also about the switching control (question 2). It seems 
like the existing Kafka connector has all the options you'd want to define the 
switching point[1], and the issue is only how to specify a "handoff" from one 
source to the next. It seems like you could propose to add a reference to an 
extracted field or property of the first source to be used in the second one.

However, the more I think about it, the more I wonder whether a "handoff" 
operation ought to be necessary. For example, the use case I have in mind is to 
bootstrap the table using a snapshot of the data and then have it seamlessly 
switch over to consuming all the records since that snapshot. In order to 
support this use case with no loss or duplicates, timestamp isn't sufficient; 
I'd need to know the exact vector of offsets represented in that snapshot. Then 
again, if I control the snapshotting process, this should be trivial to compute 
and store next to the snapshots.

Further, when I register the table, I ought to know which exact snapshot I'm 
selecting, and therefore can just populate the `specific-offsets` as desired. 
Backing off to timestamp, if I again am naming a path to a specific snapshot of 
the data, it seems like I have enough information already to also specify the 
correct `timestamp` option.
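
Concretely, registration might look something like this (just a sketch: the 
'sources' option and the index prefixes follow the FLIP's examples, while the 
path, topic, and offsets are all made up):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

TableEnvironment tableEnv =
        TableEnvironment.create(EnvironmentSettings.inStreamingMode());
tableEnv.executeSql(
        "CREATE TABLE hybrid_source (f0 VARCHAR, f1 VARCHAR, f2 BIGINT) WITH ("
                + " 'connector' = 'hybrid',"
                + " 'sources' = 'filesystem,kafka',"
                + " '0.path' = '/snapshots/2022-12-18/',"
                + " '0.format' = 'csv',"
                + " '1.topic' = 'events',"
                + " '1.scan.startup.mode' = 'specific-offsets',"
                + " '1.scan.startup.specific-offsets' ="
                + " 'partition:0,offset:42;partition:1,offset:300'"
                + ")");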

With this in mind, my question is whether it's necessary to specify some kind 
of dynamic property, like the DataStream API does[2]. If a fixed property is 
sufficient, it seems like the current proposal is actually sufficient as well. 
I think I just don't see the use case for dynamic configuration here.

Side question, out of curiosity: would this source support chaining together 
more than two sources? It seems like the proposed syntax would allow it. It 
seems like some snapshot-rollup strategies could benefit from it (eg if you 
want to combine your 2021 yearly rollup with your Jan-Nov monthly rollups, then 
your first two weekly rollups from Dec, and finally switch over to live data 
from Kafka or something).

[1] 
https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/kafka/#start-reading-position
[2] 
https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/hybridsource/#dynamic-start-position-at-switch-time

Thanks again,
-John

On Fri, Dec 16, 2022, at 06:20, Ran Tao wrote:
> Hi, Martijn, thanks for your comments.
>
> Using an identifier as the child source prefix, instead of an index, may be
> a good way. I will update the FLIP to illustrate how we can read from the
> hybrid schema to generate child schemas, for question 1.
>
> Question 2 is about the start position for the next Kafka source. Currently
> we cannot get the end timestamp of the first bounded source. In the
> DataStream API the end timestamp can be found from the previous enumerator,
> so we need to offer end-timestamp support for bounded sources (e.g.
> filesystem). If we can get the end timestamp, then Kafka will start from
> that offset. I think we need an option here that allows the user to start
> the next Kafka source either automatically from the previous one or from a
> custom start offset (by using a WITH option in the SQL DDL). Not every
> second source needs to be bound to the previous one; for example, if the
> next source is already a file, it does not need a start position.
>
> Question 3 is about the Table API, which I haven't added to the FLIP yet. I
> will try to fix some current issues, update the FLIP, and add more details.
> Thanks for your comments.
>
>
> Martijn Visser  于2022年12月16日周五 16:59写道:
>
>> Hi Ran,
>>
>> For completeness, this is a new thread that was already previously started
>> at https://lists.apache.org/thread/xptn2ddzj34q9f5vtbfb62lsybmvcwjq. I'm
>> linking them because I think Timo's comments are relevant to be kept with
>> this discussion thread.
>>
>> I agree with Timo's comments from there that having an index key isn't the
>> best option; I would rather have an identifier.
>>
>> I do wonder how this would work when you want to specify sources from a
>> catalog: could you elaborate on that?
>>
>> What I'm also missing in the FLIP is an example of how to specify the
>> starting offset from Kafka. In the DataStream API, there
>> is OffsetsInitializer.timestamp(switchTimestamp + 1) but how would you
>> specify that in the SQL landscape?
>>
>> Last but not least: your examples are all SQL only. How do you propose that
>> this works in the Table API?
>>
>> Best regards,
>>
>> Martijn
>>
>> On Thu, Dec 15, 2022 at 9:16 AM Ran Tao  wrote:
>>
>> > Fyi.
>> >
>> > This FLIP uses the index as the child source option prefix because we may
>> > use the same connector for multiple hybrid child sources.
>> > e.g.
>> >
>> > create table hybrid_source(
>> >  f0 varchar,
>> >  f1 varchar,
>> >  f2 bigint
>> > ) with(
>> >  'connector'='hybrid',
>> >  'sources'='filesystem,filesystem',
>> >  '0.path' = '/tmp/a.csv',
>> 

Re: [DISCUSS] FLIP-271: Autoscaling

2022-11-24 Thread John Roesler
Thanks for this answer, Gyula!
-John

On Thu, Nov 24, 2022, at 14:53, Gyula Fóra wrote:
> Hi John!
>
> Thank you for the excellent question.
>
> There are a few reasons why we felt that the operator is the right place for
> this component:
>
>  - Ideally the autoscaler is a separate process (an outside observer), and
> the jobmanager is very much tied to the lifecycle of the job. The operator
> is a perfect example of such an external process that lives beyond
> individual jobs.
>  - Scaling itself might need some external resource management (for
> standalone clusters) that the jobmanager is not capable of, and the logic
> is already in the operator
> - Adding this to the operator allows us to integrate this fully in the
> lifecycle management of the application. This guarantees that scaling
> decisions do not interfere with upgrades, suspends etc.
> - By adding it to the operator, the autoscaler can potentially work on
> older Flink versions as well
> - The jobmanager is a component designed to handle individual Flink jobs,
> but the autoscaler component needs to work on a higher abstraction layer to
> be able to integrate with user job upgrades etc.
>
> These are some of the main things that come to my mind :)
>
> Having it in the operator ties this logic to Kubernetes itself but we feel
> that an autoscaler is mostly relevant in an elastic cloud environment
> anyway.
>
> Cheers,
> Gyula
>
> On Thu, Nov 24, 2022 at 9:40 PM John Roesler  wrote:
>
>> Hi Max,
>>
>> Thanks for the FLIP!
>>
>> I’ve been curious about one point. I can imagine some good reasons for
>> it but wonder what you have in mind. What’s the reason to add auto scaling
>> to the Operator instead of to the JobManager?
>>
>> It seems like adding that capability to the JobManager would be a bigger
>> project, but it also would create some interesting opportunities.
>>
>> This is certainly not a suggestion, just a question.
>>
>> Thanks!
>> John
>>
>> On Wed, Nov 23, 2022, at 10:12, Maximilian Michels wrote:
>> > Thanks for your comments @Dong and @Chen. It is true that not all the
>> > details are contained in the FLIP. The document is meant as a general
>> > design concept.
>> >
>> > As for the rescaling time, this is going to be a configurable setting for
>> > now but it is foreseeable that we will provide auto-tuning of this
>> > configuration value by observing the job restart time. Same goes for the
>> > scaling decision itself which can learn from previous decisions. But we
>> > want to keep it simple for the first version.
>> >
>> > For sources that do not support the pendingRecords metric, we are
>> planning
>> > to either give the user the choice to set a manual target rate, or scale
>> it
>> > purely based on its utilization as reported via busyTimeMsPerSecond. In
>> > case of legacy sources, we will skip scaling these branches entirely
>> > because they support neither of these metrics.
>> >
>> > -Max
>> >
>> > On Mon, Nov 21, 2022 at 11:27 AM Maximilian Michels 
>> wrote:
>> >
>> >> >Do we think the scaler could be a plugin or hard coded?
>> >>
>> >> +1 For pluggable scaling logic.
>> >>
>> >> On Mon, Nov 21, 2022 at 3:38 AM Chen Qin  wrote:
>> >>
>> >>> On Sun, Nov 20, 2022 at 7:25 AM Gyula Fóra 
>> wrote:
>> >>>
>> >>> > Hi Chen!
>> >>> >
>> >>> > I think in the long term it makes sense to provide some pluggable
>> >>> > mechanisms but it's not completely trivial where exactly you would
>> plug
>> >>> in
>> >>> > your custom logic at this point.
>> >>> >
>> >>> Sounds good; more specifically, it would be great if it could accept
>> >>> input features (including previous scaling decisions) and output
>> >>> decisions.
>> >>> Folks might keep their own secret sauce and avoid patching oss fork.
>> >>>
>> >>> >
>> >>> > In any case the problems you mentioned should be solved robustly by
>> the
>> >>> > algorithm itself without any customization:
>> >>> >  - We need to be able to detect ineffective scaling decisions; let's
>> >>> > say we scaled up (expecting better throughput with a higher
>> >>> > parallelism) but we did not get a better processing capacity (this
>> >>> > would be the e

Re: [DISCUSS] FLIP-271: Autoscaling

2022-11-24 Thread John Roesler
Hi Max,

Thanks for the FLIP!

I’ve been curious about one point. I can imagine some good reasons for it 
but wonder what you have in mind. What’s the reason to add auto scaling to the 
Operator instead of to the JobManager?

It seems like adding that capability to the JobManager would be a bigger 
project, but it also would create some interesting opportunities.

This is certainly not a suggestion, just a question. 

Thanks!
John 

On Wed, Nov 23, 2022, at 10:12, Maximilian Michels wrote:
> Thanks for your comments @Dong and @Chen. It is true that not all the
> details are contained in the FLIP. The document is meant as a general
> design concept.
>
> As for the rescaling time, this is going to be a configurable setting for
> now but it is foreseeable that we will provide auto-tuning of this
> configuration value by observing the job restart time. Same goes for the
> scaling decision itself which can learn from previous decisions. But we
> want to keep it simple for the first version.
>
> For sources that do not support the pendingRecords metric, we are planning
> to either give the user the choice to set a manual target rate, or scale it
> purely based on its utilization as reported via busyTimeMsPerSecond. In
> case of legacy sources, we will skip scaling these branches entirely
> because they support neither of these metrics.
>
> -Max
>
> On Mon, Nov 21, 2022 at 11:27 AM Maximilian Michels  wrote:
>
>> >Do we think the scaler could be a plugin or hard coded?
>>
>> +1 For pluggable scaling logic.
>>
>> On Mon, Nov 21, 2022 at 3:38 AM Chen Qin  wrote:
>>
>>> On Sun, Nov 20, 2022 at 7:25 AM Gyula Fóra  wrote:
>>>
>>> > Hi Chen!
>>> >
>>> > I think in the long term it makes sense to provide some pluggable
>>> > mechanisms but it's not completely trivial where exactly you would plug
>>> in
>>> > your custom logic at this point.
>>> >
>>> Sounds good; more specifically, it would be great if it could accept input
>>> features (including previous scaling decisions) and output decisions.
>>> Folks might keep their own secret sauce and avoid patching oss fork.
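>>>
>>> As a completely made-up shape for such a hook (every name below is
>>> hypothetical):
>>>
>>>   // java.util.Map / java.util.List; ScalingDecision is invented too
>>>   record ScalingDecision(Map<String, Integer> vertexParallelism) {}
>>>
>>>   interface ScalingPolicy {
>>>       ScalingDecision decide(Map<String, Double> features,
>>>                              List<ScalingDecision> previousDecisions);
>>>   }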
>>>
>>> >
>>> > In any case the problems you mentioned should be solved robustly by the
>>> > algorithm itself without any customization:
>>> >  - We need to be able to detect ineffective scaling decisions; let's
>>> > say we
>>> > scaled up (expecting better throughput with a higher parallelism) but we
>>> > did not get a better processing capacity (this would be the external
>>> > service bottleneck)
>>> >
>>> Sounds good, so we would at least try restarting the job once (optimistic
>>> path) as a design choice.
>>>
>>> >  - We are evaluating metrics in windows, and we have some flexible
>>> > boundaries to avoid scaling on minor load spikes
>>> >
>>> Yes, it would be great if the user could feed in throughput changes over
>>> different time buckets (last 10s, 30s, 1 min, 5 mins) as input features.
>>>
>>> >
>>> > Regards,
>>> > Gyula
>>> >
>>> > On Sun, Nov 20, 2022 at 12:28 AM Chen Qin  wrote:
>>> >
>>> > > Hi Gyula,
>>> > >
>>> > > Do we think the scaler could be a plugin or hard coded?
>>> > > We observed some cases the scaler can't address (e.g. async I/O
>>> > > dependency service degradation, or a small spike that isn't worth
>>> > > restarting the job for).
>>> > >
>>> > > Thanks,
>>> > > Chen
>>> > >
>>> > > On Fri, Nov 18, 2022 at 1:03 AM Gyula Fóra 
>>> wrote:
>>> > >
>>> > > > Hi Dong!
>>> > > >
>>> > > > Could you please confirm that your main concerns have been
>>> addressed?
>>> > > >
>>> > > > Some other minor details that might not have been fully clarified:
>>> > > >  - The prototype has been validated on some production workloads, yes.
>>> > > >  - We are only planning to use metrics that are generally available
>>> > > > and were previously accepted as standardized connector metrics (not
>>> > > > Kafka specific). This is actually specified in the FLIP.
>>> > > >  - Even if some metrics (such as pendingRecords) are not accessible,
>>> > > > the scaling algorithm works and can be used. For source scaling
>>> > > > based on utilization alone we still need some trivial modifications
>>> > > > on the implementation side.
>>> > > >
>>> > > > Cheers,
>>> > > > Gyula
>>> > > >
>>> > > > On Thu, Nov 17, 2022 at 5:22 PM Gyula Fóra 
>>> > wrote:
>>> > > >
>>> > > > > Hi Dong!
>>> > > > >
>>> > > > This is not an experimental feature proposal. The implementation of
>>> > > > the prototype is still in an experimental phase, but by the time the
>>> > > > FLIP, initial prototype, and review are done, this should be in a
>>> > > > good, stable first version.
>>> > > > This proposal is about as general as autoscalers/tuners get, as far
>>> > > > as I understand, and there is no history of any alternative effort
>>> > > > that even comes close to the applicability of this solution.
>>> > > > >
>>> > > > > Any large features that were added to Flink in the past have gone
>>> > > through
>>> > > > > several iterations over the years 

Re: [DISCUSS] Allow sharing (RocksDB) memory between slots

2022-11-09 Thread John Roesler
Hi Roman,

Thanks for the proposal! This will make scheduling a lot more flexible for our 
use case.

Just a quick follow-up question about the number of new configs we’re adding 
here. It seems like the proposed configs provide a lot of flexibility, but at 
the expense of added complexity.

It seems like operators would either choose isolation for the cluster’s jobs or 
they would want to share the memory between jobs. I’m not sure I see the 
motivation to reserve only part of the memory for sharing while allowing jobs to 
choose whether they will share or be isolated.

I’m new to Flink, though, and have no operational experience. Is there a use case 
you have in mind for this kind of split configuration?

Thanks,
John

On Wed, Nov 9, 2022, at 08:17, Roman Khachatryan wrote:
> Hi Yanfei,
>
> Thanks, good questions
>
>> 1. Is shared-memory only for the state backend? If both
>> "taskmanager.memory.managed.shared-fraction: >0" and
>> "state.backend.rocksdb.memory.managed: false" are set at the same time,
>> will the shared-memory be wasted?
> Yes, shared memory is only for the state backend currently;
> If no job uses it then it will be wasted.
> A session cluster cannot validate this configuration
> because the job configuration is not known in advance.
>
>> 2. It's said that "Jobs 4 and 5 will use the same 750Mb of unmanaged
>> memory and will compete with each other" in the example; how is the memory
>> size of the unmanaged part calculated?
> It's calculated the same way as managed memory size currently,
> i.e. taskmanager.memory.managed.size *
> taskmanager.memory.managed.shared-fraction
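> For example (my own numbers, purely illustrative): with
> taskmanager.memory.managed.size = 1000m and
> taskmanager.memory.managed.shared-fraction = 0.75, both the shared managed
> pool and the shared unmanaged budget would come out to 1000m * 0.75 = 750m.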
> Separate parameters for unmanaged memory would be more clear.
> However, I doubt that this configuration would ever be used (I listed it
> just for completeness).
> So I'm not sure whether adding them would be justified.
> WDYT?
>
>> 3. For fine-grained-resource-management, the control
>> of cpuCores, taskHeapMemory can still work, right?
> Yes, for other resources fine-grained-resource-management should work.
>
>> And I am a little
>> worried that too many memory-related configuration options are complicated
>> for users to understand.
> I'm also worried about having too many options, but I don't see any better
> alternative.
> The existing users definitely shouldn't be affected,
> so there must be at least a feature toggle ("shared-fraction").
> "share-scope" could potentially be replaced by some inference logic,
> but having it explicit seems less error-prone.
>
> Regards,
> Roman
>
>
> On Wed, Nov 9, 2022 at 3:50 AM Yanfei Lei  wrote:
>
>> Hi Roman,
>> Thanks for the proposal; this allows the state backend to make better use
>> of memory.
>>
>> After reading the ticket, I'm curious about some points:
>>
>> 1. Is shared-memory only for the state backend? If both
>> "taskmanager.memory.managed.shared-fraction: >0" and
>> "state.backend.rocksdb.memory.managed: false" are set at the same time,
>> will the shared-memory be wasted?
>> 2. It's said that "Jobs 4 and 5 will use the same 750Mb of unmanaged memory
>> and will compete with each other" in the example; how is the memory size of
>> the unmanaged part calculated?
>> 3. For fine-grained-resource-management, the control
>> of cpuCores, taskHeapMemory can still work, right?  And I am a little
>> worried that too many memory-related configuration options are complicated
>> for users to understand.
>>
>> Regards,
>> Yanfei
>>
>> Roman Khachatryan  于2022年11月8日周二 23:22写道:
>>
>> > Hi everyone,
>> >
>> > I'd like to discuss sharing RocksDB memory across slots as proposed in
>> > FLINK-29928 [1].
>> >
>> > Since 1.10 / FLINK-7289 [2], it is possible to:
>> > - share these objects among RocksDB instances of the same slot
>> > - bound the total memory usage by all RocksDB instances of a TM
>> >
>> > However, the memory is divided between the slots equally (unless using
>> > fine-grained resource control). This is sub-optimal if some slots contain
>> > more memory intensive tasks than the others.
>> > Using fine-grained resource control is also often not an option because
>> the
>> > workload might not be known in advance.
>> >
>> > The proposal is to widen the scope of sharing memory to TM, so that it
>> can
>> > be shared across all RocksDB instances of that TM. That would reduce the
>> > overall memory consumption at the expense of resource isolation.
>> >
>> > Please see FLINK-29928 [1] for more details.
>> >
>> > Looking forward to feedback on that proposal.
>> >
>> > [1]
>> > https://issues.apache.org/jira/browse/FLINK-29928
>> > [2]
>> > https://issues.apache.org/jira/browse/FLINK-7289
>> >
>> > Regards,
>> > Roman
>> >
>>


[jira] [Created] (FLINK-29962) Exclude Jamon 2.3.1

2022-11-09 Thread John Roesler (Jira)
John Roesler created FLINK-29962:


 Summary: Exclude Jamon 2.3.1
 Key: FLINK-29962
 URL: https://issues.apache.org/jira/browse/FLINK-29962
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / Hive, Table SQL / Gateway
Reporter: John Roesler


Hi all,

My Maven mirror is complaining that jamon-runtime:2.3.1 has a malformed pom. 
It looks like it's fixed in jamon-runtime:2.4.1. According to 
dependency:tree, Flink already has transitive dependencies on both versions, so 
I'm proposing to just exclude the transitive dependency from the problematic 
direct dependencies and pin the dependency to 2.4.1.

I'll send a PR shortly.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)