Re: [DISCUSS] FLIP-221 Abstraction for lookup source cache and metric

2022-04-25 Thread Arvid Heise
Hi Qingsheng,

Thanks for driving this; the current inconsistency across lookup sources has been unsatisfying to me.

I second Alexander's idea, though I could also live with an easier
solution as a first step: instead of making caching an implementation
detail of TableFunction X, we could devise a caching layer around X. So the
proposal would be a CachingTableFunction that delegates to X on cache
misses and otherwise serves results from the cache. Lifting it into the
operator model as proposed would be even better but is probably unnecessary
in the first step for a lookup source (as the source will only receive the
requests after the filter; applying the projection may be more interesting
to save memory).
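To make the delegation idea concrete, here is a minimal sketch (hypothetical
class and a plain LRU map; the real CachingTableFunction would plug into the
TableFunction lifecycle and the cache/metric abstractions of the FLIP):

import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

/** Sketch only: serve lookups from a bounded cache and delegate on misses. */
public class CachingLookup<K, V> implements Function<K, Collection<V>> {

    private final Function<K, Collection<V>> delegate; // the wrapped lookup, i.e. "X"
    private final Map<K, Collection<V>> cache;

    public CachingLookup(Function<K, Collection<V>> delegate, int maxEntries) {
        this.delegate = delegate;
        // Access-ordered LinkedHashMap as a simple LRU; the FLIP would use its own
        // cache abstraction with TTL and metrics instead.
        this.cache = new LinkedHashMap<K, Collection<V>>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, Collection<V>> eldest) {
                return size() > maxEntries;
            }
        };
    }

    @Override
    public Collection<V> apply(K key) {
        // Cache hit: return the stored rows; cache miss: ask the delegate and remember the result.
        return cache.computeIfAbsent(key, delegate);
    }
}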

Another advantage is that all the changes of this FLIP would be limited to
options; there would be no need for new public interfaces. Everything else
remains an implementation detail of the table runtime. That means we can
easily incorporate the optimization potential that Alexander pointed out at a
later stage.

@Alexander unfortunately, your architecture diagram did not come through. To
be honest, I don't know of a good way to share images on this list.

On Fri, Apr 22, 2022 at 5:04 PM Александр Смирнов 
wrote:

> Hi Qingsheng! My name is Alexander, I'm not a committer yet, but I'd
> really like to become one, and this FLIP really interested me.
> Actually, I have worked on a similar feature in my company’s Flink
> fork, and we would like to share our thoughts on this and make the code
> open source.
>
> I think there is a better alternative to introducing an abstract
> class for TableFunction (CachingTableFunction). As you know,
> TableFunction lives in the flink-table-common module, which provides
> only an API for working with tables – it’s very convenient to depend on
> in connectors. In turn, CachingTableFunction contains logic for
> runtime execution, so this class and everything connected with it
> should be located in another module, probably flink-table-runtime.
> But this would require connectors to depend on another module that
> contains a lot of runtime logic, which doesn’t sound good.
>
> I suggest adding a new method ‘getLookupConfig’ to LookupTableSource
> or LookupRuntimeProvider to allow connectors to only pass
> configurations to the planner, so they won’t depend on the runtime
> implementation. Based on these configs, the planner will construct a lookup
> join operator with the corresponding runtime logic (ProcessFunctions in
> the flink-table-runtime module). The architecture looks like the pinned
> image (the LookupConfig class there is actually your CacheConfig).
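A minimal sketch of what such a hook could look like (hypothetical names, not
the actual FLIP interfaces; the point is that it is purely declarative and adds
no runtime dependency to the connector):

import java.time.Duration;

/** Sketch only: cache-related settings a connector hands over to the planner. */
public class LookupConfig {

    private final long maxCachedRows;
    private final Duration cacheExpireAfterWrite;

    public LookupConfig(long maxCachedRows, Duration cacheExpireAfterWrite) {
        this.maxCachedRows = maxCachedRows;
        this.cacheExpireAfterWrite = cacheExpireAfterWrite;
    }

    public long getMaxCachedRows() {
        return maxCachedRows;
    }

    public Duration getCacheExpireAfterWrite() {
        return cacheExpireAfterWrite;
    }
}

/** Sketch only: the lookup source declares its caching wishes to the planner. */
interface ConfigurableLookupSource {

    // A default keeps existing connectors source-compatible; null means "no caching".
    default LookupConfig getLookupConfig() {
        return null;
    }
}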
>
> The classes in flink-table-planner that will be responsible for this are
> CommonPhysicalLookupJoin and its subclasses.
> The current classes for lookup join in flink-table-runtime are
> LookupJoinRunner, AsyncLookupJoinRunner, LookupJoinRunnerWithCalc, and
> AsyncLookupJoinRunnerWithCalc.
>
> I suggest adding classes LookupJoinCachingRunner,
> LookupJoinCachingRunnerWithCalc, etc.
>
> And here comes another, more powerful advantage of such a solution: if
> the caching logic lives at a lower level, we can apply some
> optimizations to it. LookupJoinRunnerWithCalc was named like this
> because it uses the ‘calc’ function, which mostly consists of
> filters and projections.
>
> For example, when joining table A with lookup table B under the condition
> ‘JOIN … ON A.id = B.id AND A.age = B.age + 10 WHERE B.salary > 1000’, the
> ‘calc’ function will contain the filters A.age = B.age + 10 and
> B.salary > 1000.
>
> If we apply this function before storing records in the cache, the size of
> the cache will be significantly reduced: filters avoid storing useless
> records in the cache, and projections reduce the records’ size. So the
> user can increase the initial maximum number of records in the cache.
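For illustration, a rough sketch of applying the lookup-side filter and the
projection before populating the cache (hypothetical types; in the real runner
the ‘calc’ function would be the generated filter/projection code):

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/** Sketch only: only rows that survive the calc function occupy cache memory. */
class CalcBeforeCacheLookup<K, R> {

    private final Function<K, List<R>> rawLookup; // lookup into table B by join key
    private final Function<R, R> calc;            // lookup-side filter + projection; null = filtered out
    private final Map<K, List<R>> cache = new HashMap<>();

    CalcBeforeCacheLookup(Function<K, List<R>> rawLookup, Function<R, R> calc) {
        this.rawLookup = rawLookup;
        this.calc = calc;
    }

    List<R> lookup(K key) {
        return cache.computeIfAbsent(key, k -> {
            List<R> reduced = new ArrayList<>();
            for (R row : rawLookup.apply(k)) {
                // e.g. drop rows with B.salary <= 1000 and keep only the projected columns
                R projected = calc.apply(row);
                if (projected != null) {
                    reduced.add(projected);
                }
            }
            return reduced;
        });
    }
}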
>
> What do you think about it?
>
>
> On 2022/04/19 02:47:11 Qingsheng Ren wrote:
> > Hi devs,
> >
> > Yuan and I would like to start a discussion about FLIP-221[1], which
> introduces an abstraction of lookup table cache and its standard metrics.
> >
> > Currently each lookup table source has to implement its own cache to
> store lookup results, and there isn’t a standard set of metrics for users and
> developers to tune their jobs with lookup joins, which is quite a common
> use case in Flink Table / SQL.
> >
> > Therefore we propose some new APIs including cache, metrics, wrapper
> classes of TableFunction and new table options. Please take a look at the
> FLIP page [1] to get more details. Any suggestions and comments would be
> appreciated!
> >
> > [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-221+Abstraction+for+lookup+source+cache+and+metric
> >
> > Best regards,
> >
> > Qingsheng
> >
> >
>


Re: [DISCUSS] FLIP-217 Support watermark alignment of source splits

2022-04-25 Thread Arvid Heise
> [1]
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/sources/#watermark-generation
>
> On 22/04/2022 05:59, Becket Qin wrote:
>
> Thanks for driving the effort, Sebastian. I think the motivation makes a
> lot of sense. Just a few suggestions / questions.
>
> 1. I think watermark alignment is sort of a general use case, so should we
> just add the related methods to SourceReader directly instead of
> introducing the new interface of WithSplitAssignment? We can provide
> default implementations, so backwards compatibility won't be an issue.
>
> 2. As you mentioned, the SplitReader interface probably also needs some
> change to support throttling at the split granularity. Can you add that
> interface change into the public interface section as well?
>
> 3. Nit, can we avoid using the method name assignSplits here, given that it
> is not actually changing the split assignments? It seems something like
> pauseOrResumeSplits(), or adjustSplitsThrottling() is more accurate.
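For illustration, a minimal sketch of how such a method could be added in a
backwards-compatible way, whether directly on SourceReader or on a separate
interface (hypothetical signature; the final FLIP API may differ — and whether
the default should be a silent no-op or an explicit failure is essentially the
concern Thomas raises further down in this thread):

import java.util.Collection;

/** Sketch only: a default implementation keeps existing readers compiling. */
interface SplitThrottlingReader<SplitT> {

    // Naming suggested in the discussion; readers that support per-split
    // throttling override this, all others keep the default.
    default void pauseOrResumeSplits(
            Collection<SplitT> splitsToPause, Collection<SplitT> splitsToResume) {
        throw new UnsupportedOperationException(
                "This reader does not support pausing or resuming individual splits.");
    }
}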
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On Thu, Apr 21, 2022 at 11:39 PM Steven Wu <stevenz...@gmail.com> wrote:
>
> However, a single source operator may read data from multiple
> splits/partitions, e.g., multiple Kafka partitions, such that even with
> watermark alignment the source operator may need to buffer excessive
> amount of data if one split emits data faster than another.
>
> For this part from the motivation section, is it accurate? Let's assume one
> source task consumes from 3 partitions and one of the partitions is
> significantly slower. In this situation, the watermark for this source task
> won't be held back, as it is reading recent data from the other two Kafka
> partitions. As a result, it won't hold back the overall watermark. I
> thought the problem is that we may have late data for this slow partition.
>
> I have another question about the restart. Say split alignment is
> triggered, a checkpoint is completed, and the job fails and is restored from
> the last checkpoint. Because the alignment decision is not checkpointed,
> alignment initially won't be enforced until we get a cycle of watermark
> aggregation and propagation, right? Not saying this corner case is a
> problem; I just want to understand it better.
>
>
>
> On Thu, Apr 21, 2022 at 8:20 AM Thomas Weise <t...@apache.org> wrote:
>
> Thanks for working on this!
>
> I wonder if "supporting" split alignment in SourceReaderBase and then
>
> doing
>
> nothing if the split reader does not implement AlignedSplitReader could
>
> be
>
> misleading? Perhaps WithSplitsAlignment can instead be added to the
> specific source reader (i.e. KafkaSourceReader) to make it explicit that
> the source actually supports it.
>
> Thanks,
> Thomas
>
>
> On Thu, Apr 21, 2022 at 4:57 AM Konstantin Knauf <kna...@apache.org> wrote:
>
>
> Hi Sebastian, Hi Dawid,
>
> As part of this FLIP, the `AlignedSplitReader` interface (aka the stop &
> resume behavior) will be implemented for Kafka and Pulsar only, correct?
>
> +1 in general. I believe it is valuable to complete the watermark aligned
> story with this FLIP.
>
> Cheers,
>
> Konstantin
>
>
> On Thu, Apr 21, 2022 at 12:36 PM Dawid Wysakowicz wrote:
>
> To be explicit, having worked on it, I support it ;) I think we can
> start a vote thread soonish, as there are no concerns so far.
>
> Best,
>
> Dawid
>
> On 13/04/2022 11:27, Sebastian Mattheis wrote:
>
> Dear Flink developers,
>
> I would like to open a discussion on FLIP 217 [1] for an extension of
> Watermark Alignment to perform alignment also in SplitReaders. To do so,
> SplitReaders must be able to suspend and resume reading from split sources,
> where the SourceOperator coordinates and controls suspend and resume. To
> gather information about the current watermarks of the SplitReaders, we
> extend the internal WatermarkOutputMultiplexer and report watermarks to the
> SourceOperator.
>
> There is a PoC for this FLIP [2], prototyped by Arvid Heise and revised and
> reworked by Dawid Wysakowicz (he did most of the work) and me. The changes
> are backwards compatible in a way that if affected components do not
> support split alignment, the behavior is as before.
>
> Best,
> Sebastian
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-217+Support+watermark+alignment+of+source+splits
>
> [2] https://github.com/dawidwys/flink/tree/aligned-splits
>
> --
>
> Konstantin Knauf | https://twitter.com/snntrable | https://github.com/knaufk
>
>


Re: [ANNOUNCE] New Apache Flink Committer - David Morávek

2022-03-14 Thread Arvid Heise
Gratz!

On Mon, Mar 14, 2022 at 9:24 AM Anton Kalashnikov 
wrote:

> Congratulations, David!
>
> --
>
> Best regards,
> Anton Kalashnikov
>
> 14.03.2022 09:18, Matthias Pohl wrote:
> > Congratulations, David!
>
>


Re: [ANNOUNCE] New PMC member: Yuan Mei

2022-03-14 Thread Arvid Heise
Congratulations and well deserved!

On Mon, Mar 14, 2022 at 9:30 AM Matthias Pohl  wrote:

> Congratulations, Yuan.
>
> On Mon, Mar 14, 2022 at 9:25 AM Shuo Cheng  wrote:
>
> > Congratulations, Yuan!
> >
> > On Mon, Mar 14, 2022 at 4:22 PM Anton Kalashnikov 
> > wrote:
> >
> > > Congratulations, Yuan!
> > >
> > > --
> > >
> > > Best regards,
> > > Anton Kalashnikov
> > >
> > > 14.03.2022 09:13, Leonard Xu пишет:
> > > > Congratulations Yuan!
> > > >
> > > > Best,
> > > > Leonard
> > > >
> > > >> On March 14, 2022 at 4:09 PM, Yangze Guo  wrote:
> > > >>
> > > >> Congratulations!
> > > >>
> > > >> Best,
> > > >> Yangze Guo
> > > >>
> > > >> On Mon, Mar 14, 2022 at 4:08 PM Martijn Visser <
> > > martijnvis...@apache.org> wrote:
> > > >>> Congratulations Yuan!
> > > >>>
> > > >>> On Mon, 14 Mar 2022 at 09:02, Yu Li  wrote:
> > > >>>
> > >  Hi all!
> > > 
> > >  I'm very happy to announce that Yuan Mei has joined the Flink PMC!
> > > 
> > >  Yuan is helping the community a lot with creating and validating
> > >  releases, contributing to FLIP discussions, and making good code
> > >  contributions to the state backend and related components.
> > > 
> > >  Congratulations and welcome, Yuan!
> > > 
> > >  Best Regards,
> > >  Yu (On behalf of the Apache Flink PMC)
> > > 
> > > --
> > >
> > > Best regards,
> > > Anton Kalashnikov
> > >
> > >
>


Re: [VOTE] FLIP-191: Extend unified Sink interface to support small file compaction

2022-01-04 Thread Arvid Heise
+1 (binding).

Thanks for driving!

On Tue, Jan 4, 2022 at 10:31 AM Yun Gao 
wrote:

> +1 (binding).
>
> Very thanks for proposing the FLIP!
>
> Best,
> Yun
>
>
> --
> From:Martijn Visser 
> Send Time:2022 Jan. 4 (Tue.) 17:22
> To:dev 
> Subject:Re: [VOTE] FLIP-191: Extend unified Sink interface to support
> small file compaction
>
> +1 (non-binding). Thanks for driving the FLIP!
>
> Best regards,
>
> Martijn
>
> On Tue, 4 Jan 2022 at 10:21, Fabian Paul  wrote:
>
> > Hi everyone,
> >
> > I'd like to start a vote on FLIP-191: Extend unified Sink interface to
> > support small file compaction [1] that has been discussed in this
> > thread [2].
> >
> > The vote will be open for at least 72 hours unless there is an objection
> or
> > not enough votes.
> >
> > Best,
> > Fabian
> >
> > [1]
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-191%3A+Extend+unified+Sink+interface+to+support+small+file+compaction
> > [2] https://lists.apache.org/thread/97kwy315t9r4j02l8v0wotkll4tngb3m
> >
>
>


Re: Re: [DISCUSS] FLIP-191: Extend unified Sink interface to support small file compaction

2021-12-16 Thread Arvid Heise
I just noticed offline with Yun that I had some misconceptions about how
blocking data exchanges work in Flink. Apparently, a subtask of stage 2
is already started when all connected subtasks of stage 1 are finished. I
was assuming that it works similarly to Hadoop, where all tasks of stage 1
have to be finished before any task of stage 2 is started.
So to amend the proposal, we would introduce a full shuffle before the
committer in batch mode. That would simulate the behavior of Hadoop.

We also need to verify what happens in case of multiple sinks. In that
case, we may have started the committer of sink 1 while sink 2 is still
writing. That would be fine as long as sink 1's writers are not restarted. If
they are, it seems we may need to improve the scheduler to not restart
"finished" tasks. That seems like an orthogonal issue though.

Note that all discussed issues apply to Sink V1 as well, so the FLIP is not
making anything worse.


On Thu, Dec 16, 2021 at 1:53 PM Arvid Heise  wrote:

> Hi Yun,
>
> the basic idea is to use different regions for the different stages.
>
> So you have your stages:
> Stage 1:  -> pre-writer topology -> writer -> committables
> (materialized)
> Stage 2: committables (materialized) -> pre-commit topology -> committer
> -> succesful committables (materialized)
> Stage 3: succesful committables (materialized) -> post-commit topology
>
> If any instance of a given stage fails, the whole stage is restarted.
> So, in case the main pipeline (stage 1) fails, no data will be
> committed at all. On a rerun, we start fresh (or from the previous stage).
> Only when all data has been written do we start committing the data.
> An issue during committing will retrigger the commit stage (stage 2) and
> only that stage. Thus, all committables are stable and remain stable.
> When we are done committing all committables, we run the post-commit
> topology similarly "atomically".
>
> So now the caveats:
> - If the committer is rerun, all committables are reprocessed. So the committer
> needs to be idempotent. That is the same with STREAMING mode now and afaik
> there is no way around it.
> - If we lose a TM during the commit phase, we will run into the original
> issues of inconsistent data as we need to rerun the whole job. That would be
> solved with HA storage and we haven't found any solution that doesn't
> require some kind of external storage. However, the commit phase should be
> rather fast and errors are improbable (low volume).
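A minimal sketch of what the idempotency caveat above means (hypothetical
types; in practice the "already committed" check would have to be against the
external system, not local memory):

import java.util.HashSet;
import java.util.Set;
import java.util.function.Consumer;

/** Sketch only: re-running the commit stage must not re-apply already committed work. */
class IdempotentCommitter<C> {

    private final Set<C> committed = new HashSet<>(); // illustration; real sinks check the external system
    private final Consumer<C> commitAction;

    IdempotentCommitter(Consumer<C> commitAction) {
        this.commitAction = commitAction;
    }

    void commit(Iterable<C> committables) {
        for (C committable : committables) {
            // On a rerun the same committables arrive again; skip the ones already applied.
            if (committed.add(committable)) {
                commitAction.accept(committable);
            }
        }
    }
}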
>
> I'd still like to have an HA storage but that issue is also in Sink V1 and
> kind of orthogonal. It's also nothing that we can solve without involving
> more folks (I'm trying to kick start that in the background).
>
> On Thu, Dec 16, 2021 at 1:31 PM Yun Gao  wrote:
>
>> Hi,
>>
>> Thanks very much Fabian for the explanation; it solves most of the issues.
>> One remaining issue I want to double-confirm: for the edges between writer
>> and committer and in the post-committer topology, perhaps the result
>> partition with HA storage is not enough to solve all the issues directly?
>> This is because after the committer and post-committer topology have
>> finished and the data is committed, they might still be restarted due to JM
>> failover and the determinism problem (namely the example of A -> [B1, B2]:
>> A and B1 have finished and B2 failed; if -> is rebalance / random / rescale,
>> all of A, B1, and B2 would be restarted). Then the records would be produced
>> a second time.
>>
>> We might let the writers skip producing the new records, but if we
>> have multiple sinks like
>> OP1 -> (writer 1 -> committer 1)
>>  |--> (writer 2 -> committer 2)
>>
>> and the failover happens after writer 1 & committer 1 have finished but
>> writer 2 is still running, then if OP1 produced different records across
>> the two runs, the two sinks would produce different data, which might not
>> be acceptable in some cases. Perhaps we need some support from the
>> scheduler side?
>>
>> But I also agree this could be a separate issue that we could solve
>> separately in the future, as long as we know how to solve it~
>>
>> Best,
>> Yun
>>
>>
>> --
>> From:Arvid Heise 
>> Send Time:2021 Dec. 16 (Thu.) 19:54
>> To:dev 
>> Cc:Yun Gao 
>> Subject:Re: Re: [DISCUSS] FLIP-191: Extend unified Sink interface to
>> support small file compaction
>>
>> Just a quick amendment: There will be no blocking exchange in the pre-writer
>>

Re: Re: [DISCUSS] FLIP-191: Extend unified Sink interface to support small file compaction

2021-12-16 Thread Arvid Heise
> Send Date:Mon Dec 13 23:59:43 2021
> > > > Recipients:dev 
> > > > Subject:Re: [DISCUSS] FLIP-191: Extend unified Sink interface to
> support
> > > small file compaction
> > > >>
> > > >> Hi all,
> > > >>
> > > >> After a lot of discussions with different, we received very fruitful
> > > >> feedback and reworked the ideas behind this FLIP. Initially, we had
> > > >> the impression that the compaction problem is solvable by a single
> > > >> topology that we can reuse across different sinks. We now have a
> > > >> better understanding that different external systems require different
> > > >> compaction mechanisms, i.e. Hive requires compaction before finally
> > > >> registering the file in the metastore, whereas Iceberg compacts the files
> > > >> after they have been registered and just lazily compacts them.
> > > >>
> > > >> Considering all these different views we came up with a design that
> > > >> builds upon what @guowei@gmail.com and @yungao...@aliyun.com have
> > > >> proposed at the beginning. We allow inserting custom topologies before
> > > >> and after the SinkWriters and Committers. Furthermore, we do not see
> > > >> it as a downside. The Sink interfaces that will expose the DataStream
> > > >> to the user reside in flink-streaming-java, in contrast to the basic
> > > >> Sink interfaces that reside in flink-core, which we deem to be only used
> > > >> by expert users.
> > > >>
> > > >> Moreover, we also wanted to remove the global committer from the
> > > >> unified Sink interfaces and replace it with a custom post-commit
> > > >> topology. Unfortunately, we cannot do it without breaking the Sink
> > > >> interface since the GlobalCommittables are part of the parameterized
> > > >> Sink interface. Thus, we propose building a new Sink V2 interface
> > > >> consisting of composable interfaces that do not offer the
> > > >> GlobalCommitter anymore. We will implement a utility to extend a Sink
> > > >> with a post topology that mimics the behavior of the GlobalCommitter.
> > > >> The new Sink V2 provides the same sort of methods as the Sink V1
> > > >> interface, so a migration of sinks that do not use the GlobalCommitter
> > > >> should be very easy.
> > > >>
> > > >> We plan to keep the existing Sink V1 interfaces to not break
> > > >> externally built sinks. As part of this FLIP, we migrate all the
> > > >> connectors inside of the main repository to the new Sink V2 API.
> > > >>
> > > >> The FLIP document is also updated and includes the proposed changes.
> > > >>
> > > >> Looking forward to your feedback,
> > > >> Fabian
> > > >>
> > > >> https://cwiki.apache.org/confluence/display/FLINK/FLIP-191%3A+Extend+unified+Sink+interface+to+support+small+file+compaction
> > > >>
> > > >> On Thu, Dec 2, 2021 at 10:15 AM Roman Khachatryan wrote:
> > > >> >
> > > >> > Thanks for clarifying (I was initially confused by merging state files
> > > >> > rather than output files).
> > > >> >
> > > >> > > At some point, Flink will definitely 

Re: Re: [DISCUSS] FLIP-191: Extend unified Sink interface to support small file compaction

2021-12-16 Thread Arvid Heise
 have one `sinkTo(Sink sink)`, perhaps we also need to make the
> > > > Sink v1 a subclass of Sink v2 and extend the stateful and
> > > > two-phase-commit sinks, right?
> > > >
> > > > 3. I'd also like to have a confirmation of our thoughts on the
> > > > `DataStreamSink` returned by the sinkTo method:
> > > > The main issue is how we implement methods like `setParallelism`
> > > > or `setMaxParallelism`, since now the sink would be translated to
> > > > multiple transformations. Perhaps we could make them the default values
> > > > for all the transformations of the sink? A related issue would be the
> > > > Iceberg sink: I think it would need to have only one committer to avoid
> > > > contention on the optimistic locks (which would cause performance
> > > > degradation), so it might need to have N writers with 1 committer. To
> > > > build such a topology, perhaps we might need to add new methods to
> > > > specify the parallelism of the writers and committers separately?
> > > >
> > > > Best,
> > > > Yun
> > > >
> > > >
> > > > --Original Mail --
> > > > Sender:Fabian Paul 
> > > > Send Date:Mon Dec 13 23:59:43 2021
> > > > Recipients:dev 
> > > > Subject:Re: [DISCUSS] FLIP-191: Extend unified Sink interface to
> support
> > > small file compaction
> > > >>
> > > >> Hi all,
> > > >>
> > > >> After a lot of discussions with different, we received very fruitful
> > > >> feedback and reworked the ideas behind this FLIP. Initially, we had
> > > >> the impression that the compaction problem is solvable by a single
> > > >> topology that we can reuse across different sinks. We now have a
> > > >> better understanding that different external systems require different
> > > >> compaction mechanisms, i.e. Hive requires compaction before finally
> > > >> registering the file in the metastore, whereas Iceberg compacts the files
> > > >> after they have been registered and just lazily compacts them.
> > > >>
> > > >> Considering all these different views we came up with a design that
> > > >> builds upon what @guowei@gmail.com and @yungao...@aliyun.com have
> > > >> proposed at the beginning. We allow inserting custom topologies before
> > > >> and after the SinkWriters and Committers. Furthermore, we do not see
> > > >> it as a downside. The Sink interfaces that will expose the DataStream
> > > >> to the user reside in flink-streaming-java, in contrast to the basic
> > > >> Sink interfaces that reside in flink-core, which we deem to be only used
> > > >> by expert users.
> > > >>
> > > >> Moreover, we also wanted to remove the global committer from the
> > > >> unified Sink interfaces and replace it with a custom post-commit
> > > >> topology. Unfortunately, we cannot do it without breaking the Sink
> > > >> interface since the GlobalCommittables are part of the parameterized
> > > >> Sink interface. Thus, we propose building a new Sink V2 interface
> > > >> consisting of composable interfaces that do not offer the
> > > >> GlobalCommitter anymore. We will implement a utility to extend a Sink
> > > >> with a post topology that mimics the behavior of the GlobalCommitter.
> > > >> The new Sink V2 provides the same sort of methods as the Sink V1
> > > >> interface, so a migration of sinks that do not use the GlobalCommitter
> > > >> should be very easy.
> > > >>
> > > >> We plan to kee

Re: [DISCUSS] Creating an external connector repository

2021-12-09 Thread Arvid Heise
1/2021 12:47, Till Rohrmann wrote:
> > Hi Arvid,
> >
> > Thanks for updating this thread with the latest findings. The described
> > limitations for a single connector repo sound suboptimal to me.
> >
> > * Option 2. sounds as if we try to simulate multi connector repos inside of
> > a single repo. I also don't know how we would share code between the
> > different branches (sharing infrastructure would probably be easier
> > though). This seems to have the same limitations as dedicated repos with
> > the downside of having a not very intuitive branching model.
> > * Isn't option 1. kind of a degenerated version of option 2. where we have
> > some unrelated code from other connectors in the individual connector
> > branches?
> > * Option 3. has the downside that someone creating a release has to release
> > all connectors. This means that she either has to sync with the different
> > connector maintainers or has to be able to release all connectors on her
> > own. We are already seeing in the Flink community that releases require
> > quite good communication/coordination between the different people working
> > on different Flink components. Given our goals to make connector releases
> > easier and more frequent, I think that coupling different connector
> > releases might be counter-productive.
> >
> > To me it sounds not very practical to mainly use a mono repository w/o
> > having some more advanced build infrastructure that, for example, allows to
> > have different git roots in different connector directories. Maybe the mono
> > repo can be a catch all repository for connectors that want to be released
> > in lock-step (Option 3.) with all other connectors the repo contains. But
> > for connectors that get changed frequently, having a dedicated repository
> > that allows independent releases sounds preferable to me.
> >
> > What utilities and infrastructure code do you intend to share? Using git
> > submodules can definitely be one option to share code. However, it might
> > also be ok to depend on flink-connector-common artifacts which could make
> > things easier. Where I am unsure is whether git submodules can be used to
> > share infrastructure code (e.g. the .github/workflows) because you need
> > these files in the repo to trigger the CI infrastructure.
> >
> > Cheers,
> > Till
> >
> > On Thu, Nov 25, 2021 at 1:59 PM Arvid Heise  wrote:
> >
> >> Hi Brian,
> >>
> >> Thank you for sharing. I think your approach is very valid and is in
> line
> >> with what I had in mind.
> >>
> >>> Basically Pravega community aligns the connector releases with the Pravega
> >>> mainline release
> >>>
> >> This certainly would mean that there is little value in coupling connector
> >> versions. So it's making a good case for having separate connector repos.
> >>
> >>
> >>> and maintains the connector with the latest 3 Flink versions(CI will
> >>> publish snapshots for all these 3 branches)
> >>>
> >> I'd like to give connector devs a simple way to express to which Flink
> >> versions the current branch is compatible. From there we can generate the
> >> compatibility matrix automatically and optionally also create different
> >> releases per supported Flink version. Not sure if the latter is indeed
> >> better than having just one artifact that happens to run with multiple
> >> Flink versions. I guess it depends on what dependencies we are exposing. If
> >> the connector uses flink-connector-base, then we probably need separate
> >> artifacts with poms anyways.
> >>
> >> Best,
> >>
> >> Arvid
> >>
> >> On Fri, Nov 19, 2021 at 10:55 AM Zhou, Brian  wrote:
> >>
> >>> Hi Arvid,
> >>>
> >>> For branching model, the Pravega Flink connector has some experience
> what
> >>> I would like to share. Here[1][2] is the compatibility matrix and wiki
> >>> explaining the branching model and releases. Basically Pravega
> community
> >>> aligns the connector releases with the Pravega mainline release, and
> >>> maintains the connector with the latest 3 Flink versions(CI will
> publish
> >>> snapshots for all these 3 branches).
> >>> For example, recently we have 0.10.1 release[3], and in maven central
> we
> >>> need to upload three artifacts(For Flink 1.13, 1.12, 1.11) for 0.10.1
> >>> version[4].

Re: [DISCUSS] Drop Zookeeper 3.4

2021-12-06 Thread Arvid Heise
Could someone please help me understand the implications of the upgrade?

As far as I understood, this upgrade would only affect users that have a
ZooKeeper shared across multiple services, some of which require ZK 3.4 or
older? A workaround for those users would be to run two ZKs with different
versions, eventually deprecating the old ZK, correct?

If that is the only limitation, I'm +1 for the proposal since ZK 3.4 is
already EOL.

How are K8s users affected?

Best,

Arvid

On Mon, Dec 6, 2021 at 2:00 PM Chesnay Schepler  wrote:

> ping @users; any input on how this would affect you is highly appreciated.
>
> On 25/11/2021 22:39, Chesnay Schepler wrote:
> > I included the user ML in the thread.
> >
> > @users Are you still using Zookeeper 3.4? If so, were you planning to
> > upgrade Zookeeper in the near future?
> >
> > I'm not sure about ZK compatibility, but we'd also upgrade Curator to
> > 5.x, which doesn't support ZooKeeper 3.4 anymore.
> >
> > On 25/11/2021 21:56, Till Rohrmann wrote:
> >> Should we ask on the user mailing list whether anybody is still using
> >> ZooKeeper 3.4 and thus needs support for this version, or whether a
> >> ZooKeeper 3.5/3.6 client can talk to a ZooKeeper 3.4 cluster? I would
> >> expect that not a lot of users depend on it, but just to make sure that we
> >> aren't annoying a lot of our users with this change. Apart from that, +1
> >> for removing it if not a lot of users depend on it.
> >>
> >> Cheers,
> >> Till
> >>
> >> On Wed, Nov 24, 2021 at 11:03 AM Matthias Pohl 
> >> wrote:
> >>
> >>> Thanks for starting this discussion, Chesnay. +1 from my side. It's time to
> >>> move forward with the ZK support considering the EOL of 3.4 you already
> >>> mentioned. The benefits we gain from upgrading Curator to 5.x as a
> >>> consequence is another plus point. Just for reference on the inconsistent
> >>> state issue you mentioned: FLINK-24543 [1].
> >>>
> >>> Matthias
> >>>
> >>> [1] https://issues.apache.org/jira/browse/FLINK-24543
> >>>
> >>> On Wed, Nov 24, 2021 at 10:19 AM Chesnay Schepler 
> >>> wrote:
> >>>
>  Hello,
> 
>  I'd like to drop support for Zookeeper 3.4 in 1.15, upgrading the
>  default to 3.5 with an opt-in for 3.6.
> 
>  Supporting Zookeeper 3.4 (which is already EOL) prevents us from
>  upgrading Curator to 5.x, which would allow us to properly fix an issue
>  with inconsistent state. It is also required to eventually support ZK 3.6.
> >
> >
>
>


Re: [VOTE] Deprecate Java 8 support

2021-12-06 Thread Arvid Heise
+1 (binding)

On Mon, Dec 6, 2021 at 5:43 PM Timo Walther  wrote:

> +1 (binding)
>
> Thanks,
> Timo
>
> On 06.12.21 17:28, David Morávek wrote:
> > +1 (non-binding)
> >
> > On Mon, Dec 6, 2021 at 4:55 PM Ingo Bürk  wrote:
> >
> >> +1 (non-binding)
> >>
> >>
> >> Ingo
> >>
> >> On Mon, Dec 6, 2021 at 4:44 PM Chesnay Schepler 
> >> wrote:
> >>
> >>> Hello,
> >>>
> >>> after recent discussions on the dev and user mailing lists to deprecate
> >>> Java 8 support, with a general consensus in favor of it, I would now like
> >>> to do a formal vote.
> >>>
> >>> The deprecation would entail a notification to our users to encourage
> >>> migrating to Java 11, and various efforts on our side to prepare a
> >>> migration to Java 11, like updating some e2e tests to actually run on
> >>> Java 11, performance benchmarking etc. .
> >>>
> >>> There is no set date for the removal of Java 8 support.
> >>>
> >>> We'll use the usual minimum 72h vote duration, with committers having
> >>> binding votes.
> >>>
> >>>
> >>
> >
>
>


Re: [DISCUSS] FLIP-191: Extend unified Sink interface to support small file compaction

2021-11-29 Thread Arvid Heise
>
> > One way to avoid write-read-merge is by wrapping SinkWriter with
> > another one, which would buffer input elements in a temporary storage
> > (e.g. local file) until a threshold is reached; after that, it would
> > invoke the original SinkWriter. And if a checkpoint barrier comes in
> > earlier, it would send written data to some aggregator.
>
> I think perhaps this seems to be a kind of WAL method? Namely we first
> write the elements to some WAL logs and persist them on checkpoint
> (in snapshot or remote FS), or we directly write WAL logs to the remote
> FS eagerly.
>
At some point, Flink will definitely have some WAL adapter that can turn
any sink into an exactly-once sink (with some caveats). For now, we keep
that as an orthogonal solution as it has a rather high price (bursty
workload with high latency). Ideally, we can keep the compaction
asynchronous...
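As an illustration of the buffering idea quoted above, a condensed sketch
(hypothetical writer interface and an in-memory buffer instead of a local
file; not the actual Sink API):

import java.util.ArrayList;
import java.util.List;

/** Sketch only: hold elements back until a threshold is reached, then hand them to the wrapped writer. */
class BufferingWriter<T> {

    /** Stand-in for the real sink writer. */
    interface Writer<E> {
        void write(E element);
    }

    private final Writer<T> delegate;
    private final int threshold;
    private final List<T> buffer = new ArrayList<>();

    BufferingWriter(Writer<T> delegate, int threshold) {
        this.delegate = delegate;
        this.threshold = threshold;
    }

    void write(T element) {
        buffer.add(element);
        if (buffer.size() >= threshold) {
            flushToDelegate();
        }
    }

    /** On a checkpoint barrier, the still-too-small batch could be sent to an aggregator instead. */
    List<T> drainForAggregator() {
        List<T> pending = new ArrayList<>(buffer);
        buffer.clear();
        return pending;
    }

    private void flushToDelegate() {
        for (T element : buffer) {
            delegate.write(element);
        }
        buffer.clear();
    }
}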

On Mon, Nov 29, 2021 at 8:52 AM Yun Gao 
wrote:

> Hi,
>
> @Roman, very sorry for the long-delayed response,
>
> > Merging artifacts from multiple checkpoints would apparently
> require multiple concurrent checkpoints
>
> I think it might not need concurrent checkpoints: suppose some
> operators (like the committer aggregator in option 2) maintain
> the list of files to merge; they could store the lists of files to merge
> in their state, and then after several checkpoints are done and we have
> enough files, we could merge all the files in the list.
>
> > Asynchronous merging in an aggregator would require some resolution
> > logic on recovery, so that a merged artifact can be used if the
> > original one was deleted. Otherwise, wouldn't recovery fail because
> > some artifacts are missing?
> > We could also defer deletion until the "compacted" checkpoint is
> > subsumed - but isn't it too late, as it will be deleted anyways once
> > subsumed?
>
> I think logically we could delete the original files once the "compacted"
> checkpoint
> (which finish merging the compacted files and record it in the checkpoint)
> is completed
> in all the options. If there are failover before we it, we could restart
> the merging and if
> there are failover after it, we could have already recorded the files in
> the checkpoint.
>
> > One way to avoid write-read-merge is by wrapping SinkWriter with
> > another one, which would buffer input elements in a temporary storage
> > (e.g. local file) until a threshold is reached; after that, it would
> > invoke the original SinkWriter. And if a checkpoint barrier comes in
> > earlier, it would send written data to some aggregator.
>
> I think perhaps this seems to be a kind of WAL method? Namely we first
> write the elements to some WAL logs and persist them on checkpoint
> (in snapshot or remote FS), or we directly write WAL logs to the remote
> FS eagerly.
>
> Sorry if I do not understand correctly somewhere.
>
> Best,
> Yun
>
>
> --
> From:Roman Khachatryan 
> Send Time:2021 Nov. 9 (Tue.) 22:03
> To:dev 
> Subject:Re: [DISCUSS] FLIP-191: Extend unified Sink interface to support
> small file compaction
>
> Hi everyone,
>
> Thanks for the proposal and the discussion, I have some remarks:
> (I'm not very familiar with the new Sink API but I thought about the
> same problem in context of the changelog state backend)
>
> 1. Merging artifacts from multiple checkpoints would apparently
> require multiple concurrent checkpoints (otherwise, a new checkpoint
> won't be started before completing the previous one; and the previous
> one can't be completed before durably storing the artifacts). However,
> concurrent checkpoints are currently not supported with Unaligned
> checkpoints (this is besides increasing e2e-latency).
>
> 2. Asynchronous merging in an aggregator would require some resolution
> logic on recovery, so that a merged artifact can be used if the
> original one was deleted. Otherwise, wouldn't recovery fail because
> some artifacts are missing?
> We could also defer deletion until the "compacted" checkpoint is
> subsumed - but isn't it too late, as it will be deleted anyways once
> subsumed?
>
> 3. Writing small files, then reading and merging them for *every*
> checkpoint seems worse than only reading them on recovery. I guess I'm
> missing some cases of reading, so to me it would make sense to mention
> these cases explicitly in the FLIP motivation section.
>
> 4. One way to avoid write-read-merge is by wrapping SinkWriter with
> another one, which would buffer input elements in a temporary storage
> (e.g. local file) until a threshold is reached; after that, it would
> invoke the original SinkWriter. And if a checkpoint barrier comes in
> earlier, it would send written data to some aggregator. It will
> increase checkpoint delay (async phase) compared to the current Flink;
> but not compared to the write-read-merge solution, IIUC.
> Then such "BufferingSinkWriters" could aggregate input elements from
> each other, potentially 

Re: [DISCUSS] Creating an external connector repository

2021-11-25 Thread Arvid Heise
Hi Brian,

Thank you for sharing. I think your approach is very valid and is in line
with what I had in mind.

Basically Pravega community aligns the connector releases with the Pravega
> mainline release
>
This certainly would mean that there is little value in coupling connector
versions. So it's making a good case for having separate connector repos.


> and maintains the connector with the latest 3 Flink versions(CI will
> publish snapshots for all these 3 branches)
>
I'd like to give connector devs a simple way to express to which Flink
versions the current branch is compatible. From there we can generate the
compatibility matrix automatically and optionally also create different
releases per supported Flink version. Not sure if the latter is indeed
better than having just one artifact that happens to run with multiple
Flink versions. I guess it depends on what dependencies we are exposing. If
the connector uses flink-connector-base, then we probably need separate
artifacts with poms anyways.

Best,

Arvid

On Fri, Nov 19, 2021 at 10:55 AM Zhou, Brian  wrote:

> Hi Arvid,
>
> For branching model, the Pravega Flink connector has some experience what
> I would like to share. Here[1][2] is the compatibility matrix and wiki
> explaining the branching model and releases. Basically Pravega community
> aligns the connector releases with the Pravega mainline release, and
> maintains the connector with the latest 3 Flink versions(CI will publish
> snapshots for all these 3 branches).
> For example, recently we have 0.10.1 release[3], and in maven central we
> need to upload three artifacts(For Flink 1.13, 1.12, 1.11) for 0.10.1
> version[4].
>
> There are some alternatives. Another solution that we once discussed but
> finally abandoned is to have an independent version, just like the
> current CDC connector, and then give users a big compatibility matrix.
> We think it would be too confusing as the connector evolves. On the
> contrary, we can also do the opposite and align with the Flink version and
> maintain several branches for different system versions.
>
> I would say this is only a fairly-OK solution because it is a bit painful
> for maintainers, as cherry-picks are very common and releases would require
> much work. However, if neither system has nice backward
> compatibility, there seems to be no comfortable solution for their
> connector.
>
> [1] https://github.com/pravega/flink-connectors#compatibility-matrix
> [2]
> https://github.com/pravega/flink-connectors/wiki/Versioning-strategy-for-Flink-connector
> [3] https://github.com/pravega/flink-connectors/releases/tag/v0.10.1
> [4] https://search.maven.org/search?q=pravega-connectors-flink
>
> Best Regards,
> Brian
>
>
> Internal Use - Confidential
>
> -Original Message-
> From: Arvid Heise 
> Sent: Friday, November 19, 2021 4:12 PM
> To: dev
> Subject: Re: [DISCUSS] Creating an external connector repository
>
>
> [EXTERNAL EMAIL]
>
> Hi everyone,
>
> we are currently in the process of setting up the flink-connectors repo
> [1] for new connectors but we hit a wall that we currently cannot take:
> branching model.
> To reiterate the original motivation of the external connector repo: We
> want to decouple the release cycle of a connector with Flink. However, if
> we want to support semantic versioning in the connectors with the ability
> to introduce breaking changes through major version bumps and support
> bugfixes on old versions, then we need release branches similar to how
> Flink core operates.
> Consider two connectors, let's call them kafka and hbase. We have kafka in
> version 1.0.X, 1.1.Y (small improvement), 2.0.Z (config option) change and
> hbase only on 1.0.A.
>
> Now our current assumption was that we can work with a mono-repo under ASF
> (flink-connectors). Then, for release-branches, we found 3 options:
> 1. We would need to create some ugly mess with the cross product of
> connector and version: so you have kafka-release-1.0, kafka-release-1.1,
> kafka-release-2.0, hbase-release-1.0. The main issue is not the amount of
> branches (that's something that git can handle) but there the state of
> kafka is undefined in hbase-release-1.0. That's a call for desaster and
> makes releasing connectors very cumbersome (CI would only execute and
> publish hbase SNAPSHOTS on hbase-release-1.0).
> 2. We could avoid the undefined state by having an empty master and each
> release branch really only holds the code of the connector. But that's also
> not great: any user that looks at the repo and sees no connector would
> assume that it's dead.
> 3. We could have synced releases similar to the CDC connectors [2]. That
> means that if any connector introduces a breaking change, all co

Re: [DISCUSS] Deprecate Java 8 support

2021-11-25 Thread Arvid Heise
+1 to deprecate Java 8, so we can hopefully incorporate the module concept
in Flink.

On Thu, Nov 25, 2021 at 9:49 AM Chesnay Schepler  wrote:

> Users can already use APIs from Java 8/11.
>
> On 25/11/2021 09:35, Francesco Guardiani wrote:
> > +1 to what both Ingo and Matthias said; personally, I cannot wait to start
> > using some of the APIs introduced in Java 9. And I'm pretty sure that's the
> > same for our users as well.
> >
> > On Tuesday, 23 November 2021 13:35:07 CET Ingo Bürk wrote:
> >> Hi everyone,
> >>
> >> continued support for Java 8 can also create project risks, e.g. if a
> >> vulnerability arises in Flink's dependencies and we cannot upgrade them
> >> because they no longer support Java 8. Some projects already started
> >> deprecating support as well, like Kafka, and other projects will likely
> >> follow.
> >> Let's also keep in mind that the proposal here is not to drop support
> right
> >> away, but to deprecate it, send the message, and motivate users to start
> >> migrating. Delaying this process could ironically mean users have less
> time
> >> to prepare for it.
> >>
> >>
> >> Ingo
> >>
> >> On Tue, Nov 23, 2021 at 8:54 AM Matthias Pohl 
> >>
> >> wrote:
> >>> Thanks for constantly driving these maintenance topics, Chesnay. +1 from
> >>> my side for deprecating Java 8. I see the point Jingsong is raising. But I
> >>> agree with what David already said here. Deprecating the Java version is a
> >>> tool to make users aware of it (same as starting this discussion thread).
> >>> If there's no major opposition against deprecating it in the community we
> >>> should move forward in this regard to make the users who do not
> >>> regularly browse the mailing list aware of it. That said, deprecating Java
> >>> 8 in 1.15 does not necessarily mean that it is dropped in 1.16.
> >>>
> >>> Best,
> >>> Matthias
> >>>
> >>> On Tue, Nov 23, 2021 at 8:46 AM David Morávek  wrote:
>  Thank you Chesnay for starting the discussion! This will generate a bit of
>  work for some users, but it's a good thing to keep moving the project
>  forward. Big +1 for this.
> 
>  Jingsong:
> 
>  Receiving this signal, the user may be unhappy because his application
>
> > may be all on Java 8. Upgrading is a big job, after all, many systems
> > have not been upgraded yet. (Like you said, HBase and Hive)
>
>  The whole point of deprecation is to raise awareness that this will be
>  happening eventually and users should take some steps to address this in
>  the medium term. If I understand Chesnay correctly, we'd still keep Java 8
>  around for quite some time to give users enough time to upgrade, but
>  without raising awareness we'd fight the very same argument later in time.
>
>  All of the prerequisites from 3rd party projects for both HBase [1] and
>  Hive [2] to fully support Java 11 have been completed, so the ball is on
>  their side and there doesn't seem to be much activity. Generating a bit
>  more pressure on these efforts might be a good thing.
> 
>  It would be great to identify some of these users and learn a bit more
>  about their situation. Are they keeping up with latest Flink developments
>  or are they lagging behind (this would also give them way more time for an
>  eventual upgrade)?
> 
>  [1] https://issues.apache.org/jira/browse/HBASE-22972
>  [2] https://issues.apache.org/jira/browse/HIVE-22415
> 
>  Best,
>  D.
> 
>  On Tue, Nov 23, 2021 at 3:08 AM Jingsong Li 
> 
>  wrote:
> > Hi Chesnay,
> >
> > Thanks for bringing this for discussion.
> >
> > We should dig deeper into the current Java version of Flink users. At
> > least make sure Java 8 is not a mainstream version.
> >
> > Receiving this signal, the user may be unhappy because his
> application
> > may be all on Java 8. Upgrading is a big job, after all, many systems
> > have not been upgraded yet. (Like you said, HBase and Hive)
> >
> > In my opinion, it is too early to deprecate support for Java 8. We
> > should wait for a safer point in time.
> >
> > On Mon, Nov 22, 2021 at 11:45 PM Ingo Bürk 
> wrote:
> >> Hi,
> >>
> >> also a +1 from me because of everything Chesnay already said.
> >>
> >>
> >> Ingo
> >>
> >> On Mon, Nov 22, 2021 at 4:41 PM Martijn Visser <
> >>> mart...@ververica.com>
>
>
>


Re: [DISCUSS] Creating an external connector repository

2021-11-19 Thread Arvid Heise
Hi everyone,

we are currently in the process of setting up the flink-connectors repo [1]
for new connectors, but we hit a hurdle that we currently cannot clear: the
branching model.
To reiterate the original motivation of the external connector repo: We
want to decouple the release cycle of a connector with Flink. However, if
we want to support semantic versioning in the connectors with the ability
to introduce breaking changes through major version bumps and support
bugfixes on old versions, then we need release branches similar to how
Flink core operates.
Consider two connectors, let's call them kafka and hbase. We have kafka at
versions 1.0.X, 1.1.Y (small improvement), and 2.0.Z (config option change),
and hbase only at 1.0.A.

Now our current assumption was that we can work with a mono-repo under ASF
(flink-connectors). Then, for release-branches, we found 3 options:
1. We would need to create some ugly mess with the cross product of
connector and version: so you have kafka-release-1.0, kafka-release-1.1,
kafka-release-2.0, hbase-release-1.0. The main issue is not the amount of
branches (that's something that git can handle) but that the state of
kafka is undefined in hbase-release-1.0. That's a call for disaster and
makes releasing connectors very cumbersome (CI would only execute and
publish hbase SNAPSHOTS on hbase-release-1.0).
2. We could avoid the undefined state by having an empty master and each
release branch really only holds the code of the connector. But that's also
not great: any user that looks at the repo and sees no connector would
assume that it's dead.
3. We could have synced releases similar to the CDC connectors [2]. That
means that if any connector introduces a breaking change, all connectors
get a new major. I find that quite confusing to a user if hbase gets a new
release without any change because kafka introduced a breaking change.

To fully decouple release cycles and CI of connectors, we could add
individual repositories under ASF (flink-connector-kafka,
flink-connector-hbase). Then we can apply the same branching model as
before. I quickly checked if there are precedences in the apache community
for that approach and just by scanning alphabetically I found cordova with
70 and couchdb with 77 apache repos respectively. So it certainly seems
like other projects approached our problem in that way and the apache
organization is okay with that. I currently expect max 20 additional repos
for connectors and in the future 10 max each for formats and filesystems if
we would also move them out at some point in time. So we would be at a
total of 50 repos.

Note that for all options, we need to provide a compatibility matrix that we
aim to autogenerate.

Now for the potential downsides that we internally discussed:
- How can we ensure common infrastructure code, utilities, and quality?
I propose to add a flink-connector-common that contains all these things
and is added as a git submodule/subtree to the repos.
- Do we implicitly discourage connector developers from maintaining more than
one connector with a fragmented code base?
That is certainly a risk. However, I currently also see few devs working on
more than one connector. It may actually help keep the devs
that maintain a specific connector on the hook. We could use github issues
to track bugs and feature requests, and a dev can focus his limited time on
getting that one connector right.

So WDYT? Compared to some intermediate suggestions with split repos, the
big difference is that everything remains under Apache umbrella and the
Flink community.

[1] https://github.com/apache/flink-connectors
[2] https://github.com/ververica/flink-cdc-connectors/

On Fri, Nov 12, 2021 at 3:39 PM Arvid Heise  wrote:

> Hi everyone,
>
> I created the flink-connectors repo [1] to advance the topic. We would
> create a proof-of-concept in the next few weeks as a special branch that
> I'd then use for discussions. If the community agrees with the approach,
> that special branch will become the master. If not, we can reiterate over
> it or create competing POCs.
>
> If someone wants to try things out in parallel, just make sure that you
> are not accidentally pushing POCs to the master.
>
> As a reminder: We will not move out any current connector from Flink at
> this point in time, so everything in Flink will remain as is and be
> maintained there.
>
> Best,
>
> Arvid
>
> [1] https://github.com/apache/flink-connectors
>
> On Fri, Oct 29, 2021 at 6:57 PM Till Rohrmann 
> wrote:
>
>> Hi everyone,
>>
>> From the discussion, it seems to me that we have different opinions
>> whether to have an ASF umbrella repository or to host them outside of the
>> ASF. It also seems that this is not really the problem to solve. Since
>> there are many good arguments for either approach, we could simply start
>> with an ASF umbrella repository and see how people adopt

[ANNOUNCE] New Apache Flink Committer - Fabian Paul

2021-11-15 Thread Arvid Heise
Hi everyone,

On behalf of the PMC, I'm very happy to announce Fabian Paul as a new Flink
committer.

Fabian Paul has been actively improving the connector ecosystem by
migrating Kafka and ElasticSearch to the Sink interface and is currently
driving FLIP-191 [1] to tackle the sink compaction issue. While he is
active on the project (authored 70 PRs and reviewed 60), it's also worth
highlighting that he has also been guiding external efforts, such as the
DeltaLake Flink connector or the Pinot sink in Bahir.

Please join me in congratulating Fabian for becoming a Flink committer!

Best,

Arvid

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-191%3A+Extend+unified+Sink+interface+to+support+small+file+compaction


Re: [DISCUSS] Conventions on assertions to use in tests

2021-11-12 Thread Arvid Heise
JUnit5 migration is currently mostly prepared. The rules are being migrated
[1] and Hang and Qingsheng have migrated most tests in their branch afaik
(Kudos to them!).

Using assertj would make migration easier as it's independent of the JUnit
version. But the same can be said about hamcrest, albeit less expressive.

I'm personally in favor of assertj (disclaimer: I contributed to the project
a bit). But I'm not sure if it's wise to change everything at once, also
from the perspective of less active contributors. We may alleviate that
pain by providing good guides, though. So maybe we should also add a
temporal dimension to the discussion.

[1] https://github.com/apache/flink/pull/17556
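For reference, a minimal sketch of the kind of custom assertion Francesco
proposes below, assuming assertj-core and Flink's Row type on the classpath
(names are illustrative):

import org.apache.flink.types.Row;
import org.assertj.core.api.AbstractAssert;

/** Sketch only: a custom AssertJ assertion for Flink's Row. */
public class RowAssert extends AbstractAssert<RowAssert, Row> {

    private RowAssert(Row actual) {
        super(actual, RowAssert.class);
    }

    public static RowAssert assertThatRow(Row actual) {
        return new RowAssert(actual);
    }

    /** Fails with a descriptive message if the field at the given position differs from the expected value. */
    public RowAssert hasFieldEqualTo(int pos, Object expected) {
        isNotNull();
        Object field = actual.getField(pos);
        if (field == null ? expected != null : !field.equals(expected)) {
            failWithMessage("Expected field <%s> at position <%s> but was <%s>", expected, pos, field);
        }
        return this;
    }
}

Used as assertThatRow(row).hasFieldEqualTo(1, expectedValue).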

On Fri, Nov 12, 2021 at 3:58 PM Till Rohrmann  wrote:

> Thanks for starting this discussion Francesco. I think there is a lot of
> value in consistency because it makes it a lot easier to navigate and
> contribute to the code base. The testing tools are definitely one important
> aspect of consistency.
>
> It is a bit unfortunate that we have tests that follow different patterns.
> This, however, is mainly due to organic growth. I think the community
> started with Junit4, then we chose to use Hamcrest because of its better
> expressiveness. Most recently, there was an effort started that aimed at
> switching over to Junit5 [1, 2]. @Arvid Heise  knows
> more about the current status.
>
> Personally, I don't have a strong preference for which testing tools to
> use. The important bit is that we agree as a community, then document the
> choice and finally stick to it. So before starting to use assertj, we
> should probably align with the folks working on the Junit5 effort first.
>
> [1] https://lists.apache.org/thread/jsjvc2cqb91pyh47d4p6olk3c1vxqm3w
> [2] https://lists.apache.org/thread/d9y5tzcl8wpk6ozmf8575qfzww450jpk
>
> Cheers,
> Till
>
> On Fri, Nov 12, 2021 at 3:41 PM David Anderson 
> wrote:
>
>> For what it's worth, I recently rewrote all of the tests in flink-training
>> to use assertj, removing a mixture of junit4 assertions and hamcrest in
>> the
>> process. I chose assertj because I found it to be more expressive and made
>> the tests more readable.
>>
>> +1 from me
>>
>> David
>>
>> On Fri, Nov 12, 2021 at 10:03 AM Francesco Guardiani <
>> france...@ververica.com> wrote:
>>
>> > Hi all,
>> >
>> > I wonder If we have a convention of the testing tools (in particular
>> > assertions) to use in our tests. If not, are modules free to decide on a
>> > convention on their own?
>> >
>> > In case of table, we have a mixed bag of different assertions of all
>> kinds,
>> > sometimes mixed even in the same test:
>> >
>> >- Assertions from junit 4
>> >- Assertions from junit 5
>> >- Hamcrest
>> >- Some custom assertions classes (e.g. RowDataHarnessAssertor)
>> >- assert instructions
>> >
>> > The result is that most tests are very complicated to read and
>> understand,
>> > and we have a lot of copy pasted "assertion methods" all around our
>> > codebase.
>> >
>> > For table in particular, I propose to introduce assertj [1] and develop
>> a
>> > couple of custom assertions [2] for the types we use most in our tests,
>> > e.g. Row, RowData, DataType, LogicalType, etc... For example:
>> >
>> > assertFalse(row.isNullAt(1));
>> > assert row instanceof GenericRowData;
>> > assertEquals(row.getField(1),
>> TimestampData.ofEpochMillis(expectedMillis));
>> >
>> > Could be:
>> >
>> > assertThat(row)
>> >   .getField(1, TimestampData.class)
>> >   .isEqualToEpochMillis(expectedMillis)
>> >
>> > We could have these in table-common so every part of the table stack can
>> > benefit from it. Of course we can't take all our tests and convert them
>> to
>> > the new assertions, but as a policy we can enforce to use the new
>> > assertions convention for every new test or for every test we modify in
>> > future PRs.
>> >
>> > What's your opinion about it? Do you agree to have such kind of policy
>> of
>> > using the same assertions? If yes, do you like the idea of using
>> assertj to
>> > implement such policy?
>> >
>> > FG
>> >
>> > [1] A library for assertions https://assertj.github.io, already used by
>> > the
>> > pulsar connector
>> > [2]
>> https://assertj.github.io/doc/#assertj-core-custom-assertions-creation
>> > --
>> >
>> > Francesco Guardiani | Software Engineer
>> >
>> > france...@ververica.com
>> >
>> >
>> > <https://www.ververica.com/>
>> >
>> > Follow us @VervericaData
>> >
>> > --
>> >
>> > Join Flink Forward <https://flink-forward.org/> - The Apache Flink
>> > Conference
>> >
>> > Stream Processing | Event Driven | Real Time
>> >
>> > --
>> >
>> > Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>> >
>> > --
>> >
>> > Ververica GmbH
>> >
>> > Registered at Amtsgericht Charlottenburg: HRB 158244 B
>> >
>> > Managing Directors: Karl Anton Wehner, Holger Temme, Yip Park Tung
>> Jason,
>> > Jinwei (Kevin) Zhang
>> >
>>
>
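A minimal sketch of the kind of custom assertion Francesco describes above,
assuming AssertJ's AbstractAssert as the base class; the RowDataAssert class
and its methods are illustrative only, not existing code:

import org.apache.flink.table.data.RowData;
import org.assertj.core.api.AbstractAssert;

// Illustrative custom AssertJ assertion for RowData, following the
// custom-assertions guide linked in [2]; class and method names are made up.
public class RowDataAssert extends AbstractAssert<RowDataAssert, RowData> {

    protected RowDataAssert(RowData actual) {
        super(actual, RowDataAssert.class);
    }

    public static RowDataAssert assertThat(RowData actual) {
        return new RowDataAssert(actual);
    }

    public RowDataAssert isNotNullAt(int pos) {
        isNotNull();
        if (actual.isNullAt(pos)) {
            failWithMessage("Expected field at position <%s> to be non-null", pos);
        }
        return this;
    }
}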


Re: [DISCUSS] Creating an external connector repository

2021-11-12 Thread Arvid Heise
Hi everyone,

I created the flink-connectors repo [1] to advance the topic. We would
create a proof-of-concept in the next few weeks as a special branch that
I'd then use for discussions. If the community agrees with the approach,
that special branch will become the master. If not, we can reiterate over
it or create competing POCs.

If someone wants to try things out in parallel, just make sure that you are
not accidentally pushing POCs to the master.

As a reminder: We will not move out any current connector from Flink at
this point in time, so everything in Flink will remain as is and be
maintained there.

Best,

Arvid

[1] https://github.com/apache/flink-connectors

On Fri, Oct 29, 2021 at 6:57 PM Till Rohrmann  wrote:

> Hi everyone,
>
> From the discussion, it seems to me that we have different opinions
> whether to have an ASF umbrella repository or to host them outside of the
> ASF. It also seems that this is not really the problem to solve. Since
> there are many good arguments for either approach, we could simply start
> with an ASF umbrella repository and see how people adopt it. If the
> individual connectors cannot move fast enough or if people prefer to not
> buy into the more heavy-weight ASF processes, then they can host the code
> also somewhere else. We simply need to make sure that these connectors are
> discoverable (e.g. via flink-packages).
>
> The more important problem seems to be to provide common tooling (testing,
> infrastructure, documentation) that can easily be reused. Similarly, it has
> become clear that the Flink community needs to improve on providing stable
> APIs. I think it is not realistic to first complete these tasks before
> starting to move connectors to dedicated repositories. As Stephan said,
> creating a connector repository will force us to pay more attention to API
> stability and also to think about which testing tools are required. Hence,
> I believe that starting to add connectors to a different repository than
> apache/flink will help improve our connector tooling (declaring testing
> classes as public, creating a common test utility repo, creating a repo
> template) and vice versa. Hence, I like Arvid's proposed process as it will
> start kicking things off w/o letting this effort fizzle out.
>
> Cheers,
> Till
>
> On Thu, Oct 28, 2021 at 11:44 AM Stephan Ewen  wrote:
>
> > Thank you all, for the nice discussion!
> >
> > From my point of view, I very much like the idea of putting connectors
> in a
> > separate repository. But I would argue it should be part of Apache Flink,
> > similar to flink-statefun, flink-ml, etc.
> >
> > I share many of the reasons for that:
> >   - As argued many times, reduces complexity of the Flink repo, increases
> > response times of CI, etc.
> >   - Much lower barrier of contribution, because an unstable connector
> would
> > not de-stabilize the whole build. Of course, we would need to make sure
> we
> > set this up the right way, with connectors having individual CI runs,
> build
> > status, etc. But it certainly seems possible.
> >
> >
> > I would argue some points a bit different than some cases made before:
> >
> > (a) I believe the separation would increase connector stability. Because
> it
> > really forces us to work with the connectors against the APIs like any
> > external developer. A mono repo is somehow the wrong thing if you in
> > practice want to actually guarantee stable internal APIs at some layer.
> > Because the mono repo makes it easy to just change something on both
> sides
> > of the API (provider and consumer) seamlessly.
> >
> > Major refactorings in Flink need to keep all connector API contracts
> > intact, or we need to have a new version of the connector API.
> >
> > (b) We may even be able to go towards more lightweight and automated
> > releases over time, even if we stay in Apache Flink with that repo.
> > This isn't fully aligned with the Apache release policies yet, but
> > there are board discussions about whether there can be bot-triggered
> > releases (by dependabot) and how that could fit into the Apache process.
> >
> > This doesn't seem to be quite there just yet, but seeing that those start
> > is a good sign, and there is a good chance we can do some things there.
> > I am not sure whether we should let bots trigger releases, because a
> final
> > human look at things isn't a bad thing, especially given the popularity
> of
> > software supply chain attacks recently.
> >
> >
> > I do share Chesnay's concerns about complexity in tooling, though. Both
> > release tooling and test tooling. They are not incompatible wi

Re: [DISCUSS] FLIP-191: Extend unified Sink interface to support small file compaction

2021-11-04 Thread Arvid Heise
>
> Emitting records for downstream operators in or after
> notifyCheckpointComplete no longer works after FLIP-147 when executing the
> final checkpoint. The problem is that the final checkpoint happens after
> the EOI and we would like to keep the property that you can terminate the
> whole topology with a single checkpoint, if possible.
>

Sorry for the confusion, I meant to say emit them after the barrier (e.g.
in snapshotState).

On Thu, Nov 4, 2021 at 9:49 AM Till Rohrmann  wrote:

> Thanks for the detailed description Arvid.
>
> I might misunderstand things but one comment concerning:
>
> | We could even optimize the writer to only emit the committables after
> notifyCheckpointComplete as long as we retain them in the state of the
> writer.
>
> Emitting records for downstream operators in or after
> notifyCheckpointComplete no longer works after FLIP-147 when executing the
> final checkpoint. The problem is that the final checkpoint happens after
> the EOI and we would like to keep the property that you can terminate the
> whole topology with a single checkpoint, if possible.
>
> Cheers,
> Till
>
> On Thu, Nov 4, 2021 at 9:05 AM Arvid Heise  wrote:
>
>> Hi folks,
>>
>> thanks for the lively discussion. Let me present my point of view on a
>> couple of items:
>>
>> *Impact on checkpointing times*
>>
>> Currently, we send the committables of the writer downstream before the
>> barrier is sent. That allows us to include all committables in the state of
>> the committers, such that the committer receives all committables before it
>> snapshots.
>>
>> All proposals now add a processing step on the committables where certain
>> committables are virtually merged into larger committables. However, none
>> of the approaches require the physical merge to happen before the barrier
>> is sent downstream. In fact, we can even delay the virtual merge after the
>> checkpoint has been fully taken. We could even optimize the writer to only
>> emit the committables after notifyCheckpointComplete as long as we
>> retain them in the state of the writer. The important point is that the
>> committables become part of the checkpoint, but they are always committed
>> only after notifyCheckpointComplete. Especially the retry mechanism of
>> 1.14 decouples the exact time of the commit from notifyCheckpointComplete.
>> The retry happens asynchronously, so there is no reason to believe that we
>> can't do the merging asynchronously with any option.
>>
>> Naturally, all approaches delay the checkpoint barrier a bit by either
>> adding RPC calls or shuffles but the impact is rather minimal in a well
>> configured system (the number of committables is assumed to be tiny), so
>> I'm assuming a tad higher checkpointing time because the topology is more
>> complex (in all cases).
>>
>> *Impact on latency*
>>
>> All approaches will also delay the effective commit, since additional
>> work needs to be done, but I'd argue that this is by design and should be
>> clear to everyone. Especially when merging files across checkpoints,
>> certain results will not be visible until much later. Once we settle on
>> an approach, we should think about which options we give to sink developers
>> and end-users to impact that latency.
>>
>> An important aspect here is that we also refine the contract on
>> GlobalCommitter. Currently, it's not clear when it is supposed to be
>> called; for example, to register files in a metastore. Naively, I would
>> have said that the GlobalCommitter is invoked when all committables of a
>> certain checkpoint have been committed.
>> a) But what happens in the case of a failure and retry? Do we delay until
>> the commit finally happens?
>> b) What do we do with committables that are held back for compaction? Do
>> we global commit when all committables of checkpoint A are committed
>> ignoring small files? Or do we wait until a later checkpoint, when all
>> small files of A have been merged such that indeed all data of A has been
>> committed.
>>
>> *Re: Developers understand that the cost is relatively high.*
>>
>> Yes that is a valid concern that we already have with committer and
>> global committer (which none of the new users understand). I don't like
>> that we have so many Optional methods where it's not clear which methods to
>> implement to achieve certain functionality. Ideally, we split up the Sink
>> in many smaller components where you add certain traits. For example,
>> HiveSink implements Sink, HasState, HasCommitter, HasCompaction,
>> HasGlobalCommitter (ther

Re: [DISCUSS] FLIP-191: Extend unified Sink interface to support small file compaction

2021-11-04 Thread Arvid Heise
Hi folks,

thanks for the lively discussion. Let me present my point of view on a
couple of items:

*Impact on checkpointing times*

Currently, we send the committables of the writer downstream before the
barrier is sent. That allows us to include all committables in the state of
the committers, such that the committer receives all committables before it
snapshots.

All proposals now add a processing step on the committables where certain
committables are virtually merged into larger committables. However, none
of the approaches require the physical merge to happen before the barrier
is sent downstream. In fact, we can even delay the virtual merge after the
checkpoint has been fully taken. We could even optimize the writer to only
emit the committables after notifyCheckpointComplete as long as we retain
them in the state of the writer. The important point is that the
committables become part of the checkpoint, but they are always committed
only after notifyCheckpointComplete. Especially the retry mechanism of 1.14
decouples the exact time of the commit from notifyCheckpointComplete. The
retry happens asynchronously, so there is no reason to believe that we
can't do the merging asynchronously with any option.

Naturally, all approaches delay the checkpoint barrier a bit by either
adding RPC calls or shuffles but the impact is rather minimal in a well
configured system (the number of committables is assumed to be tiny), so
I'm assuming a tad higher checkpointing time because the topology is more
complex (in all cases).

*Impact on latency*

All approaches will also delay the effective commit, since additional work
needs to be done, but I'd argue that this is by design and should be clear
to everyone. Especially when merging files across checkpoints, certain
results will not be visible until much later. Once we settle on an
approach, we should think about which options we give to sink developers and
end-users to impact that latency.

An important aspect here is that we also refine the contract on
GlobalCommitter. Currently, it's not clear when it is supposed to be
called; for example, to register files in a metastore. Naively, I would
have said that the GlobalCommitter is invoked when all committables of a
certain checkpoint have been committed.
a) But what happens in the case of a failure and retry? Do we delay until
the commit finally happens?
b) What do we do with committables that are held back for compaction? Do we
global commit when all committables of checkpoint A are committed ignoring
small files? Or do we wait until a later checkpoint, when all small files
of A have been merged such that indeed all data of A has been committed.

*Re: Developers understand that the cost is relatively high.*

Yes that is a valid concern that we already have with committer and global
committer (which none of the new users understand). I don't like that we
have so many Optional methods where it's not clear which methods to
implement to achieve certain functionality. Ideally, we split up the Sink
in many smaller components where you add certain traits. For example,
HiveSink implements Sink, HasState, HasCommitter, HasCompaction,
HasGlobalCommitter (there are probably better names for the traits)
We can still change that while everything is experimental (and annoy connector
devs), but it seems to be an orthogonal discussion.
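To make the idea concrete, a rough sketch of such trait-style interfaces (all
names and signatures here are made up and deliberately simplified; none of
them exist in the current Sink API):

// Hypothetical trait-style decomposition of the unified Sink interface.
// None of these interfaces exist in Flink; names and signatures are
// illustrative only and deliberately simplified.
interface Sink<InputT, CommT> {
    Writer<InputT, CommT> createWriter();

    interface Writer<InputT, CommT> {
        void write(InputT element);

        java.util.List<CommT> prepareCommit();
    }
}

interface HasState<StateT> {
    java.util.List<StateT> snapshotWriterState();
}

interface HasCommitter<CommT> {
    void commit(java.util.List<CommT> committables);
}

interface HasCompaction<CommT> {
    // merges several small committables into larger ones
    java.util.List<CommT> compact(java.util.List<CommT> smallCommittables);
}

interface HasGlobalCommitter<CommT> {
    void globalCommit(java.util.List<CommT> committablesOfCheckpoint);
}

// A sink like the Hive sink would then opt in to exactly the traits it needs:
// class HiveSink implements Sink<Row, FileCommittable>, HasState<BucketState>,
//         HasCommitter<FileCommittable>, HasCompaction<FileCommittable>,
//         HasGlobalCommitter<FileCommittable> { ... }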

If we compare Option 2 with Option 3, I'd argue that the mental model of
the connector dev is more stressed with option 3 than with 2. He needs to
map the high-level concepts of the current Sink to the low-level concepts
of DataStream. He needs to understand the data that is being sent between
writer and committer to be able to hook in.
Note that we need to move away from just sending CommT and wrap it with
some metadata, such as checkpoint id and subtask id.

The dev also needs to care about managing the state, which we may abstract
in Option 2 (not entirely sure). And the dev needs to understand the task
life-cycle to emit the remaining committables before the job shuts down (is
that even possible on DataStream level? Are we notified on EOI or do we
expect devs to use operator API?). Lastly, if things are done
asynchronously as you have championed, the user also needs to take care of
ensuring that all async tasks are done before shutdown.

*Re 2. Repeated implementation*

The newly introduced `aggregate` can set the parallelism, thus perhaps
> `setUid`, `slotSharingGroup (including resources)`, and `maxParallelism`
> also need to be supported? If they are supported, additional workloads are
> required, and at the same time I feel that these workloads are unnecessary;
> if not, it will also increase the developers’ understanding cost: for
> example, why these operators can not set these attributes?
>

I'm assuming that you mean that we are about to replicate API between
DataStream and Sink compactor? I wouldn't do that. I would actually also
fix the parallelism to the writer's parallelism. So we just have 

Re: [NOTICE] flink-streaming-java no longer depends on Scala and lost it's suffix

2021-10-26 Thread Arvid Heise
Awesome. Thank you very much for all the hard work!

On Tue, Oct 26, 2021 at 1:06 AM Chesnay Schepler  wrote:

> This time with proper formatting...
>
> flink-batch-sql-test
> flink-cep
> flink-cli-test
> flink-clients
> flink-connector-elasticsearch-base
> flink-connector-elasticsearch5
> flink-connector-elasticsearch6
> flink-connector-elasticsearch7
> flink-connector-gcp-pubsub
> flink-connector-hbase-1.4
> flink-connector-hbase-2.2
> flink-connector-hbase-base
> flink-connector-jdbc
> flink-connector-kafka
> flink-connector-kinesis
> flink-connector-nifi
> flink-connector-pulsar
> flink-connector-rabbitmq
> flink-connector-testing
> flink-connector-twitter
> flink-connector-wikiedits
> flink-container
> flink-distributed-cache-via-blob-test
> flink-dstl-dfs
> flink-gelly
> flink-hadoop-bulk
> flink-kubernetes
> flink-parent-child-classloading-test-lib-package
> flink-parent-child-classloading-test-program
> flink-queryable-state-test
> flink-runtime-web
> flink-scala
> flink-sql-connector-elasticsearch6
> flink-sql-connector-elasticsearch7
> flink-sql-connector-hbase-1.4
> flink-sql-connector-hbase-2.2
> flink-sql-connector-kafka
> flink-sql-connector-kinesis
> flink-sql-connector-rabbitmq
> flink-state-processor-api
> flink-statebackend-rocksdb
> flink-streaming-java
> flink-streaming-kafka-test
> flink-streaming-kafka-test-base
> flink-streaming-kinesis-test
> flink-table-api-java-bridge
> flink-test-utils
> flink-walkthrough-common
> flink-yarn
>
>
> On 26/10/2021 01:04, Chesnay Schepler wrote:
> > Hello all,
> >
> > I just wanted to inform everyone that I just merged
> > https://issues.apache.org/jira/browse/FLINK-24018, removing the
> > transitive Scala dependencies from flink-streaming-java. This also
> > means that the module lost its Scala suffix, along with a lot of
> > other modules.
> >
> > Please keep this in mind for a few days when adding Flink
> > dependencies or new modules; it is quite likely that something has
> > changed w.r.t. the Scala suffixes.
> >
> > For completeness sake, these are the module that lost the suffix:
> >
> > |flink-batch-sql-test flink-cep flink-cli-test flink-clients
> > flink-connector-elasticsearch-base flink-connector-elasticsearch5
> > flink-connector-elasticsearch6 flink-connector-elasticsearch7
> > flink-connector-gcp-pubsub flink-connector-hbase-1.4
> > flink-connector-hbase-2.2 flink-connector-hbase-base
> > flink-connector-jdbc flink-connector-kafka flink-connector-kinesis
> > flink-connector-nifi flink-connector-pulsar flink-connector-rabbitmq
> > flink-connector-testing flink-connector-twitter
> > flink-connector-wikiedits flink-container
> > flink-distributed-cache-via-blob-test flink-dstl-dfs flink-gelly
> > flink-hadoop-bulk flink-kubernetes
> > flink-parent-child-classloading-test-lib-package
> > flink-parent-child-classloading-test-program
> > flink-queryable-state-test flink-runtime-web flink-scala
> > flink-sql-connector-elasticsearch6 flink-sql-connector-elasticsearch7
> > flink-sql-connector-hbase-1.4 flink-sql-connector-hbase-2.2
> > flink-sql-connector-kafka flink-sql-connector-kinesis
> > flink-sql-connector-rabbitmq flink-state-processor-api
> > flink-statebackend-rocksdb flink-streaming-java
> > flink-streaming-kafka-test flink-streaming-kafka-test-base
> > flink-streaming-kinesis-test flink-table-api-java-bridge
> > flink-test-utils flink-walkthrough-common flink-yarn|
> >
>
>


Re: [DISCUSS] Creating an external connector repository

2021-10-26 Thread Arvid Heise
ill
> > > >> @PublicEvolving and new Sink API is still @Experimental.
> > > >>
> > > >>
> > > >> (2) Flink testability without connectors.
> > > >> > Flink w/o Kafka connector (and few others) isn't
> > > >> > viable. Testability of Flink was already brought up, can we really
> > > >> > certify a Flink core release without Kafka connector? Maybe those
> > > >> > connectors that are used in Flink e2e tests to validate
> > functionality
> > > >> > of core Flink should not be broken out?
> > > >>
> > > >> This is a very good question. How can we guarantee the new Source
> and
> > Sink
> > > >> API are stable with only test implementation?
> > > >>
> > > >>
> > > >> Best,
> > > >> Jark
> > > >>
> > > >>
> > > >>
> > > >>
> > > >>
> > > >> On Tue, 19 Oct 2021 at 23:56, Chesnay Schepler 
> > > >> wrote:
> > > >>
> > > >> > Could you clarify what release cadence you're thinking of? There's
> > quite
> > > >> > a big range that fits "more frequent than Flink" (per-commit,
> daily,
> > > >> > weekly, bi-weekly, monthly, even bi-monthly).
> > > >> >
> > > >> > On 19/10/2021 14:15, Martijn Visser wrote:
> > > >> > > Hi all,
> > > >> > >
> > > >> > > I think it would be a huge benefit if we can achieve more
> frequent
> > > >> > releases
> > > >> > > of connectors, which are not bound to the release cycle of Flink
> > > >> itself.
> > > >> > I
> > > >> > > agree that in order to get there, we need to have stable
> > interfaces
> > > >> which
> > > >> > > are trustworthy and reliable, so they can be safely used by
> those
> > > >> > > connectors. I do think that work still needs to be done on those
> > > >> > > interfaces, but I am confident that we can get there from a
> Flink
> > > >> > > perspective.
> > > >> > >
> > > >> > > I am worried that we would not be able to achieve those frequent
> > > >> releases
> > > >> > > of connectors if we are putting these connectors under the
> Apache
> > > >> > umbrella,
> > > >> > > because that means that for each connector release we have to
> > follow
> > > >> the
> > > >> > > Apache release creation process. This requires a lot of manual
> > steps
> > > >> and
> > > >> > > prohibits automation and I think it would be hard to scale out
> > > >> frequent
> > > >> > > releases of connectors. I'm curious how others think this
> > challenge
> > > >> could
> > > >> > > be solved.
> > > >> > >
> > > >> > > Best regards,
> > > >> > >
> > > >> > > Martijn
> > > >> > >
> > > >> > > On Mon, 18 Oct 2021 at 22:22, Thomas Weise 
> > wrote:
> > > >> > >
> > > >> > >> Thanks for initiating this discussion.
> > > >> > >>
> > > >> > >> There are definitely a few things that are not optimal with our
> > > >> > >> current management of connectors. I would not necessarily
> > > >> characterize
> > > >> > >> it as a "mess" though. As the points raised so far show, it
> isn't
> > > >> easy
> > > >> > >> to find a solution that balances competing requirements and
> > leads to
> > > >> a
> > > >> > >> net improvement.
> > > >> > >>
> > > >> > >> It would be great if we can find a setup that allows for
> > connectors
> > > >> to
> > > >> > >> be released independently of core Flink and that each connector
> > can
> > > >> be
> > > >> > >> released separately. Flink already has separate releases
> > > >> > >> (flink-shaded), so that by itself isn't a new thing.
> > Pe

Re: [DISCUSS] Creating an external connector repository

2021-10-19 Thread Arvid Heise
Okay I think it is clear that the majority would like to keep connectors
under the Apache Flink umbrella. That means we will not be able to have
per-connector repositories and project management, automatic dependency
bumping with Dependabot, or semi-automatic releases.

So then I'm assuming the directory structure that @Chesnay Schepler
 proposed would be the most beneficial:
- A root project with some convenience setup.
- Unrelated subprojects with individual versioning and releases.
- Branches for minor Flink releases. That is needed anyhow to use new
features independent of API stability.
- Each connector maintains its own documentation that is accessible through
the main documentation.

Any thoughts on alternatives? Do you see risks?

@Stephan Ewen  mentioned offline that we could adjust the
bylaws for the connectors such that we need fewer PMCs to approve a
release. Would it be enough to have one PMC vote per connector release? Do
you know of other ways to tweak the release process to have less manual
work?

On Mon, Oct 18, 2021 at 10:22 PM Thomas Weise  wrote:

> Thanks for initiating this discussion.
>
> There are definitely a few things that are not optimal with our
> current management of connectors. I would not necessarily characterize
> it as a "mess" though. As the points raised so far show, it isn't easy
> to find a solution that balances competing requirements and leads to a
> net improvement.
>
> It would be great if we can find a setup that allows for connectors to
> be released independently of core Flink and that each connector can be
> released separately. Flink already has separate releases
> (flink-shaded), so that by itself isn't a new thing. Per-connector
> releases would need to allow for more frequent releases (without the
> baggage that a full Flink release comes with).
>
> Separate releases would only make sense if the core Flink surface is
> fairly stable though. As evident from Iceberg (and also Beam), that's
> not the case currently. We should probably focus on addressing the
> stability first, before splitting code. A success criteria could be
> that we are able to build Iceberg and Beam against multiple Flink
> versions w/o the need to change code. The goal would be that no
> connector breaks when we make changes to Flink core. Until that's the
> case, code separation creates a setup where 1+1 or N+1 repositories
> need to move lock step.
>
> Regarding some connectors being more important for Flink than others:
> That's a fact. Flink w/o Kafka connector (and few others) isn't
> viable. Testability of Flink was already brought up, can we really
> certify a Flink core release without Kafka connector? Maybe those
> connectors that are used in Flink e2e tests to validate functionality
> of core Flink should not be broken out?
>
> Finally, I think that the connectors that move into separate repos
> should remain part of the Apache Flink project. Larger organizations
> tend to approve the use of and contribution to open source at the
> project level. Sometimes it is everything ASF. More often it is
> "Apache Foo". It would be fatal to end up with a patchwork of projects
> with potentially different licenses and governance to arrive at a
> working Flink setup. This may mean we prioritize usability over
> developer convenience, if that's in the best interest of Flink as a
> whole.
>
> Thanks,
> Thomas
>
>
>
> On Mon, Oct 18, 2021 at 6:59 AM Chesnay Schepler 
> wrote:
> >
> > Generally, the issues are reproducibility and control.
> >
> > Stuff's completely broken on the Flink side for a week? Well then so are
> > the connector repos.
> > (As-is) You can't go back to a previous version of the snapshot. Which
> > also means that checking out older commits can be problematic because
> > you'd still work against the latest snapshots, and they might not be
> > compatible with each other.
> >
> >
> > On 18/10/2021 15:22, Arvid Heise wrote:
> > > I was actually betting on snapshots versions. What are the limits?
> > > Obviously, we can only do a release of a 1.15 connector after 1.15 is
> > > release.
> >
> >
>


Re: [VOTE] Release 1.13.3, release candidate #1

2021-10-19 Thread Arvid Heise
+1 (binding)

- build from source on scala 2_12 profile
- ran standalone cluster with examples

Best,

Arvid

On Tue, Oct 19, 2021 at 4:48 AM Dian Fu  wrote:

> +1 (binding)
>
> - verified the checksum and signature
> - checked the dependency changes since 1.13.2. There is only one dependency
> change (commons-compress: 1.20 -> 1.21) and it is well documented in the
> NOTICE file
> - installed the PyFlink packages in MacOS and runs a few examples
> successfully
> - the website PR LGTM
>
> Regards,
> Dian
>
> On Mon, Oct 18, 2021 at 8:07 PM Leonard Xu  wrote:
>
> >
> > +1 (non-binding)
> >
> > - verified signatures and hashsums
> > - built from source code with scala 2.11 succeeded
> > - started a cluster, ran a wordcount job, the result is expected, no
> > suspicious log output
> > - started SQL Client, used mysql-cdc connector to consumer changelog from
> > MySQL, the result is expected
> > - reviewed the web PR
> >
> > Best,
> > Leonard
> >
> > > 在 2021年10月18日,16:20,JING ZHANG  写道:
> > >
> > > Thanks Chesnay for driving this.
> > >
> > > +1 (non-binding)
> > >
> > > - built from source code flink-1.13.3-src.tgz
> > > <
> >
> https://dist.apache.org/repos/dist/dev/flink/flink-1.13.3-rc1/flink-1.13.3-src.tgz
> > >
> > > succeeded
> > > - started a standalone Flink cluster, ran the WordCount example, WebUI
> > > looks good,  no suspicious output/log.
> > > - started cluster and run some e2e sql queries using SQL Client, query
> > > result is expected
> > > - repeat step 2 and step 3 with flink-1.13.3-bin-scala_2.11.tgz
> > > <
> >
> https://dist.apache.org/repos/dist/dev/flink/flink-1.13.3-rc1/flink-1.13.3-bin-scala_2.11.tgz
> > >
> > > - repeat step 2 and step 3 with flink-1.13.3-bin-scala_2.12.tgz
> > > <
> >
> https://dist.apache.org/repos/dist/dev/flink/flink-1.13.3-rc1/flink-1.13.3-bin-scala_2.12.tgz
> > >
> > >
> > > Best,
> > > JING ZHANG
> > >
> > > Matthias Pohl  于2021年10月15日周五 下午10:07写道:
> > >
> > >> Thanks Chesnay for driving this.
> > >>
> > >> +1 (non-binding)
> > >>
> > >> - verified the checksums
> > >> - build 1.13.3-rc1 from sources
> > >> - went over the pom file diff to see whether we missed newly added
> > >> dependency in the NOTICE file
> > >> - went over the release blog post
> > >> - checked that scala 2.11 and 2.12 artifacts are present in the Maven
> > repo
> > >> - Run example jobs without noticing any issues in the logs
> > >> - Triggered e2e test run on VVP based on 1.13.3 RC1
> > >>
> > >> On Tue, Oct 12, 2021 at 7:22 PM Chesnay Schepler 
> > >> wrote:
> > >>
> > >>> Hi everyone,
> > >>> Please review and vote on the release candidate #1 for the version
> > >>> 1.13.3, as follows:
> > >>> [ ] +1, Approve the release
> > >>> [ ] -1, Do not approve the release (please provide specific comments)
> > >>>
> > >>>
> > >>> The complete staging area is available for your review, which
> includes:
> > >>> * JIRA release notes [1],
> > >>> * the official Apache source release and binary convenience releases
> to
> > >>> be deployed to dist.apache.org [2], which are signed with the key
> with
> > >>> fingerprint C2EED7B111D464BA [3],
> > >>> * all artifacts to be deployed to the Maven Central Repository [4],
> > >>> * source code tag "release-1.13.3-rc1" [5],
> > >>> * website pull request listing the new release and adding
> announcement
> > >>> blog post [6].
> > >>>
> > >>> The vote will be open for at least 72 hours. It is adopted by
> majority
> > >>> approval, with at least 3 PMC affirmative votes.
> > >>>
> > >>> Thanks,
> > >>> Release Manager
> > >>>
> > >>> [1]
> > >>>
> > >>>
> > >>
> >
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12350329
> > >>> [2] https://dist.apache.org/repos/dist/dev/flink/flink-1.13.3-rc1/
> > >>> [3] https://dist.apache.org/repos/dist/release/flink/KEYS
> > >>> [4]
> > >>
> https://repository.apache.org/content/repositories/orgapacheflink-1453
> > >>> [5] https://github.com/apache/flink/tree/release-1.13.3-rc1
> > >>> [6] https://github.com/apache/flink-web/pull/473
> > >>>
> > >>
> >
> >
>


Re: [DISCUSS] Creating an external connector repository

2021-10-18 Thread Arvid Heise
rs to contribute, and release process should follow Apache
>> rules, which are against our initial motivations of externalizing
>> connectors.
>>
>> Using an individual Github organization will maximum the freedom provided
>> to developers. An ideal structure in my mind would be like "
>> github.com/flink-connectors/flink-connector-xxx". The newly established
>> flink-extended org might be another choice, but considering the number of
>> connectors, I prefer to use an individual org for connectors to avoid
>> flooding other repos under flink-extended.
>>
>> In the meantime, we need to provide a well-established standard /
>> guideline for contributing connectors, including CI, testing, docs (maybe
>> we can’t provide resources for running them, but we should give enough
>> guide on how to setup one) to keep the high quality of connectors. I’m
>> happy to help building these fundamental bricks. Also since Kafka connector
>> is widely used among Flink users, we can make Kafka connector a “model” of
>> how to build and contribute a well-qualified connector into Flink
>> ecosystem, and we can still use this trusted one for Flink E2E tests.
>>
>> Again I believe this will definitely boost the expansion of Flink
>> ecosystem. Very excited to see the progress!
>>
>> Best,
>>
>> Qingsheng Ren
>> On Oct 15, 2021, 8:47 PM +0800, Arvid Heise , wrote:
>> > Dear community,
>> > Today I would like to kickstart a series of discussions around creating
>> an external connector repository. The main idea is to decouple the release
>> cycle of Flink with the release cycles of the connectors. This is a common
>> approach in other big data analytics projects and seems to scale better
>> than the current approach. In particular, it will yield the following
>> changes.
>>  • Faster releases of connectors: New features can be added more quickly,
>>    bugs can be fixed immediately, and we can have faster security patches
>>    in case of direct or indirect (through dependencies) security flaws.
>>  • New features can be added to old Flink versions: If the connector API
>>    didn’t change, the same connector jar may be used with different Flink
>>    versions. Thus, new features can also immediately be used with older
>>    Flink versions. A compatibility matrix on each connector page will help
>>    users to find suitable connector versions for their Flink versions.
>>  • More activity and contributions around connectors: If we ease the
>>    contribution and development process around connectors, we will see
>>    faster development and also more connectors. Since that heavily depends
>>    on the chosen approach discussed below, more details will be shown there.
>>  • An overhaul of the connector page: In the future, all known connectors
>>    will be shown on the same page in a similar layout independent of where
>>    they reside. They could be hosted on external project pages (e.g.,
>>    Iceberg and Hudi), on some company page, or may stay within the main
>>    Flink repository. Connectors may receive some sort of quality seal such
>>    that users can quickly access the production-readiness and we could also
>>    add which community/company promises which kind of support.
>>  • If we take (some) connectors out of Flink, Flink CI will be faster and
>>    Flink devs will experience fewer build instabilities (which mostly come
>>    from connectors). That would also speed up Flink development.
>> > Now I’d first like to collect your viewpoints on the ideal state. Let’s
>> first recap which approaches, we currently have:
>>  • We have half of the connectors in the main Flink repository. Relatively
>>    few of them have received updates in the past couple of months.
>>  • Another large chunk of connectors are in Apache Bahir. It recently has
>>    seen the first release in 3 years.
>>  • There are a few other (Apache) projects that maintain a Flink connector,
>>    such as Apache Iceberg, Apache Hudi, and Pravega.
>>  • A few connectors are listed on company-related repositories, such as
>>    Apache Pulsar on StreamNative and CDC connectors on Ververica.
>> > My personal observation is that having a repository per connector seems
>> to increase the activity on a connector as it’s easier to maintain. For
>> example, in Apache Bahir all connectors are built against the same Flink
>> version, which may not be desirable when certain APIs change; for example,
>> SinkFunction will be eventually deprecated and removed but new Sink
>> interface may gain more features.
>> > Now, I'd like to outline different approaches. All approaches will

[jira] [Created] (FLINK-24574) Allow metrics to be removed in UDFs

2021-10-18 Thread Arvid Heise (Jira)
Arvid Heise created FLINK-24574:
---

 Summary: Allow metrics to be removed in UDFs
 Key: FLINK-24574
 URL: https://issues.apache.org/jira/browse/FLINK-24574
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Metrics
Affects Versions: 1.15.0
Reporter: Arvid Heise


A user asked 
{noformat}
Suppose I have a short lived process within a UDF that defines metrics. After 
the process has completed, the underlying resources should be cleaned up. Is 
there an API to remove/unregister metrics?{noformat}
There are probably more related use cases, where a metric is only temporarily 
meaningful.
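To make the scenario concrete, a minimal sketch (the function and metric names
are made up; only the RuntimeContext/MetricGroup calls are existing API): a UDF
that registers a counter for a short-lived phase currently has no way to
unregister it once that phase is over.

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;

// Sketch of a UDF that registers a metric for a short-lived process.
public class ShortLivedProcessFunction extends RichMapFunction<String, String> {

    private transient Counter warmupRecords;

    @Override
    public void open(Configuration parameters) {
        // Registered once; there is currently no API to remove the metric again
        // when the warm-up phase (and its underlying resources) is finished.
        warmupRecords = getRuntimeContext().getMetricGroup().counter("warmupRecords");
    }

    @Override
    public String map(String value) {
        if (warmupRecords != null) {
            warmupRecords.inc();
        }
        return value;
    }
}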



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: dataStream can not use multiple classloaders

2021-10-18 Thread Arvid Heise
You also must ensure that your SourceFunction is serializable; it's not enough
to just refer to some classloader, you must also ensure that you have access
to it after deserialization on the task managers.
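A minimal sketch of that, assuming the ISourceFunctionFactory interface from
the quoted question below: keep only serializable fields (the jar URLs) in the
function and build the URLClassLoader and the delegate lazily in open(), so
they are recreated after deserialization on each task manager. The wrapper
class and its names are made up.

import java.net.URL;
import java.net.URLClassLoader;
import java.util.ServiceLoader;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

// Sketch only: delegates to a SourceFunction created from an isolated
// classloader. ISourceFunctionFactory refers to the factory interface from the
// quoted question below and is assumed to be on the classpath.
public class IsolatedSourceFunction<T> extends RichSourceFunction<T> {

    private final URL[] jarUrls;                  // serializable field
    private transient SourceFunction<T> delegate; // rebuilt on each task manager

    public IsolatedSourceFunction(URL[] jarUrls) {
        this.jarUrls = jarUrls;
    }

    @Override
    @SuppressWarnings("unchecked")
    public void open(Configuration parameters) throws Exception {
        ClassLoader isolated = new URLClassLoader(jarUrls, getClass().getClassLoader());
        delegate = (SourceFunction<T>)
                ServiceLoader.load(ISourceFunctionFactory.class, isolated)
                        .iterator()
                        .next()
                        .create();
    }

    @Override
    public void run(SourceContext<T> ctx) throws Exception {
        delegate.run(ctx);
    }

    @Override
    public void cancel() {
        if (delegate != null) {
            delegate.cancel();
        }
    }
}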

On Mon, Oct 18, 2021 at 4:24 AM Caizhi Weng  wrote:

> Hi!
>
> There is only one classloader for user code by default in runtime. The
> main method of your code is only executed on the client side. It generates
> a job graph and sends it to the cluster.
>
> To avoid class loading conflict it is recommended to shade the
> dependencies of your source and sink function jars. If you really have to
> load some dependencies with different class loaders, you can load them in
> the open method of a RichSourceFunction or RichSinkFunction.
>
> 百岁  于2021年10月16日周六 下午11:47写道:
>
>> TO: everyone
>> I have created a DataStream demo as shown below. In the demo I create a very
>> simple example: read stream data from a SourceFunction and send it to a
>> SinkFunction without any processing.
>> The point is that the SourceFunction and SinkFunction instances are created
>> via two separate URLClassLoaders with different dependencies, to avoid code
>> conflicts.
>> The problem is that when the Flink client submits the job to the server, the
>> server side throws a ClassNotFoundException for a class defined in the
>> dedicated classloader's dependencies. Obviously the server side does not use
>> the same classloaders as the client side.
>> How can I solve this problem? Can anyone give me some advice?
>> Thanks a lot
>>
>>
>>
>> public class FlinkStreamDemo {
>>     public static void main(String[] args) throws Exception {
>>
>>         StreamExecutionEnvironment env =
>>                 StreamExecutionEnvironment.getExecutionEnvironment();
>>         SourceFunction sourceFunc = createSourceFunction();
>>
>>         DataStreamSource dtoDataStreamSource = env.addSource(sourceFunc);
>>
>>         SinkFunction sinkFunction = createSink();
>>
>>         dtoDataStreamSource.addSink(sinkFunction);
>>
>>         env.execute("flink-example");
>>     }
>>
>>     private static SinkFunction createSink() {
>>         URL[] urls = new URL[]{...};
>>         ClassLoader classLoader = new URLClassLoader(urls);
>>         ServiceLoader<ISinkFunctionFactory> loaders =
>>                 ServiceLoader.load(ISinkFunctionFactory.class, classLoader);
>>         Iterator<ISinkFunctionFactory> it = loaders.iterator();
>>         if (it.hasNext()) {
>>             return it.next().create();
>>         }
>>         throw new IllegalStateException();
>>     }
>>
>>     private static SourceFunction createSourceFunction() {
>>         URL[] urls = new URL[]{...};
>>         ClassLoader classLoader = new URLClassLoader(urls);
>>         ServiceLoader<ISourceFunctionFactory> loaders =
>>                 ServiceLoader.load(ISourceFunctionFactory.class, classLoader);
>>         Iterator<ISourceFunctionFactory> it = loaders.iterator();
>>         if (it.hasNext()) {
>>             return it.next().create();
>>         }
>>         throw new IllegalStateException();
>>     }
>>
>>     public interface ISinkFunctionFactory {
>>         SinkFunction create();
>>     }
>>
>>     public interface ISourceFunctionFactory {
>>         SourceFunction create();
>>     }
>> }
>>
>>
>> from:
>> https://issues.apache.org/jira/projects/FLINK/issues/FLINK-24558?filter=allissues
>
>


[DISCUSS] Creating an external connector repository

2021-10-15 Thread Arvid Heise
Dear community,

Today I would like to kickstart a series of discussions around creating an
external connector repository. The main idea is to decouple the release
cycle of Flink with the release cycles of the connectors. This is a common
approach in other big data analytics projects and seems to scale better
than the current approach. In particular, it will yield the following
changes.


   - Faster releases of connectors: New features can be added more quickly,
     bugs can be fixed immediately, and we can have faster security patches in
     case of direct or indirect (through dependencies) security flaws.

   - New features can be added to old Flink versions: If the connector API
     didn’t change, the same connector jar may be used with different Flink
     versions. Thus, new features can also immediately be used with older Flink
     versions. A compatibility matrix on each connector page will help users to
     find suitable connector versions for their Flink versions.

   - More activity and contributions around connectors: If we ease the
     contribution and development process around connectors, we will see faster
     development and also more connectors. Since that heavily depends on the
     chosen approach discussed below, more details will be shown there.

   - An overhaul of the connector page: In the future, all known connectors
     will be shown on the same page in a similar layout independent of where
     they reside. They could be hosted on external project pages (e.g., Iceberg
     and Hudi), on some company page, or may stay within the main Flink
     repository. Connectors may receive some sort of quality seal such that
     users can quickly access the production-readiness and we could also add
     which community/company promises which kind of support.

   - If we take (some) connectors out of Flink, Flink CI will be faster and
     Flink devs will experience fewer build instabilities (which mostly come
     from connectors). That would also speed up Flink development.


Now I’d first like to collect your viewpoints on the ideal state. Let’s
first recap which approaches, we currently have:


   - We have half of the connectors in the main Flink repository. Relatively
     few of them have received updates in the past couple of months.

   - Another large chunk of connectors are in Apache Bahir. It recently has
     seen the first release in 3 years.

   - There are a few other (Apache) projects that maintain a Flink connector,
     such as Apache Iceberg, Apache Hudi, and Pravega.

   - A few connectors are listed on company-related repositories, such as
     Apache Pulsar on StreamNative and CDC connectors on Ververica.


My personal observation is that having a repository per connector seems to
increase the activity on a connector as it’s easier to maintain. For
example, in Apache Bahir all connectors are built against the same Flink
version, which may not be desirable when certain APIs change; for example,
SinkFunction will be eventually deprecated and removed but new Sink
interface may gain more features.

Now, I'd like to outline different approaches. All approaches will allow
you to host your connector on any kind of personal, project, or company
repository. We still want to provide a default place where users can
contribute their connectors and hopefully grow a community around it. The
approaches are:


   1. Create a mono-repo under the Apache umbrella where all connectors will
      reside, for example, github.com/apache/flink-connectors. That repository
      needs to follow its rules: No GitHub issues, no Dependabot or similar
      tools, and a strict manual release process. It would be under the Flink
      community, such that Flink committers can write to that repository but
      no-one else.

   2. Create a GitHub organization with small repositories, for example
      github.com/flink-connectors. Since it’s not under the Apache umbrella,
      we are free to use whatever process we deem best (up to a future
      discussion). Each repository can have a shared list of maintainers +
      connector specific committers. We can provide more automation. We may
      even allow different licenses to incorporate things like a connector to
      Oracle that cannot be released under ASL.

   3. ??? <- please provide your additional approaches


In both cases, we will provide opinionated module/repository templates
based on a connector testing framework and guidelines. Depending on the
approach, we may need to enforce certain things.

I’d like to first focus on what the community would ideally seek and
minimize the discussions around legal issues, which we would discuss later.
For now, I’d also like to postpone the discussion if we move all or only a
subset of connectors from Flink to the new default place as it seems to be
orthogonal to the fundamental discussion.

PS: If the external repository for connectors is successful, I’d also like
to move out other things like formats, filesystems, and 

Re: [NOTICE] CiBot improvements

2021-10-12 Thread Arvid Heise
Awesome!

On Tue, Oct 12, 2021 at 3:11 AM Guowei Ma  wrote:

> Thanks for your effort!
>
> Best,
> Guowei
>
>
> On Mon, Oct 11, 2021 at 9:26 PM Stephan Ewen  wrote:
>
> > Great initiative, thanks for doing this!
> >
> > On Mon, Oct 11, 2021 at 10:52 AM Till Rohrmann 
> > wrote:
> >
> > > Thanks a lot for this effort Chesnay! The improvements sound really
> good.
> > >
> > > Cheers,
> > > Till
> > >
> > > On Mon, Oct 11, 2021 at 8:46 AM David Morávek  wrote:
> > >
> > > > Nice! Thanks for the effort Chesnay, this is really a huge step
> > forward!
> > > >
> > > > Best,
> > > > D.
> > > >
> > > > On Mon, Oct 11, 2021 at 6:02 AM Xintong Song 
> > > > wrote:
> > > >
> > > > > Thanks for the effort, @Chesnay. This is super helpful.
> > > > >
> > > > > @Jing,
> > > > > Every push to the PR branch should automatically trigger an entire
> > new
> > > > > build. `@flinkbot run azure` should only be used when you want to
> > > re-run
> > > > > the failed stages without changing the PR. E.g., when running into
> > > known
> > > > > unstable cases that are unrelated to the PR.
> > > > >
> > > > > Thank you~
> > > > >
> > > > > Xintong Song
> > > > >
> > > > >
> > > > >
> > > > > On Mon, Oct 11, 2021 at 11:45 AM JING ZHANG 
> > > > wrote:
> > > > >
> > > > > > Hi Chesnay,
> > > > > > Thanks for the effort. It is a very useful improvement.
> > > > > > I have a minor question. Please forgive me if the question is too
> > > > naive.
> > > > > > Since '@flinkbot run azure' now behaves like "Rerun failed jobs",
> > is
> > > > > there
> > > > > > any way to trigger an entirely new build? Because sometimes I'm
> > not
> > > > sure
> > > > > > the passed cases in the last build could still succeed in the new
> > > build
> > > > > > because of introduced updates in new commit.
> > > > > >
> > > > > > Best,
> > > > > > JING ZHANG
> > > > > >
> > > > > >
> > > > > > Yangze Guo  于2021年10月11日周一 上午10:31写道:
> > > > > >
> > > > > > > Thanks for that great job, Chesnay! "Rerun failed jobs" will
> > help a
> > > > > lot.
> > > > > > >
> > > > > > > Best,
> > > > > > > Yangze Guo
> > > > > > >
> > > > > > > On Sun, Oct 10, 2021 at 4:56 PM Chesnay Schepler <
> > > ches...@apache.org
> > > > >
> > > > > > > wrote:
> > > > > > > >
> > > > > > > > I made a number of changes to the CiBot over the weekend.
> > > > > > > >
> > > > > > > > - '@flinkbot run azure' previously triggered an entirely new
> > > build
> > > > > > based
> > > > > > > > on the last completed one. It now instead retries the last
> > > > completed
> > > > > > > > build, only running the jobs that actually failed. It
> basically
> > > > > behaves
> > > > > > > > like the "Rerun failed jobs" button in the Azure UI.
> > > > > > > > - various optimizations to increase responsiveness (primarily
> > by
> > > > > doing
> > > > > > > > significantly less unnecessary work / requests to GH)
> > > > > > > > - removed TravisCI support (since we no longer support a
> > release
> > > > that
> > > > > > > > used Travis)
> > > > > > > >
> > > > > > > > Please ping me if you spot anything weird.
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>


Re: The Apache Flink should pay more attention to ensuring API compatibility.

2021-10-04 Thread Arvid Heise
Hi Jark,

I also don't see it as a blocker issue at all. If you want to access the
metric group across 1.13 and 1.14, you can use

(MetricGroup) context.getClass().getMethod("metricGroup").invoke(context)

But of course, you will not be able to access the new standardized metrics.
For that you will need to maintain two different source/binary builds,
since it's a new feature.
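Spelled out as a small helper (a sketch only; the class and method names are
made up, and the lookup is done on the SourceReaderContext interface rather
than the runtime class to avoid accessibility issues with internal
implementation classes):

import java.lang.reflect.Method;

import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.metrics.MetricGroup;

// Sketch of the reflective access mentioned above. It works on 1.13, where
// metricGroup() returns MetricGroup, and on 1.14, where it returns
// SourceReaderMetricGroup (which is still a MetricGroup), because the method
// is resolved by name at runtime instead of being linked at compile time.
public final class MetricGroupCompat {

    private MetricGroupCompat() {}

    public static MetricGroup metricGroup(SourceReaderContext context) {
        try {
            Method method = SourceReaderContext.class.getMethod("metricGroup");
            return (MetricGroup) method.invoke(context);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(
                    "Cannot access SourceReaderContext#metricGroup", e);
        }
    }
}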

I agree with Piotr, the issue is that we need a more standardized process
around PublicEvolving. Ideally, with every new minor release, we should
convert PublicEvolving to Public and Experimental to PublicEvolving. We
could extend the interfaces to capture a target version and a comment for
why the API is not Public yet. Before every release, we would go through
the annotated classes and either find a specific reason to keep the
annotation or move it towards Public. If we have a specific reason to keep
it Experimental/PublicEvolving, we should plan to address that reason with
the next release.

We do have good checks in place for Public; we are just too slow with
ensuring that new API becomes Public.
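A sketch of what such an extension could look like (purely hypothetical, not
an existing Flink annotation; the attribute names are made up):

import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

// Hypothetical extension of the stability annotations: records when an API is
// expected to graduate to @Public and why it has not yet, so a release check
// can flag overdue candidates automatically.
@Documented
@Target({ElementType.TYPE, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
public @interface PublicEvolving {

    /** Release in which this API is expected to be promoted to @Public. */
    String targetVersion() default "";

    /** Reason why the API is not @Public yet. */
    String reason() default "";
}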

On Fri, Oct 1, 2021 at 9:41 PM Piotr Nowojski  wrote:

> Hi,
>
> I don't understand why we are talking about this being a blocker issue? New
> sources were not marked as @Public for a good reason until 1.14. I agree,
> we should try better at making APIs @Public sooner. I was even proposing to
> create strict timeout rules (enforced via some automated checks) like
> (unless for a very very good reason) everything marked @PublicEvolving
> or @Experimental should be upgraded to @Public after for example 2 years
> [1]. But for example the new Sink API IMO is too fresh to make it
> `@Public`.
>
> It doesn't change the fact that if we could provide a compatibility layer
> between 1.13.x and 1.14.x for this SourceReaderContext issue, it would be a
> nice thing to do. I would be -1 for keeping it forever, but trying to
> support forward compatibility of `@PublicEvolving` APIs for one or two
> releases into the future might be a good rule of thumb.
>
> Best, Piotrek
>
> [1] "[DISCUSS] Dealing with deprecated and legacy code in Flink" on the dev
> mailing list
>
>
> pt., 1 paź 2021 o 16:56 Jark Wu  napisał(a):
>
> > Hi Arvid,
> >
> > > Should we expect connector devs to release different connector binaries
> > for different Flink minors?
> > From the discussion of this thread, I think the answer is obviously
> "not",
> > otherwise OpenInx won't start
> >  this discussion. As a maintainer of flink-cdc-connector, I have to say
> > that it's very hard to release
> >  connectors for different flink versions. Usually, the connector
> community
> > doesn't have so much time to
> >  maintain different branches/modules/code for different flink versions.
> >
> > > If we change it back, then a specific connector would work for 1.14.1
> and
> > 1.13.X but not for 1.14.0 and this would be even more confusing.
> > I think this is fine. IMO, this is a blocker issue of 1.14.0 which breaks
> > Source connectors.
> > We should suggest users to use 1.14.1 if they use Source connectors.
> >
> > Best,
> > Jark
> >
> >
> > On Fri, 1 Oct 2021 at 19:05, Arvid Heise  wrote:
> >
> > > The issue is that if we do not mark them as Public, we will always have
> > > incompatibilities. The change of SourceReaderContext#metricGroup is
> > > perfectly fine according to the annotation. The implications that we
> see
> > > here just mean that the interfaces have been expected to be Public.
> > >
> > > And now the question is what do we expect?
> > > Should we expect connector devs to release different connector binaries
> > > for different Flink minors? Then PublicEvolving is fine.
> > > If we expect that the same connector can work across multiple Flink
> > > versions, we need to go into Public.
> > >
> > > It doesn't make sense to keep them PublicEvolving on the annotation but
> > > implicitly assume them to be Public.
> > >
> > > @Jark Wu  I don't see a way to revert the change of
> > > SourceReaderContext#metricGroup. For now, connector devs that expose
> > > metrics need to release 2 versions. If we change it back, then a
> specific
> > > connector would work for 1.14.1 and 1.13.X but not for 1.14.0 and this
> > > would be even more confusing.
> > >
> > > On Fri, Oct 1, 2021 at 10:49 AM Ingo Bürk  wrote:
> > >
> > >> Hi,
> > >>
> > >> > [...] but also the new Source/Sink APIs as public
> > >>
> > >> I'm not really involved in the new Source/Sink A

Re: The Apache Flink should pay more attention to ensuring API compatibility.

2021-10-01 Thread Arvid Heise
The issue is that if we do not mark them as Public, we will always have
incompatibilities. The change of SourceReaderContext#metricGroup is
perfectly fine according to the annotation. The implications that we see
here just mean that the interfaces have been expected to be Public.

And now the question is what do we expect?
Should we expect connector devs to release different connector binaries for
different Flink minors? Then PublicEvolving is fine.
If we expect that the same connector can work across multiple Flink
versions, we need to go into Public.

It doesn't make sense to keep them PublicEvolving on the annotation but
implicitly assume them to be Public.

@Jark Wu  I don't see a way to revert the change of
SourceReaderContext#metricGroup. For now, connector devs that expose
metrics need to release 2 versions. If we change it back, then a specific
connector would work for 1.14.1 and 1.13.X but not for 1.14.0 and this
would be even more confusing.

On Fri, Oct 1, 2021 at 10:49 AM Ingo Bürk  wrote:

> Hi,
>
> > [...] but also the new Source/Sink APIs as public
>
> I'm not really involved in the new Source/Sink APIs and will happily
> listen to the developers working with them here, but since they are new, we
> should also be careful not to mark them as stable too quickly. We've only
> begun updating the existing connectors to these interfaces at the moment.
> Making more progress here and keeping new APIs as Evolving for a couple of
> minor releases is probably still a good idea. Maybe we should even have
> actual rules on when APIs can/should be promoted?
>
> More actively checking backwards-compatibility during a release sounds
> like a great idea regardless, of course.
>
>
> Ingo
>
> On Fri, Oct 1, 2021 at 9:19 AM Jark Wu  wrote:
>
>> Hi all,
>>
>> Nice thread and great discussion! Ecosystem is one of the most important
>> things
>> to the Flink community, we should pay more attention to API compatibility.
>>
>> Marking all connector APIs @Public is a good idea, not only mark the
>> Table/SQL
>> connector APIs public, but also the new Source/Sink APIs as public.
>> Besides, we should also add a check item to the Verify Release
>> documentation[1]
>> to verify the release is backward-compatible for connectors. From my point
>> of view,
>> such backward incompatibility should cancel the vote.
>>
>> Regarding the SourceReaderContext#metricGroup compatibility problem in
>> 1.14.0, I would
>> suggest starting a new discussion thread to see whether we have any idea
>> to
>> fix it. We should
>> fix it ASAP! Otherwise iceberg/hudi/cdc communities will get frustrated
>> again when upgrading
>>  to 1.14.  Maybe we still have time to release a minor version, because
>> there is no
>> connector upgraded to 1.14.0 yet. What do you think? @Leonard Xu
>>  @Arvid Heise  @Piotr Nowojski
>> 
>>
>> Best,
>> Jark
>>
>> [1]:
>>
>> https://cwiki.apache.org/confluence/display/FLINK/Verifying+a+Flink+Release
>>
>> On Wed, 29 Sept 2021 at 09:46, OpenInx  wrote:
>>
>> > > Apart from this being `@PublicEvolving`
>> >
>> > From my perspective,  annotating the 'DynamicTableSink' to be a
>> > 'PublicEvolving' class is not reasonable, because that means devs could
>> > just change the basic API which all downstream connectors are depending
>> on
>> > easily when iterating flink from 1.12 to 1.13 (according to the wiki
>> [1]).
>> > This implies all downstream maintainers must take on this maintenance
>> > burden, and it also makes our flink ecosystem very fragile.   Changing
>> the
>> > 'DynamicTableSink' between two major versions sounds reasonable to me,
>> but
>> > unreasonable for uncompatibility changes between two minor versions.   I
>> > think we may need to check those API which are annotated
>> 'PublicEnvoling'
>> > while should be 'Public' because of  the dependency from all connectors.
>> >  We should ensure the stability of those APIs that are necessary to
>> > implement the connector, and at the same time implement the updated v2
>> > version of the API. After all v2 APIs are considered stable, we will
>> mark
>> > them as stable. Instead of releasing a version of the API, some of the
>> APIs
>> > necessary to implement the connector are marked as stable and some are
>> > marked as unstable, which is very unfriendly to downstream. Because
>> > downstream essentially every upgrade requires refactoring of the code.
>> >
>> > > We are trying to provide forward compatibility: applications using
>> 

Re: The Apache Flink should pay more attention to ensuring API compatibility.

2021-09-28 Thread Arvid Heise
Thanks for starting the discussion. I think both issues are valid concerns
that we need to tackle. I guess the biggest issue is that now it's just not
possible to write 1 connector that runs for Flink 1.13 and 1.14, so we make
it much harder for devs in the ecosystem (and our goal is to make it
easier!).

I think the only solution is to actually treat (and mark) all these
interfaces as Public and only change them in Flink 2.0. If we need to
change them, we would need to actually add a secondary (possibly internal)
interface (at least that would have helped in SourceReaderContext).



On Tue, Sep 28, 2021 at 3:20 PM Chesnay Schepler  wrote:

> We already have such tooling via japicmp; it's just that it is only
> enabled for Public APIs.
>
> You can probably generate a report via japicmp for all
> PublicEvolving/Experimental APIs as well.
>
> On 28/09/2021 15:17, Ingo Bürk wrote:
> > Hi everyone,
> >
> > I think it would be best to support this process with tooling as much as
> > possible, because humans are bound to make mistakes. FLINK-24138[1]
> should
> > be a first step in this direction, but it wouldn't catch the cases
> > discussed here.
> > Maybe we should consider "capturing" the public API into some separate
> > file(s) and validating against that in the CI such that structural
> changes
> > to public APIs (moved classes, renamed / exchanged classes, changed
> > signatures, …) can be caught as an error. That would raise awareness for
> > any changes to public APIs and force a conscious decision.
> >
> > [1] https://issues.apache.org/jira/browse/FLINK-24138
> >
> >
> > Best
> > Ingo
> >
> > On Tue, Sep 28, 2021 at 2:49 PM Leonard Xu  wrote:
> >
>  Not sure if this will happen in 1.15 already. We will need automated
>  compatibility tests and a well-defined list of stable APIs.
> >>>   We are
> >>> trying to provide forward compatibility: applications using `@Public`
> >> APIs
> >>> compiled against Flink 1.12.x, should work fine in Flink 1.13.x
> >> Unfortunately, I also ran into a forward compatibility issue. During the
> >> release 1.14 check, I tried to use the mysql-cdc connector[1], which is
> >> compiled against 1.13.1, in the SQL Client, but it could not work in a
> >> flink 1.14.0 cluster; it failed because the metric API compatibility is
> >> broken.
> >>
> >> // Flink 1.13.x
> >> @Public
> >> public interface SourceReaderContext {
> >>
> >>     MetricGroup metricGroup();
> >>     ...
> >> }
> >>
> >> // Flink 1.14.0
> >> @Public
> >> public interface SourceReaderContext {
> >>
> >>     SourceReaderMetricGroup metricGroup();
> >>     ...
> >> }
> >>
> >> Shouldn't we mark it as @Deprecated and only delete it in version 2.0.0
> >> for a @Public API, as our community rule [2] describes? At least we
> >> should keep such methods across several minor versions (..).
> >>
> >> Although these changes can be traced back to voted FLIPs and are not the
> >> fault of a few developers, they show that we didn't pay enough attention
> >> to backward/forward compatibility.
> >>
> >> Best,
> >> Leonard
> >> [1]
> >>
> https://github.com/ververica/flink-cdc-connectors/tree/master/flink-connector-mysql-cdc
> >> [2]
> >> https://cwiki.apache.org/confluence/display/FLINK/Stability+Annotations
> >>
> >>
>
>


[jira] [Created] (FLINK-24252) HybridSource should return TypeInformation

2021-09-10 Thread Arvid Heise (Jira)
Arvid Heise created FLINK-24252:
---

 Summary: HybridSource should return TypeInformation
 Key: FLINK-24252
 URL: https://issues.apache.org/jira/browse/FLINK-24252
 Project: Flink
  Issue Type: Improvement
Reporter: Arvid Heise


Because {{HybridSource}} will never bind the actual type, it would be a good 
addition to implement {{ResultTypeQueryable}} to improve usability. The 
type should be fetched or inferred from the wrapped sources.
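
A minimal sketch of what exposing the type could look like; the wrapper class 
and the way the type is supplied are illustrative assumptions, not the actual 
HybridSource code:

{noformat}
// Illustrative sketch only: a source-like wrapper that exposes its produced
// type via ResultTypeQueryable so that fromSource() can infer the output type.
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;

public class TypeAwareSourceWrapper<T> implements ResultTypeQueryable<T> {

    // Assumption: the type is taken from (or inferred for) the first wrapped source.
    private final TypeInformation<T> producedType;

    public TypeAwareSourceWrapper(TypeInformation<T> producedType) {
        this.producedType = producedType;
    }

    @Override
    public TypeInformation<T> getProducedType() {
        return producedType;
    }
}
{noformat}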



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-24250) Add De/Serialization API to tear-down user code

2021-09-10 Thread Arvid Heise (Jira)
Arvid Heise created FLINK-24250:
---

 Summary: Add De/Serialization API to tear-down user code
 Key: FLINK-24250
 URL: https://issues.apache.org/jira/browse/FLINK-24250
 Project: Flink
  Issue Type: Improvement
  Components: API / Type Serialization System
Reporter: Arvid Heise


FLINK-17306 added {{open}} to {{(De)SerializationSchema}}. We should provide a 
symmetric {{closeX}} method. See [ML De/Serialization API to tear-down user 
code|https://lists.apache.org/thread.html/r31d36e076bd192bc87ff8c896557ac0e2812229ee0f2c67a1b9e6e12%40%3Cuser.flink.apache.org%3E]
 for more details.
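
A sketch of what a symmetric tear-down hook could look like; the method name 
and the default implementation are assumptions, not the final API of this 
ticket:

{noformat}
// Illustrative sketch only: a sub-interface adding a tear-down hook that is
// symmetric to open(InitializationContext).
import org.apache.flink.api.common.serialization.DeserializationSchema;

public interface DeserializationSchemaWithTearDown<T> extends DeserializationSchema<T> {

    /** Called once when the schema is no longer needed, e.g. to close clients. */
    default void close() throws Exception {
        // default: nothing to release
    }
}
{noformat}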



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] FLIP-182: Watermark alignment

2021-09-08 Thread Arvid Heise
e connectors are
> different
> > in
> > > > > > implementation and it is hard to make this feature. When the
> > consumed
> > > > > data
> > > > > > is not aligned or consuming history data, it is very easy to
> cause
> > > the
> > > > > > unalignment. Source alignment can resolve many unstable problems.
> > > > > >
> > > > > > Seth Wiesman  于2021年7月9日周五 下午11:25写道:
> > > > > >
> > > > > > > +1
> > > > > > >
> > > > > > > In my opinion, this limitation is perfectly fine for the MVP.
> > > > Watermark
> > > > > > > alignment is a long-standing issue and this already moves the
> > ball
> > > so
> > > > > far
> > > > > > > forward.
> > > > > > >
> > > > > > > I don't expect this will cause many issues in practice, as I
> > > > understand
> > > > > > it
> > > > > > > the FileSource always processes one split at a time, and in my
> > > > > > experience,
> > > > > > > 90% of Kafka users have a small number of partitions scale
> their
> > > > > > pipelines
> > > > > > > to have one reader per partition. Obviously, there are
> > larger-scale
> > > > > Kafka
> > > > > > > topics and more sources that will be ported over in the future
> > but
> > > I
> > > > > > think
> > > > > > > there is an implicit understanding that aligning sources adds
> > > latency
> > > > > to
> > > > > > > pipelines, and we can frame the follow-up "per-split" alignment
> > as
> > > an
> > > > > > > optimization.
> > > > > > >
> > > > > > > On Fri, Jul 9, 2021 at 6:40 AM Piotr Nowojski <
> > > > > piotr.nowoj...@gmail.com>
> > > > > > > wrote:
> > > > > > >
> > > > > > > > Hey!
> > > > > > > >
> > > > > > > > A couple of weeks ago me and Arvid Heise played around with
> an
> > > idea
> > > > > to
> > > > > > > > address a long standing issue of Flink: lack of
> watermark/event
> > > > time
> > > > > > > > alignment between different parallel instances of sources,
> that
> > > can
> > > > > > lead
> > > > > > > to
> > > > > > > > ever growing state size for downstream operators like
> > > > WindowOperator.
> > > > > > > >
> > > > > > > > We had an impression that this is relatively low hanging
> fruit
> > > that
> > > > > can
> > > > > > > be
> > > > > > > > quite easily implemented - at least partially (the first part
> > > > > mentioned
> > > > > > > in
> > > > > > > > the FLIP document). I have written down our proposal [1] and
> > you
> > > > can
> > > > > > also
> > > > > > > > check out our PoC that we have implemented [2].
> > > > > > > >
> > > > > > > > We think that this is a quite easy proposal, that has been in
> > > large
> > > > > > part
> > > > > > > > already implemented. There is one obvious limitation of our
> > PoC.
> > > > > Namely
> > > > > > > we
> > > > > > > > can only easily block individual SourceOperators. This works
> > > > > perfectly
> > > > > > > fine
> > > > > > > > as long as there is at most one split per SourceOperator.
> > However
> > > > it
> > > > > > > > doesn't work with multiple splits. In that case, if a single
> > > > > > > > `SourceOperator` is responsible for processing both the least
> > and
> > > > the
> > > > > > > most
> > > > > > > > advanced splits, we won't be able to block this most advanced
> > > split
> > > > > for
> > > > > > > > generating new records. I'm proposing to solve this problem
> in
> > > the
> > > > > > future
> > > > > > > > in another follow up FLIP, as a solution that works with a
> > single
> > > > > split
> > > > > > > per
> > > > > > > > operator is easier and already valuable for some of the
> users.
> > > > > > > >
> > > > > > > > What do you think about this proposal?
> > > > > > > > Best, Piotrek
> > > > > > > >
> > > > > > > > [1]
> > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-182%3A+Support+watermark+alignment+of+FLIP-27+Sources
> > > > > > > > [2]
> https://github.com/pnowojski/flink/commits/aligned-sources
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>


[jira] [Created] (FLINK-24182) Tasks canceler should not immediately interrupt

2021-09-07 Thread Arvid Heise (Jira)
Arvid Heise created FLINK-24182:
---

 Summary: Tasks canceler should not immediately interrupt
 Key: FLINK-24182
 URL: https://issues.apache.org/jira/browse/FLINK-24182
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Task
Reporter: Arvid Heise


While debugging resource leaks (FLINK-24131), I found that any connector is 
immediately interrupted on cancel. Hence, any attempt to use blocking calls 
in {{close}} to clean up resources is inherently unreliable (e.g. aborting 
transactions).

It would be nice if tasks get a grace period (e.g. task.cancellation.interval) 
where they can try to free resources in a proper, potentially blocking fashion 
before being interrupted.

Nevertheless, connectors should always expect interruptions during shutdown, in 
particular when the user-configurable grace period is depleted. I'd add that to 
the connector documentation in a separate effort.
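
For illustration, a sketch of the kind of connector shutdown code such a grace 
period would enable; the TransactionalClient interface is a hypothetical 
stand-in (e.g. for a transactional Kafka producer):

{noformat}
// Illustrative sketch only: try a blocking cleanup during close(), but stay
// robust against the interrupt that arrives once the grace period is depleted.
public class GracefulShutdownSketch implements AutoCloseable {

    /** Hypothetical client interface, only for illustration. */
    public interface TransactionalClient {
        void abortOpenTransactions() throws InterruptedException;
        void closeQuietly();
    }

    private final TransactionalClient client;

    public GracefulShutdownSketch(TransactionalClient client) {
        this.client = client;
    }

    @Override
    public void close() {
        try {
            client.abortOpenTransactions(); // blocking, best-effort cleanup
        } catch (InterruptedException e) {
            // Grace period is over: restore the interrupt flag and stop cleanly.
            Thread.currentThread().interrupt();
        } finally {
            client.closeQuietly();
        }
    }
}
{noformat}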



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [ANNOUNCE] Apache Flink Stateful Functions 3.1.0 released

2021-09-06 Thread Arvid Heise
Congratulations! New features look awesome.

On Wed, Sep 1, 2021 at 9:10 AM Till Rohrmann  wrote:

> Great news! Thanks a lot for all your work on the new release :-)
>
> Cheers,
> Till
>
> On Wed, Sep 1, 2021 at 9:07 AM Johannes Moser  wrote:
>
>> Congratulations, great job. 
>>
>> On 31.08.2021, at 17:09, Igal Shilman  wrote:
>>
>> The Apache Flink community is very happy to announce the release of Apache
>> Flink Stateful Functions (StateFun) 3.1.0.
>>
>> StateFun is a cross-platform stack for building Stateful Serverless
>> applications, making it radically simpler to develop scalable, consistent,
>> and elastic distributed applications.
>>
>> Please check out the release blog post for an overview of the release:
>> https://flink.apache.org/news/2021/08/31/release-statefun-3.1.0.html
>>
>> The release is available for download at:
>> https://flink.apache.org/downloads.html
>>
>> Maven artifacts for StateFun can be found at:
>> https://search.maven.org/search?q=g:org.apache.flink%20statefun
>>
>> Python SDK for StateFun published to the PyPI index can be found at:
>> https://pypi.org/project/apache-flink-statefun/
>>
>> Official Docker images for StateFun are published to Docker Hub:
>> https://hub.docker.com/r/apache/flink-statefun
>>
>> The full release notes are available in Jira:
>>
>> https://issues.apache.org/jira/secure/ReleaseNote.jspa?version=12350038=12315522
>>
>> We would like to thank all contributors of the Apache Flink community who
>> made this release possible!
>>
>> Thanks,
>> Igal
>>
>>
>>


[jira] [Created] (FLINK-24108) Translate KafkaSink documentation to Chinese

2021-09-01 Thread Arvid Heise (Jira)
Arvid Heise created FLINK-24108:
---

 Summary: Translate KafkaSink documentation to Chinese
 Key: FLINK-24108
 URL: https://issues.apache.org/jira/browse/FLINK-24108
 Project: Flink
  Issue Type: Improvement
  Components: chinese-translation
Affects Versions: 1.14.0
Reporter: Arvid Heise






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [VOTE] Apache Flink Stateful Functions 3.1.0, release candidate #1

2021-08-26 Thread Arvid Heise
+1 (binding)

- Built from downloaded sources with Java 8 (mvn install -Prun-e2e-tests)
- Verified signatures and hashes

Best,

Arvid

On Thu, Aug 26, 2021 at 8:32 AM Tzu-Li (Gordon) Tai 
wrote:

> +1 (binding)
>
> - Built from source with Java 11 and Java 8 (mvn clean install
> -Prun-e2e-tests)
> - verified signatures and hashes
> - verified NOTICE files of Maven artifacts properly list actual bundled
> dependencies
> - Ran GoLang greeter and showcase with the proposed Dockerfiles for 3.1.0
> - Ran a local smoke E2E against the Java SDK, with adjusted parameters to
> run for a longer period of time
>
> Thanks for driving the release Igal!
>
> Cheers,
> Gordon
>
> On Thu, Aug 26, 2021 at 4:06 AM Seth Wiesman  wrote:
>
> > +1 (non-binding)
> >
> > - verified signatures and hashes
> > - Checked licenses
> > - ran mvn clean install -Prun-e2e-tests
> > - ran golang greeter and showcase from the playground [1]
> >
> > Seth
> >
> > [1] https://github.com/apache/flink-statefun-playground/pull/12
> >
> > On Wed, Aug 25, 2021 at 9:44 AM Igal Shilman  wrote:
> >
> > > +1 from my side:
> > >
> > > Here are the results of my RC2 testing:
> > >
> > > - verified the signatures and hashes
> > > - verified that the source distribution doesn't contain any binary
> files
> > > - ran mvn clean install -Prun-e2e-tests
> > > - ran Java and Python greeters from the playground [1] with the new
> > module
> > > structure, and async transport enabled.
> > > - verified that the docker image [2] builds and inspected the contents
> > > manually.
> > >
> > > Thanks,
> > > Igal
> > >
> > > [1] https://github.com/apache/flink-statefun-playground/tree/dev
> > > [2] https://github.com/apache/flink-statefun-docker/pull/15
> > >
> > >
> > > On Tue, Aug 24, 2021 at 3:34 PM Igal Shilman  wrote:
> > >
> > > > Sorry, the subject of the previous message should have said "[VOTE]
> > > Apache
> > > > Flink Stateful Functions 3.1.0, release candidate #2".
> > > >
> > > >
> > > > On Tue, Aug 24, 2021 at 3:24 PM Igal Shilman 
> wrote:
> > > >
> > > >> Hi everyone,
> > > >>
> > > >> Please review and vote on the release candidate #2 for the version
> > 3.1.0
> > > >> of Apache Flink Stateful Functions, as follows:
> > > >> [ ] +1, Approve the release
> > > >> [ ] -1, Do not approve the release (please provide specific
> comments)
> > > >>
> > > >> **Testing Guideline**
> > > >>
> > > >> You can find here [1] a page in the project wiki on instructions for
> > > >> testing.
> > > >> To cast a vote, it is not necessary to perform all listed checks,
> > > >> but please mention which checks you have performed when voting.
> > > >>
> > > >> **Release Overview**
> > > >>
> > > >> As an overview, the release consists of the following:
> > > >> a) Stateful Functions canonical source distribution, to be deployed
> to
> > > >> the release repository at dist.apache.org
> > > >> b) Stateful Functions Python SDK distributions to be deployed to
> PyPI
> > > >> c) Maven artifacts to be deployed to the Maven Central Repository
> > > >> d) New Dockerfiles for the release
> > > >> e) GoLang SDK tag statefun-sdk-go/v3.1.0-rc2
> > > >>
> > > >> **Staging Areas to Review**
> > > >>
> > > >> The staging areas containing the above mentioned artifacts are as
> > > >> follows, for your review:
> > > >> * All artifacts for a) and b) can be found in the corresponding dev
> > > >> repository at dist.apache.org [2]
> > > >> * All artifacts for c) can be found at the Apache Nexus Repository
> [3]
> > > >>
> > > >> All artifacts are signed with the key
> > > >> 73BC0A2B04ABC80BF0513382B0ED0E338D622A92 [4]
> > > >>
> > > >> Other links for your review:
> > > >> * JIRA release notes [5]
> > > >> * source code tag "release-3.1.0-rc2" [6]
> > > >> * PR for the new Dockerfiles [7]
> > > >>
> > > >> **Vote Duration**
> > > >>
> > > >> The voting time will run for at least 72 hours (since RC1). We are
> > > >> targeting this vote to last until Thursday. 26th of August, 6pm CET.
> > > >> If it is adopted by majority approval, with at least 3 PMC
> affirmative
> > > >> votes, it will be released.
> > > >>
> > > >> Thanks,
> > > >> Igal
> > > >>
> > > >> [1]
> > > >>
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/Verifying+a+Flink+Stateful+Functions+Release
> > > >> [2]
> > > >>
> > https://dist.apache.org/repos/dist/dev/flink/flink-statefun-3.1.0-rc2/
> > > >> [3]
> > > >>
> > https://repository.apache.org/content/repositories/orgapacheflink-1446/
> > > >> [4] https://dist.apache.org/repos/dist/release/flink/KEYS
> > > >> [5]
> > > >>
> > >
> >
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?version=12350038=12315522
> > > >> [6] https://github.com/apache/flink-statefun/tree/release-3.1.0-rc2
> > > >> [7] https://github.com/apache/flink-statefun-docker/pull/15
> > > >>
> > > >>
> > >
> >
>


[jira] [Created] (FLINK-23969) Test Pulsar source end 2 end

2021-08-25 Thread Arvid Heise (Jira)
Arvid Heise created FLINK-23969:
---

 Summary: Test Pulsar source end 2 end
 Key: FLINK-23969
 URL: https://issues.apache.org/jira/browse/FLINK-23969
 Project: Flink
  Issue Type: Sub-task
  Components: Connectors / Pulsar
Reporter: Arvid Heise
 Fix For: 1.14.0


Write a test application using the Pulsar Source and execute it in a distributed 
fashion. Check fault tolerance by crashing and restarting a TM.

Ideally, we test different subscription modes and sticky keys in particular.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[DISCUSS] Merge Kafka-related PRs after feature freeze

2021-08-18 Thread Arvid Heise
Dear devs,

we would like to merge these PRs after features freeze:
FLINK-23838: Add FLIP-33 metrics to new KafkaSink [1]
FLINK-23801: Add FLIP-33 metrics to KafkaSource [2]
FLINK-23640: Create a KafkaRecordSerializationSchemas builder [3]

All of the 3 PRs are smaller quality of life improvements that are purely
implemented in flink-connector-kafka, so the risk in merging them is
minimal in terms of production stability. They also reuse existing test
infrastructure, so I expect little impact on the test stability.

We are still polishing the PRs and would be ready to merge them on Friday
when the objection period would be over.

Happy to hear your thoughts,

Arvid

[1] https://github.com/apache/flink/pull/16875
[2] https://github.com/apache/flink/pull/16838
[3] https://github.com/apache/flink/pull/16783


[jira] [Created] (FLINK-23858) Clarify StreamRecord#timestamp.

2021-08-18 Thread Arvid Heise (Jira)
Arvid Heise created FLINK-23858:
---

 Summary: Clarify StreamRecord#timestamp.
 Key: FLINK-23858
 URL: https://issues.apache.org/jira/browse/FLINK-23858
 Project: Flink
  Issue Type: Technical Debt
  Components: Runtime / Network
Reporter: Arvid Heise


The new Source apparently changed the way we specify records without 
timestamps. Previously, we used separate methods to create and maintain 
timestamp-less records.
Now, we are shifting towards using a special value 
(TimestampAssigner#NO_TIMESTAMP).

We first of all need to document that somewhere; at the very least in the 
JavaDoc of StreamRecord.
We should also revise the consequences: 
- Do we want to encode it in the {{StreamElementSerializer}}? Currently, we use 
a flag to indicate no-timestamp on the old path but in the new path we now use 
9 bytes to encode NO_TIMESTAMP.
- We should check if all code-paths deal with `hasTimestamp() == true && 
getTimestamp() == TimestampAssigner#NO_TIMESTAMP`, in particular with sinks 
(see the sketch below).
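
For illustration, a sketch of the check this implies; the helper class is 
hypothetical, not existing Flink code:

{noformat}
// Illustrative sketch only: treat NO_TIMESTAMP as "no timestamp", even if a
// hasTimestamp-style flag would formally say otherwise.
import org.apache.flink.api.common.eventtime.TimestampAssigner;

public final class TimestampChecks {

    private TimestampChecks() {}

    public static boolean hasUsableTimestamp(boolean hasTimestamp, long timestamp) {
        return hasTimestamp && timestamp != TimestampAssigner.NO_TIMESTAMP;
    }
}
{noformat}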



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-23817) Write documentation for standardized operator metrics

2021-08-16 Thread Arvid Heise (Jira)
Arvid Heise created FLINK-23817:
---

 Summary: Write documentation for standardized operator metrics
 Key: FLINK-23817
 URL: https://issues.apache.org/jira/browse/FLINK-23817
 Project: Flink
  Issue Type: Sub-task
  Components: Connectors / Common
Reporter: Arvid Heise
 Fix For: 1.14.0


Incorporate metrics in connector page. Use 
[data-templates|https://gohugo.io/templates/data-templates/] for common metrics.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] Backport HybridSource to 1.13.x

2021-08-16 Thread Arvid Heise
Hi Thomas,

that's neat. I forgot for a moment that connector-base is not part of
flink-dist.

I guess in theory, we could also omit the backport and simply refer users
to 1.14 version. I'm assuming you want to have it in 1.13 since 1.14 still
takes a bit. Am I correct?

On Mon, Aug 16, 2021 at 7:43 PM Thomas Weise  wrote:

> Hi Arvid,
>
> Thank you for the reply. Can you please explain a bit more
> your concern regarding an earlier bugfix level?
>
> I should have maybe made clear that the HybridSource can be used by
> updating flink-connector-base in the application jar. It does not require
> any addition to the runtime and therefore would work on any 1.13.x dist.
>
> For reference, I use it internally on top of 1.12.4.
>
> Thanks,
> Thomas
>
>
> On Mon, Aug 16, 2021 at 10:13 AM Arvid Heise  wrote:
>
> > Hi Thomas,
> >
> > since the change didn't modify any existing classes, I'm weakly in favor
> of
> > backporting. My reluctance mainly stems from possible disappointments
> from
> > 1.13 users that use an earlier bugfix level. So we need to make
> > documentation clear.
> >
> > In general, I'm seeing connectors as something extra (and plan to make
> that
> > more transparent), so I think we have more freedom for backports there in
> > contrast to other components. But it would be good to hear other opinions
> > on that matter.
> >
> > On Mon, Aug 16, 2021 at 5:26 PM Thomas Weise  wrote:
> >
> > > Hi,
> > >
> > > HybridSource [1] [2] was recently merged to the main branch. I would
> like
> > > to propose to backport it to release-1.13. Although it is a new
> feature,
> > it
> > > is also strictly additive and does not affect any existing code. The
> > > benefit is that we will have a released runtime version that the
> feature
> > > can be used with for those that are interested in it and dependent on a
> > > distribution they cannot modify.
> > >
> > > Are there any concerns backporting the change?
> > >
> > > Thanks,
> > > Thomas
> > >
> > > [1] https://issues.apache.org/jira/browse/FLINK-22670
> > > [2] https://github.com/apache/flink/pull/15924
> > >
> >
>


[jira] [Created] (FLINK-23816) Test new (Kafka) metrics in cluster

2021-08-16 Thread Arvid Heise (Jira)
Arvid Heise created FLINK-23816:
---

 Summary: Test new (Kafka) metrics in cluster
 Key: FLINK-23816
 URL: https://issues.apache.org/jira/browse/FLINK-23816
 Project: Flink
  Issue Type: Sub-task
  Components: Connectors / Common
Reporter: Arvid Heise
 Fix For: 1.14.0


* Set up a Kafka cluster (a dockerized local setup is fine)
* Start a simple Kafka consumer/producer job (a minimal sketch is included below)
* Add a bunch of example data (a few minutes' worth of processing)
* Look at metrics, compare with 
[FLIP-33|FLIP-33%3A+Standardize+Connector+Metrics] definitions.
* Pay special attention to lag metrics.
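
A minimal sketch of such a consumer/producer job, assuming a local broker at 
localhost:9092 and topics named input-topic/output-topic:

{noformat}
// Illustrative sketch only: read from one topic and write to another so that
// both KafkaSource and KafkaSink metrics (incl. lag) show up.
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class KafkaMetricsTestJob {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        KafkaSource<String> source =
                KafkaSource.<String>builder()
                        .setBootstrapServers("localhost:9092")
                        .setTopics("input-topic")
                        .setGroupId("metrics-test")
                        .setStartingOffsets(OffsetsInitializer.earliest())
                        .setValueOnlyDeserializer(new SimpleStringSchema())
                        .build();

        KafkaSink<String> sink =
                KafkaSink.<String>builder()
                        .setBootstrapServers("localhost:9092")
                        .setRecordSerializer(
                                KafkaRecordSerializationSchema.builder()
                                        .setTopic("output-topic")
                                        .setValueSerializationSchema(new SimpleStringSchema())
                                        .build())
                        .build();

        env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka-source")
                .sinkTo(sink);
        env.execute("kafka-metrics-test");
    }
}
{noformat}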



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] Backport HybridSource to 1.13.x

2021-08-16 Thread Arvid Heise
Hi Thomas,

since the change didn't modify any existing classes, I'm weakly in favor of
backporting. My reluctance mainly stems from possible disappointments from
1.13 users that use an earlier bugfix level. So we need to make
documentation clear.

In general, I'm seeing connectors as something extra (and plan to make that
more transparent), so I think we have more freedom for backports there in
contrast to other components. But it would be good to hear other opinions
on that matter.

On Mon, Aug 16, 2021 at 5:26 PM Thomas Weise  wrote:

> Hi,
>
> HybridSource [1] [2] was recently merged to the main branch. I would like
> to propose to backport it to release-1.13. Although it is a new feature, it
> is also strictly additive and does not affect any existing code. The
> benefit is that we will have a released runtime version that the feature
> can be used with for those that are interested in it and dependent on a
> distribution they cannot modify.
>
> Are there any concerns backporting the change?
>
> Thanks,
> Thomas
>
> [1] https://issues.apache.org/jira/browse/FLINK-22670
> [2] https://github.com/apache/flink/pull/15924
>


[jira] [Created] (FLINK-23807) Use metrics to detect restarts in MiniClusterTestEnvironment#triggerTaskManagerFailover

2021-08-16 Thread Arvid Heise (Jira)
Arvid Heise created FLINK-23807:
---

 Summary: Use metrics to detect restarts in 
MiniClusterTestEnvironment#triggerTaskManagerFailover
 Key: FLINK-23807
 URL: https://issues.apache.org/jira/browse/FLINK-23807
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Common
Reporter: Arvid Heise
 Fix For: 1.14.0


{{MiniClusterTestEnvironment#triggerTaskManagerFailover}} checks the job status 
to detect a restart 
{noformat}
terminateTaskManager();
CommonTestUtils.waitForJobStatus(
jobClient,
Arrays.asList(JobStatus.FAILING, JobStatus.FAILED, 
JobStatus.RESTARTING),
Deadline.fromNow(Duration.ofMinutes(5)));
afterFailAction.run();
startTaskManager();
{noformat}
However, `waitForJobStatus` polls every 100ms while the restart can happen 
within 10ms, so it can easily miss the actual restart and wait forever (or 
until the next restart happens because slots are missing).

We should rather use the metric `numRestarts`: check its value before the 
induced error, and wait until the counter has increased.
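
A sketch of the proposed pattern; the numRestarts accessor is passed in as a 
plain Supplier here, while in the real test it would be read from the 
MiniCluster's metric reporter:

{noformat}
// Illustrative sketch only: the caller records numRestarts before inducing the
// failure and then waits until the counter has increased, instead of polling
// the job status.
import java.time.Duration;
import java.util.function.Supplier;

public final class RestartAwait {

    private RestartAwait() {}

    public static void waitForRestart(
            Supplier<Long> numRestarts, long restartsBeforeFailure, Duration timeout)
            throws InterruptedException {
        final long deadlineNanos = System.nanoTime() + timeout.toNanos();
        while (numRestarts.get() <= restartsBeforeFailure) {
            if (System.nanoTime() > deadlineNanos) {
                throw new AssertionError("Job did not restart within " + timeout);
            }
            Thread.sleep(50);
        }
    }
}
{noformat}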



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-23803) Improve instantiation of InMemoryReporter

2021-08-16 Thread Arvid Heise (Jira)
Arvid Heise created FLINK-23803:
---

 Summary: Improve instantiation of InMemoryReporter
 Key: FLINK-23803
 URL: https://issues.apache.org/jira/browse/FLINK-23803
 Project: Flink
  Issue Type: Technical Debt
  Components: Test Infrastructure
Affects Versions: 1.14.0
Reporter: Arvid Heise


Currently, InMemoryReporter assumes it's used with the MiniCluster from the 
main thread. (It internally uses thread locals)

A better approach would be to create a unique id in the MiniCluster and use the 
factory to fetch the appropriate instance from a global map. In this way, 
different threading models and even concurrent MiniCluster instances would be 
supported.
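
A sketch of the proposed registry pattern (names are placeholders, not the 
actual InMemoryReporter code):

{noformat}
// Illustrative sketch only: the MiniCluster registers the reporter under a
// unique id and passes that id to the reporter factory via configuration; the
// factory then looks the instance up in a global map.
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

public final class ReporterRegistry {

    /** Global registry shared between the MiniCluster and the reporter factory. */
    public static final ReporterRegistry INSTANCE = new ReporterRegistry();

    private final Map<String, Object> instances = new ConcurrentHashMap<>();

    private ReporterRegistry() {}

    public String register(Object reporter) {
        String id = UUID.randomUUID().toString();
        instances.put(id, reporter);
        return id; // pass this id to the reporter factory via the configuration
    }

    public Object lookup(String id) {
        return instances.get(id);
    }

    public void unregister(String id) {
        instances.remove(id);
    }
}
{noformat}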



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-23767) FLIP-180: Adjust StreamStatus and Idleness definition

2021-08-13 Thread Arvid Heise (Jira)
Arvid Heise created FLINK-23767:
---

 Summary: FLIP-180: Adjust StreamStatus and Idleness definition
 Key: FLINK-23767
 URL: https://issues.apache.org/jira/browse/FLINK-23767
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Network
Reporter: Arvid Heise


The implementation ticket for 
[FLIP-180|https://cwiki.apache.org/confluence/display/FLINK/FLIP-180%3A+Adjust+StreamStatus+and+Idleness+definition].



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[RESULT][VOTE] FLIP-180: Adjust StreamStatus and Idleness definition

2021-08-13 Thread Arvid Heise
Dear devs,

I'm happy to announce that we have unanimously approved this FLIP.

There are 4 approving votes of which 4 are binding:
Till Rohrmann (binding)
Stephan Ewen (binding)
Piotr Nowojski (binding)
Tzu-Li (Gordon) Tai (binding)

Best,

Arvid

On Mon, Aug 9, 2021 at 6:46 PM Tzu-Li (Gordon) Tai 
wrote:

> +1 (binding)
>
> On Tue, Aug 10, 2021 at 12:15 AM Piotr Nowojski 
> wrote:
>
> > +1 (binding)
> >
> > Piotrek
> >
> > pon., 9 sie 2021 o 18:00 Stephan Ewen  napisał(a):
> >
> > > +1 (binding)
> > >
> > > On Mon, Aug 9, 2021 at 12:08 PM Till Rohrmann 
> > > wrote:
> > >
> > > > +1 (binding)
> > > >
> > > > Cheers,
> > > > Till
> > > >
> > > > On Thu, Aug 5, 2021 at 9:09 PM Arvid Heise  wrote:
> > > >
> > > > > Dear devs,
> > > > >
> > > > > I'd like to open a vote on FLIP-180: Adjust StreamStatus and
> Idleness
> > > > > definition [1] which was discussed in this thread [2].
> > > > > The vote will be open for at least 72 hours unless there is an
> > > objection
> > > > or
> > > > > not enough votes.
> > > > >
> > > > > Best,
> > > > >
> > > > > Arvid
> > > > >
> > > > > [1]
> > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-180%3A+Adjust+StreamStatus+and+Idleness+definition
> > > > > [2]
> > > > >
> > > > >
> > > >
> > >
> >
> https://lists.apache.org/thread.html/r8357d64b9cfdf5a233c53a20d9ac62b75c07c925ce2c43e162f1e39c%40%3Cdev.flink.apache.org%3E
> > > > >
> > > >
> > >
> >
>


[jira] [Created] (FLINK-23759) notifyCheckpointComplete without corresponding snapshotState

2021-08-13 Thread Arvid Heise (Jira)
Arvid Heise created FLINK-23759:
---

 Summary: notifyCheckpointComplete without corresponding 
snapshotState
 Key: FLINK-23759
 URL: https://issues.apache.org/jira/browse/FLINK-23759
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Checkpointing
Affects Versions: 1.14.0
Reporter: Arvid Heise


In a private run on AZP, I found a run where {{notifyCheckpointComplete}} was 
invoked without prior {{snapshotState}} after the default 
ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH was changed to true.
https://dev.azure.com/arvidheise0209/arvidheise/_build/results?buildId=1325=logs=43a593e7-535d-554b-08cc-244368da36b4=82d122c0-8bbf-56f3-4c0d-8e3d69630d0f

This causes the following NPE because the implementation relies on 
{{notifyCheckpointComplete}} being called after a corresponding 
{{snapshotState}} (valid assumption).
{noformat}
Aug 12 19:25:20 [ERROR] Tests run: 104, Failures: 0, Errors: 1, Skipped: 0, 
Time elapsed: 87.848 s <<< FAILURE! - in 
org.apache.flink.table.planner.runtime.stream.sql.JoinITCase
Aug 12 19:25:20 [ERROR] testBigDataOfJoin[StateBackend=HEAP]  Time elapsed: 
0.792 s  <<< ERROR!
Aug 12 19:25:20 org.apache.flink.runtime.client.JobExecutionException: Job 
execution failed.
Aug 12 19:25:20 at 
org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
Aug 12 19:25:20 at 
org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$3(MiniClusterJobClient.java:137)
Aug 12 19:25:20 at 
java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
Aug 12 19:25:20 at 
java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
Aug 12 19:25:20 at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
Aug 12 19:25:20 at 
java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
Aug 12 19:25:20 at 
org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$0(AkkaInvocationHandler.java:250)
Aug 12 19:25:20 at 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
Aug 12 19:25:20 at 
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
Aug 12 19:25:20 at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
Aug 12 19:25:20 at 
java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
Aug 12 19:25:20 at 
org.apache.flink.util.concurrent.FutureUtils.doForward(FutureUtils.java:1389)
Aug 12 19:25:20 at 
org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$null$1(ClassLoadingUtils.java:93)
Aug 12 19:25:20 at 
org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
Aug 12 19:25:20 at 
org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$guardCompletionWithContextClassLoader$2(ClassLoadingUtils.java:92)
Aug 12 19:25:20 at 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
Aug 12 19:25:20 at 
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
Aug 12 19:25:20 at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
Aug 12 19:25:20 at 
java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
Aug 12 19:25:20 at 
org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils$1.onComplete(AkkaFutureUtils.java:47)
Aug 12 19:25:20 at akka.dispatch.OnComplete.internal(Future.scala:300)
Aug 12 19:25:20 at akka.dispatch.OnComplete.internal(Future.scala:297)
Aug 12 19:25:20 at 
akka.dispatch.japi$CallbackBridge.apply(Future.scala:224)
Aug 12 19:25:20 at 
akka.dispatch.japi$CallbackBridge.apply(Future.scala:221)
Aug 12 19:25:20 at 
scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
Aug 12 19:25:20 at 
org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils$DirectExecutionContext.execute(AkkaFutureUtils.java:65)
Aug 12 19:25:20 at 
scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:68)
Aug 12 19:25:20 at 
scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:284)
Aug 12 19:25:20 at 
scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:284)
Aug 12 19:25:20 at 
scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:284)
Aug 12 19:25:20 at 
akka.pattern.PromiseActorRef.$bang(AskSupport.scala:621)
Aug 12 19:25:20 at 
akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:24)
Aug 12 19:25:20 at 
akka.pattern.PipeToSupport$Pipea

[jira] [Created] (FLINK-23722) S3 Tests fail on AZP: Unable to find a region via the region provider chain. Must provide an explicit region in the builder or setup environment to supply a region.

2021-08-11 Thread Arvid Heise (Jira)
Arvid Heise created FLINK-23722:
---

 Summary: S3 Tests fail on AZP: Unable to find a region via the 
region provider chain. Must provide an explicit region in the builder or setup 
environment to supply a region.
 Key: FLINK-23722
 URL: https://issues.apache.org/jira/browse/FLINK-23722
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / FileSystem
Affects Versions: 1.14.0
Reporter: Arvid Heise
Assignee: Arvid Heise


E2E and integration tests fail with

{noformat}
Aug 11 09:11:32 Caused by: com.amazonaws.SdkClientException: Unable to find a 
region via the region provider chain. Must provide an explicit region in the 
builder or setup environment to supply a region.
Aug 11 09:11:32 at 
com.amazonaws.client.builder.AwsClientBuilder.setRegion(AwsClientBuilder.java:462)
Aug 11 09:11:32 at 
com.amazonaws.client.builder.AwsClientBuilder.configureMutableProperties(AwsClientBuilder.java:424)
Aug 11 09:11:32 at 
com.amazonaws.client.builder.AwsSyncClientBuilder.build(AwsSyncClientBuilder.java:46)
Aug 11 09:11:32 at 
org.apache.hadoop.fs.s3a.DefaultS3ClientFactory.buildAmazonS3Client(DefaultS3ClientFactory.java:144)
Aug 11 09:11:32 at 
org.apache.hadoop.fs.s3a.DefaultS3ClientFactory.createS3Client(DefaultS3ClientFactory.java:96)
Aug 11 09:11:32 at 
org.apache.hadoop.fs.s3a.S3AFileSystem.bindAWSClient(S3AFileSystem.java:753)
Aug 11 09:11:32 at 
org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:446)
Aug 11 09:11:32 ... 44 more
{noformat}


https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=21884=logs=d44f43ce-542c-597d-bf94-b0718c71e5e8=ed165f3f-d0f6-524b-5279-86f8ee7d0e2d



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [ANNOUNCE] Apache Flink 1.12.5 released

2021-08-10 Thread Arvid Heise
Thank you.

On Tue, Aug 10, 2021 at 11:44 AM Till Rohrmann  wrote:

> This is great news. Thanks a lot for being our release manager Jingsong
> and also to everyone who made this release possible.
>
> Cheers,
> Till
>
> On Tue, Aug 10, 2021 at 10:57 AM Jingsong Lee 
> wrote:
>
>> The Apache Flink community is very happy to announce the release of Apache
>> Flink 1.12.5, which is the fifth bugfix release for the Apache Flink 1.12
>> series.
>>
>>
>>
>> Apache Flink® is an open-source stream processing framework for
>> distributed, high-performing, always-available, and accurate data
>> streaming
>> applications.
>>
>>
>>
>> The release is available for download at:
>>
>> https://flink.apache.org/downloads.html
>>
>>
>>
>> Please check out the release blog post for an overview of the improvements
>> for this bugfix release:
>>
>> https://flink.apache.org/news/2021/08/06/release-1.12.5.html
>>
>>
>>
>> The full release notes are available in Jira:
>>
>>
>> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12350166
>>
>>
>>
>> We would like to thank all contributors of the Apache Flink community who
>> made this release possible!
>>
>>
>>
>> Regards,
>>
>> Jingsong Lee
>>
>


Re: [ANNOUNCE] Apache Flink 1.11.4 released

2021-08-10 Thread Arvid Heise
Awesome! Thank you for driving this.

On Tue, Aug 10, 2021 at 11:45 AM Till Rohrmann  wrote:

> This is great news. Thanks a lot for being our release manager Godfrey and
> also to everyone who made this release possible.
>
> Cheers,
> Till
>
> On Tue, Aug 10, 2021 at 11:09 AM godfrey he  wrote:
>
>> The Apache Flink community is very happy to announce the release of
>> Apache Flink 1.11.4, which is the fourth bugfix release for the Apache
>> Flink 1.11 series.
>>
>>
>>
>> Apache Flink® is an open-source stream processing framework for
>> distributed, high-performing, always-available, and accurate data streaming
>> applications.
>>
>>
>>
>> The release is available for download at:
>>
>> https://flink.apache.org/downloads.html
>>
>>
>>
>> Please check out the release blog post for an overview of the
>> improvements for this bugfix release:
>>
>> https://flink.apache.org/news/2021/08/09/release-1.11.4.html
>>
>>
>>
>> The full release notes are available in Jira:
>>
>>
>> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12349404
>>
>>
>>
>> We would like to thank all contributors of the Apache Flink community who
>> made this release possible!
>>
>>
>>
>> Regards,
>>
>> Godfrey
>>
>


Re: [ANNOUNCE] Apache Flink 1.13.2 released

2021-08-10 Thread Arvid Heise
Thank you!

On Tue, Aug 10, 2021 at 11:04 AM Jingsong Li  wrote:

> Thanks Yun Tang and everyone!
>
> Best,
> Jingsong
>
> On Tue, Aug 10, 2021 at 9:37 AM Xintong Song 
> wrote:
>
>> Thanks Yun and everyone~!
>>
>> Thank you~
>>
>> Xintong Song
>>
>>
>>
>> On Mon, Aug 9, 2021 at 10:14 PM Till Rohrmann 
>> wrote:
>>
>> > Thanks Yun Tang for being our release manager and the great work! Also
>> > thanks a lot to everyone who contributed to this release.
>> >
>> > Cheers,
>> > Till
>> >
>> > On Mon, Aug 9, 2021 at 9:48 AM Yu Li  wrote:
>> >
>> >> Thanks Yun Tang for being our release manager and everyone else who
>> made
>> >> the release possible!
>> >>
>> >> Best Regards,
>> >> Yu
>> >>
>> >>
>> >> On Fri, 6 Aug 2021 at 13:52, Yun Tang  wrote:
>> >>
>> >>>
>> >>> The Apache Flink community is very happy to announce the release of
>> >>> Apache Flink 1.13.2, which is the second bugfix release for the Apache
>> >>> Flink 1.13 series.
>> >>>
>> >>> Apache Flink® is an open-source stream processing framework for
>> >>> distributed, high-performing, always-available, and accurate data
>> streaming
>> >>> applications.
>> >>>
>> >>> The release is available for download at:
>> >>> https://flink.apache.org/downloads.html
>> >>>
>> >>> Please check out the release blog post for an overview of the
>> >>> improvements for this bugfix release:
>> >>> https://flink.apache.org/news/2021/08/06/release-1.13.2.html
>> >>>
>> >>> The full release notes are available in Jira:
>> >>>
>> >>>
>> https://issues.apache.org/jira/secure/ReleaseNote.jspa?version=12350218==12315522
>> >>>
>> >>> We would like to thank all contributors of the Apache Flink community
>> >>> who made this release possible!
>> >>>
>> >>> Regards,
>> >>> Yun Tang
>> >>>
>> >>
>>
>
>
> --
> Best, Jingsong Lee
>


[jira] [Created] (FLINK-23652) Implement FLIP-179

2021-08-05 Thread Arvid Heise (Jira)
Arvid Heise created FLINK-23652:
---

 Summary: Implement FLIP-179
 Key: FLINK-23652
 URL: https://issues.apache.org/jira/browse/FLINK-23652
 Project: Flink
  Issue Type: Improvement
  Components: API / DataStream
Affects Versions: 1.14.0
Reporter: Arvid Heise
Assignee: Arvid Heise
 Fix For: 1.14.0


This ticket is about implementing 
[FLIP-179|https://cwiki.apache.org/confluence/display/FLINK/FLIP-179%3A+Expose+Standardized+Operator+Metrics].
 It will provide some metrics out of the box for sources/sinks and support connector 
developers in easily implementing standardized metrics.




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] FLIP-180: Adjust StreamStatus and Idleness definition

2021-08-05 Thread Arvid Heise
Hi Dawid,

thanks for the clarification. Having looked into the related classes again, I
agree that exposing the current StreamStatus would severely limit future
developments. I guess to expose it in the Sink, we would need to have a new
WatermarkStatus in eventtime package (as per Eron's suggestion) that is
independent of the internal status. It's very similar to how we have 2
Watermarks (and I now finally understand why there are 2 of them). I'd like
to leave that as future work.

With that being said, I started the vote thread about the original proposal
[1].

The explicit listing of the name changes is quite verbose, so I left it
out of the FLIP. I'd like to point to the POC PR [2] for details.
Ultimately, I can also attach an appendix or even an attachment if someone
needs it to cast the vote.

[1]
https://lists.apache.org/thread.html/rcfcb9126e31d6641e1cc96834310c5b6fafff0c948973f97d1ac70f2%40%3Cdev.flink.apache.org%3E
[2] https://github.com/apache/flink/pull/16433

On Thu, Aug 5, 2021 at 3:06 PM Dawid Wysakowicz 
wrote:

> Hey all,
>
> Just a couple of comments from my side as I was called here.
>
> +1 for making stream status just about watermarks.
>
> I observe that AbstractStreamOperator is hardcoded to derive the output
> channel's status from the input channel's status.  May I suggest
> we refactor "AbstractStreamOperator::emitStreamStatus(StreamStatus,int)" to
> allow for the operator subclass to customize the processing of the
> aggregated watermark and watermark status.
>
> The reason for making that method final from my side is very similar to
> the reason of this discussion. There is/was no clear definition of what
> StreamStatus is and what it is not. I did not want to let users arbitrarily
> play with it until we have a clear semantic. Even if I still find the
> Operator API only semi public.
>
> As for the quesion if we should or should not expose it in sinks. If we
> say it is a purely user defined logic that affects only watermarks, even if
> hardly I can imagine it can be persisted. Still personally I don't think it
> is a good approach. As I mentioned a few times before, I see stream status
> as a tradeoff between correctness and making progress in real/processing
> time.
>
> However if we decide to expose that in sinks it must not be the
> StreamStatus we have now. It must be a completely new class. The current
> StreamStatus extends from StreamElement which is a really low level concept
> which I am strongly against exposing in any kind of public API.
>
> Best,
>
> Dawid
> On 05/08/2021 12:21, Till Rohrmann wrote:
>
> Coming back to my previous comment: I would actually propose to separate
> the discussion about whether to expose the WatermarkStatus in the sinks or
> not from correcting the StreamStatus and Idleness definition in order to
> keep the scope of this FLIP as small as possible. If there is a good reason
> to expose the WatermarkStatus, then we can probably do it.
>
> Cheers,
> Till
>
> On Fri, Jul 30, 2021 at 2:29 PM Arvid Heise  
>  wrote:
>
>
> Hi Martijn,
>
> 1. Good question. The watermarks and statuses of the splits are first
> aggregated before emitted through the reader. The watermark strategy of the
> user is actually applied on all SourceOutputs (=splits). Since one split is
> active and one is idle, the watermark of the reader will not advance until
> the user-defined idleness is triggered on the idle split. At this point,
> the combined watermark solely depends on the active split. The combined
> status remains ACTIVE.
> 2. Kafka has no dynamic partitions. This is a complete misnomer on Flink
> side. In fact, if you search for Kafka and partition discovery, you will
> only find Flink resources. What we actually do is dynamic topic discovery
> and that can only be triggered through pattern afaik. We could go for topic
> discovery on all patterns by default if we don't do that already.
> 3. Yes, idleness on assigned partitions would even work with dynamic
> assignments. I will update the FLIP to reflect that.
> 4. Afaik it was only meant for scenario 2 (and your question 3) and it
> should be this way after the FLIP. I don't know of any source
> implementation that uses the user-specified idleness to handle scenario 3.
> The thing that is currently extra is that some readers go idle, when the
> reader doesn't have an active assignment.
>
> Best,
>
> Arvid
>
> On Fri, Jul 30, 2021 at 12:17 PM Martijn Visser  
> 
> wrote:
>
>
> Hi all,
>
> I have a couple of questions after studying the FLIP and the docs:
>
> 1. What happens when one of the readers has two splits assigned and one
>
> of
>
> the splits actually receives data?
>
> 2. If I understand it correctly the Kinesis Source uses dynam

[VOTE] FLIP-180: Adjust StreamStatus and Idleness definition

2021-08-05 Thread Arvid Heise
Dear devs,

I'd like to open a vote on FLIP-180: Adjust StreamStatus and Idleness
definition [1] which was discussed in this thread [2].
The vote will be open for at least 72 hours unless there is an objection or
not enough votes.

Best,

Arvid

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-180%3A+Adjust+StreamStatus+and+Idleness+definition
[2]
https://lists.apache.org/thread.html/r8357d64b9cfdf5a233c53a20d9ac62b75c07c925ce2c43e162f1e39c%40%3Cdev.flink.apache.org%3E


[jira] [Created] (FLINK-23621) FLIP-177: Extend Sink API

2021-08-04 Thread Arvid Heise (Jira)
Arvid Heise created FLINK-23621:
---

 Summary: FLIP-177: Extend Sink API
 Key: FLINK-23621
 URL: https://issues.apache.org/jira/browse/FLINK-23621
 Project: Flink
  Issue Type: Improvement
  Components: API / DataStream
Affects Versions: 1.14.0
Reporter: Arvid Heise
Assignee: Arvid Heise
 Fix For: 1.14.0


Implementation ticket for 
[FLIP-177|https://cwiki.apache.org/confluence/display/FLINK/FLIP-177%3A+Extend+Sink+API].



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[RESULT] [VOTE] FLIP-177: Extend Sink API

2021-08-04 Thread Arvid Heise
Dear devs,

I'm happy to announce that we have unanimously approved this FLIP.

There are 3 approving votes of which 3 are binding:
Thomas Weise (binding)
Danny Cranmer (binding)
Guowei Ma (binding)

Best,

Arvid

On Mon, Aug 2, 2021 at 11:10 AM Guowei Ma  wrote:

> +1(binding)
> Best,
> Guowei
>
>
> On Mon, Aug 2, 2021 at 4:04 PM Danny Cranmer 
> wrote:
>
> > +1 (binding)
> >
> > On Mon, Aug 2, 2021 at 12:42 AM Thomas Weise  wrote:
> >
> > > +1 (binding)
> > >
> > >
> > > On Fri, Jul 30, 2021 at 5:05 AM Arvid Heise  wrote:
> > >
> > > > Hi all,
> > > >
> > > > I'd like to start a vote on FLIP-177: Extend Sink API [1] which
> > provides
> > > > small extensions to the Sink API introduced through FLIP-143.
> > > > The vote will be open for at least 72 hours unless there is an
> > objection
> > > or
> > > > not enough votes.
> > > >
> > > > Note that the FLIP was larger initially and I cut down all
> > > > advanced/breaking changes.
> > > >
> > > > Best,
> > > >
> > > > Arvid
> > > >
> > > > [1]
> > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-177%3A+Extend+Sink+API
> > > >
> > >
> >
>


[RESULT] [VOTE] FLIP-179: Expose Standardized Operator Metrics

2021-08-04 Thread Arvid Heise
Dear devs,

I'm happy to announce that we have unanimously approved this FLIP.

There are 4 approving votes of which 3 are binding:
Steven Wu (non-binding)
Jiangjie (Becket) Qin (binding)
Chesnay Schepler (binding)
Thomas Weise (binding)

Best,

Arvid

On Wed, Aug 4, 2021 at 7:06 AM Becket Qin  wrote:

> Personally speaking, it is intuitive for me to set a gauge in MetricGroup.
> So I am fine with set*Gauge pattern as long as the method is in
> *MetricGroup class.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On Tue, Aug 3, 2021 at 7:24 PM Arvid Heise  wrote:
>
> > @Becket Qin  @Thomas Weise  would
> > you
> > also agree to @Chesnay Schepler  's proposal?
> >
> > I think the main intention is to only use a Gauge when the exact metric
> is
> > exposed. For any partial value that may be used in an internal/predefined
> > metric, we would only use a supplier instead of a Gauge.
> >
> > So a connector developer can immediately distinguish the cases: if it's a
> > metric class he would see the exact metric corresponding to the setter.
> If
> > it's some Supplier, the developer would expect that the value is used in
> a
> > differently named metric, which we would describe in the JavaDoc.
> > Could that also be a solution to the currentEventFetchTimeLag metric?
> >
> > On Tue, Aug 3, 2021 at 12:54 PM Thomas Weise  wrote:
> >
> > > +1 (binding)
> > >
> > > On Tue, Aug 3, 2021 at 12:58 AM Chesnay Schepler 
> > > wrote:
> > >
> > > > +1 (binding)
> > > >
> > > > Although I still think all the set* methods should accept a Supplier
> > > > instead of a Gauge.
> > > >
> > > > On 02/08/2021 12:36, Becket Qin wrote:
> > > > > +1 (binding).
> > > > >
> > > > > Thanks for driving the efforts, Arvid.
> > > > >
> > > > > Cheers,
> > > > >
> > > > > Jiangjie (Becket) Qin
> > > > >
> > > > > On Sat, Jul 31, 2021 at 12:08 PM Steven Wu 
> > > wrote:
> > > > >
> > > > >> +1 (non-binding)
> > > > >>
> > > > >> On Fri, Jul 30, 2021 at 3:55 AM Arvid Heise 
> > wrote:
> > > > >>
> > > > >>> Dear devs,
> > > > >>>
> > > > >>> I'd like to open a vote on FLIP-179: Expose Standardized Operator
> > > > Metrics
> > > > >>> [1] which was discussed in this thread [2].
> > > > >>> The vote will be open for at least 72 hours unless there is an
> > > > objection
> > > > >>> or not enough votes.
> > > > >>>
> > > > >>> The proposal excludes the implementation for the
> > > > currentFetchEventTimeLag
> > > > >>> metric, which caused a bit of discussion without a clear
> > convergence.
> > > > We
> > > > >>> will implement that metric in a generic way at a later point and
> > > > >> encourage
> > > > >>> sources to implement it themselves in the meantime.
> > > > >>>
> > > > >>> Best,
> > > > >>>
> > > > >>> Arvid
> > > > >>>
> > > > >>> [1]
> > > > >>>
> > > > >>>
> > > > >>
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-179%3A+Expose+Standardized+Operator+Metrics
> > > > >>> [2]
> > > > >>>
> > > > >>>
> > > > >>
> > > >
> > >
> >
> https://lists.apache.org/thread.html/r856920cbfe6a262b521109c5bdb9e904e00a9b3f1825901759c24d85%40%3Cdev.flink.apache.org%3E
> > > >
> > > >
> > > >
> > >
> >
>


[jira] [Created] (FLINK-23617) Co-locate sink operators with same parallelism

2021-08-04 Thread Arvid Heise (Jira)
Arvid Heise created FLINK-23617:
---

 Summary: Co-locate sink operators with same parallelism
 Key: FLINK-23617
 URL: https://issues.apache.org/jira/browse/FLINK-23617
 Project: Flink
  Issue Type: Improvement
  Components: API / DataStream
Affects Versions: 1.14.0
Reporter: Arvid Heise


FLINK-19531 introduced the implementation of the Sink interface. It strictly 
cut the different parts of the sink pipeline into 3 operators:

writer -> committer -> global committer

In streaming mode with a parallelism p, the pipeline is executed as follows
writer(parallelism=p) -> committer(parallelism=p) -> global 
committer(parallelism=1).
Here we could bundle writer+committer into one operator.

In batch mode with a parallelism p, the pipeline is executed as follows
writer(parallelism=p) -> committer(parallelism=1) -> global 
committer(parallelism=1).
Here we could bundle committer+global committer into one operator. (Committer 
needs to run with parallelism=1 to create a pipeline region and reduce the risk 
of data loss during commit; we can hopefully fix it after FLIP-147)

Having fewer operators will decrease the need to copy the committables in the 
operator chain (where we currently mostly use Kryo). Thus, we can implement 
connection/transaction pooling in streaming, where committables are reused 
after successful commit.

The proposal of this ticket is to extract the functionality of the 7 different 
operator implementations with their factories into reusable building blocks and 
use them in 2 operators (writer+committer and committer+global committer).



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [VOTE] FLIP-179: Expose Standardized Operator Metrics

2021-08-03 Thread Arvid Heise
@Becket Qin  @Thomas Weise  would you
also agree to @Chesnay Schepler  's proposal?

I think the main intention is to only use a Gauge when the exact metric is
exposed. For any partial value that may be used in an internal/predefined
metric, we would only use a supplier instead of a Gauge.

So a connector developer can immediately distinguish the cases: if it's a
metric class he would see the exact metric corresponding to the setter. If
it's some Supplier, the developer would expect that the value is used in a
differently named metric, which we would describe in the JavaDoc.
Could that also be a solution to the currentEventFetchTimeLag metric?

On Tue, Aug 3, 2021 at 12:54 PM Thomas Weise  wrote:

> +1 (binding)
>
> On Tue, Aug 3, 2021 at 12:58 AM Chesnay Schepler 
> wrote:
>
> > +1 (binding)
> >
> > Although I still think all the set* methods should accept a Supplier
> > instead of a Gauge.
> >
> > On 02/08/2021 12:36, Becket Qin wrote:
> > > +1 (binding).
> > >
> > > Thanks for driving the efforts, Arvid.
> > >
> > > Cheers,
> > >
> > > Jiangjie (Becket) Qin
> > >
> > > On Sat, Jul 31, 2021 at 12:08 PM Steven Wu 
> wrote:
> > >
> > >> +1 (non-binding)
> > >>
> > >> On Fri, Jul 30, 2021 at 3:55 AM Arvid Heise  wrote:
> > >>
> > >>> Dear devs,
> > >>>
> > >>> I'd like to open a vote on FLIP-179: Expose Standardized Operator
> > Metrics
> > >>> [1] which was discussed in this thread [2].
> > >>> The vote will be open for at least 72 hours unless there is an
> > objection
> > >>> or not enough votes.
> > >>>
> > >>> The proposal excludes the implementation for the
> > currentFetchEventTimeLag
> > >>> metric, which caused a bit of discussion without a clear convergence.
> > We
> > >>> will implement that metric in a generic way at a later point and
> > >> encourage
> > >>> sources to implement it themselves in the meantime.
> > >>>
> > >>> Best,
> > >>>
> > >>> Arvid
> > >>>
> > >>> [1]
> > >>>
> > >>>
> > >>
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-179%3A+Expose+Standardized+Operator+Metrics
> > >>> [2]
> > >>>
> > >>>
> > >>
> >
> https://lists.apache.org/thread.html/r856920cbfe6a262b521109c5bdb9e904e00a9b3f1825901759c24d85%40%3Cdev.flink.apache.org%3E
> >
> >
> >
>


Re: [DISCUSS] FLIP-180: Adjust StreamStatus and Idleness definition

2021-07-30 Thread Arvid Heise
Hi Martijn,

1. Good question. The watermarks and statuses of the splits are first
aggregated before emitted through the reader. The watermark strategy of the
user is actually applied on all SourceOutputs (=splits). Since one split is
active and one is idle, the watermark of the reader will not advance until
the user-defined idleness is triggered on the idle split. At this point,
the combined watermark solely depends on the active split. The combined
status remains ACTIVE.
2. Kafka has no dynamic partitions. This is a complete misnomer on Flink
side. In fact, if you search for Kafka and partition discovery, you will
only find Flink resources. What we actually do is dynamic topic discovery
and that can only be triggered through pattern afaik. We could go for topic
discovery on all patterns by default if we don't do that already.
3. Yes, idleness on assigned partitions would even work with dynamic
assignments (see the sketch below). I will update the FLIP to reflect that.
4. Afaik it was only meant for scenario 2 (and your question 3) and it
should be this way after the FLIP. I don't know of any source
implementation that uses the user-specified idleness to handle scenario 3.
The thing that is currently extra is that some readers go idle, when the
reader doesn't have an active assignment.
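
For reference (re 3./4.), a minimal sketch of the user-facing idleness knob;
the assigner and the concrete durations are placeholders, not a recommendation:

// Sketch only, assuming arbitrary example values for out-of-orderness and idleness.
import java.time.Duration;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;

public final class IdlenessSketch {

    private IdlenessSketch() {}

    public static <E> WatermarkStrategy<E> withIdleness(
            SerializableTimestampAssigner<E> timestampAssigner) {
        return WatermarkStrategy.<E>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                .withTimestampAssigner(timestampAssigner)
                .withIdleness(Duration.ofMinutes(1));
    }
}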

Best,

Arvid

On Fri, Jul 30, 2021 at 12:17 PM Martijn Visser 
wrote:

> Hi all,
>
> I have a couple of questions after studying the FLIP and the docs:
>
> 1. What happens when one of the readers has two splits assigned and one of
> the splits actually receives data?
>
> 2. If I understand it correctly the Kinesis Source uses dynamic shard
> discovery by default (so in case of idleness scenario 3 would happen there)
> and the FileSource also has a dynamic assignment. The Kafka Source doesn't
> use dynamic partition discovery by default (so scenario 2 would be the
> default to happen there). Why did we choose to not enable dynamic partition
> discovery by default and should we actually change that?
>
> 3. To be sure, is it correct that in case of a dynamic assignment and there
> is temporarily no data, that scenario 2 is applicable?
>
> 4. Does WatermarkStrategy#withIdleness currently cover scenario 2, 3 and
> the one from my 3rd question? (edited)
>
> Best regards,
>
> Martijn
>
> On Fri, 23 Jul 2021 at 15:57, Till Rohrmann  wrote:
>
> > Hi everyone,
> >
> > I would be in favour of what Arvid said about not exposing the
> > WatermarkStatus to the Sink. Unless there is a very strong argument that
> > this is required I think that keeping this concept internal seems to me
> the
> > better choice right now. Moreover, as Arvid said the downstream
> application
> > can derive the WatermarkStatus on their own depending on its business
> > logic.
> >
> > Cheers,
> > Till
> >
> > On Fri, Jul 23, 2021 at 2:15 PM Arvid Heise  wrote:
> >
> > > Hi Eron,
> > >
> > > thank you very much for your feedback.
> > >
> > > Please mention that the "temporary status toggle" code will be removed.
> > > >
> > > This code is already removed but there is still some automation of
> going
> > > idle when temporarily no splits are assigned. I will include it in the
> > FLIP.
> > >
> > > I agree with adding the markActive() functionality, for symmetry.
> > Speaking
> > > > of symmetry, could we now include the minor enhancement we discussed
> in
> > > > FLIP-167, the exposure of watermark status changes on the Sink
> > interface.
> > > > I drafted a PR and would be happy to revisit it.
> > > >
> > > >
> > >
> >
> https://github.com/streamnative/flink/pull/2/files#diff-64d9c652ffc3c60b6d838200a24b106eeeda4b2d853deae94dbbdf16d8d694c2R62-R70
> > >
> > > I'm still not sure if that's a good idea.
> > >
> > > If we have now refined idleness to be a user-specified,
> > > application-specific way to handle with temporarily stalled partitions,
> > > then downstream applications should actually implement their own
> idleness
> > > definition. Let's see what other devs think. I'm pinging the ones that
> > have
> > > been most involved in the discussion: @Stephan Ewen 
> > > @Till
> > > Rohrmann  @Dawid Wysakowicz <
> > dwysakow...@apache.org>
> > > .
> > >
> > > The flip mentions a 'watermarkstatus' package for the WatermarkStatus
> > > > class.  Should it be 'eventtime' package?
> > > >
> > > Are you proposing org.apache.flink.api.common.eventtime? I was simply
> > > suggesting to simply ren

Re: [DISCUSS] FLIP-177: Extend Sink API

2021-07-30 Thread Arvid Heise
Hi Guowei, hi all,

The main drawback of the AsyncIO approach is the decreased flexibility. In
particular, as you mentioned for the advanced backpressure use cases, you
would need to chain several AsyncIOs:

>>>But whether a sink is overloaded not only depends on the queue size. It
> also depends on the number of in-flight async requests
> 1. How about chaining two AsyncIOs? One is for controlling the size of the
> buffer elements; The other is for controlling the in-flight async requests.
>

If we need an AsyncIO for each dimension of backpressure, we also might end
up with an incompatible state when a dimension is added or removed through
a configuration change.
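
For reference, a rough sketch of what the chained-AsyncIO approach could look
like; BufferStage, RequestStage and all capacities/timeouts are made-up
placeholders rather than existing Flink classes:

import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.async.AsyncFunction;
import org.apache.flink.streaming.api.functions.async.ResultFuture;

public class ChainedAsyncSinkSketch {

    // First stage: its capacity (1000) bounds the number of buffered elements.
    static class BufferStage implements AsyncFunction<String, List<String>> {
        @Override
        public void asyncInvoke(String element, ResultFuture<List<String>> resultFuture) {
            // A real implementation would batch elements here; we forward 1:1.
            resultFuture.complete(Collections.singletonList(Collections.singletonList(element)));
        }
    }

    // Second stage: its capacity (50) bounds the number of in-flight requests.
    static class RequestStage implements AsyncFunction<List<String>, Boolean> {
        @Override
        public void asyncInvoke(List<String> batch, ResultFuture<Boolean> resultFuture) {
            CompletableFuture
                    .supplyAsync(() -> true) // stand-in for the actual client call
                    .thenAccept(ok -> resultFuture.complete(Collections.singletonList(ok)));
        }
    }

    static DataStream<Boolean> wire(DataStream<String> input) {
        DataStream<List<String>> buffered =
                AsyncDataStream.unorderedWait(input, new BufferStage(), 30, TimeUnit.SECONDS, 1000);
        return AsyncDataStream.unorderedWait(buffered, new RequestStage(), 30, TimeUnit.SECONDS, 50);
    }
}

Each backpressure dimension then lives in a separate operator with its own
state, which is exactly where the state-compatibility concern above comes from.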

With that being said, I'd like to start a vote on the proposal as your
strong objection disappeared. We can continue the discussion here but I'd
also appreciate any vote on [1].

[1]
https://lists.apache.org/thread.html/r7194846ec671e9e0e64908a7ae4cf32c2bccf1dd6ee7db107a52cf04%40%3Cdev.flink.apache.org%3E

On Fri, Jul 30, 2021 at 5:51 AM Guowei Ma  wrote:

> Hi, Arvid & Piotr
> Sorry for the late reply.
> 1. Thank you all very much for your patience and explanation. Recently, I
> have also studied the related code of 'MailBox', which may not be as
> serious as I thought, after all, it is very similar to Java's `Executor`;
> 2. Whether to use AsyncIO or MailBox to implement Kinesis connector is more
> up to the contributor to decide (after all, it has been decided to expose
> `Mailbox` :-) ). It’s just that I personally prefer to combine some simple
> functions to complete a more advanced function.
> Best,
> Guowei
>
>
> On Sat, Jul 24, 2021 at 3:38 PM Arvid Heise  wrote:
>
> > Just to reiterate on Piotr's point: MailboxExecutor is pretty much an
> > Executor [1] with named lambdas, except for the name MailboxExecutor
> > nothing is hinting at a specific threading model.
> >
> > Currently, we expose it on StreamOperator API. Afaik the idea is to make
> > the StreamOperator internal and beef up ProcessFunction but for several
> use
> > cases (e.g., AsyncIO), we actually need to expose the executor anyways.
> >
> > We could rename MailboxExecutor to avoid exposing the name of the
> threading
> > model. For example, we could rename it to TaskThreadExecutor (but that's
> > pretty much the same), to CooperativeExecutor (again implies Mailbox), to
> > o.a.f.Executor, to DeferredExecutor... Ideas are welcome.
> >
> > We could also simply use Java's Executor interface, however, when working
> > with that interface, I found that the missing context of async executed
> > lambdas made debugging much much harder. So that's why I designed
> > MailboxExecutor to force the user to give some debug string to each
> > invocation. In the sink context, we could, however, use an adaptor from
> > MailboxExecutor to Java's Executor and simply bind the sink name to the
> > invocations.
> >
> > [1]
> >
> >
> https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/Executor.html
> >
> > On Fri, Jul 23, 2021 at 5:36 PM Piotr Nowojski 
> > wrote:
> >
> > > Hi,
> > >
> > > Regarding the question whether to expose the MailboxExecutor or not:
> > > 1. We have plans on exposing it in the ProcessFunction (in short we
> want
> > to
> > > make StreamOperator API private/internal only, and move all of its
> extra
> > > functionality in one way or another to the ProcessFunction). I don't
> > > remember and I'm not sure if *Dawid* had a different idea about this
> (do
> > > not expose Mailbox but wrap it somehow?)
> > > 2. If we provide a thin wrapper around MailboxExecutor, I'm not sure
> how
> > > helpful it will be for keeping backward compatibility in the future.
> > > `MailboxExecutor` is already a very generic interface that doesn't
> expose
> > > much about the current threading model. Note that the previous
> threading
> > > model (multi threaded with checkpoint lock), should be easy to
> implement
> > > using the `MailboxExecutor` interface (use a direct executor that
> > acquires
> > > checkpoint lock).
> > >
> > > Having said that, I haven't spent too much time thinking about whether
> > it's
> > > better to enrich AsyncIO or provide the AsyncSink. If we can just as
> > > efficiently provide the same functionality using the existing/enhanced
> > > AsyncIO API, that may be a good idea if it indeed reduces our
> > > maintenance costs.
> > >
> > > Piotrek
> > >
> > > On Fri, 23 Jul 2021 at 12:55, Guowei Ma  wrote:
> > >
> > > > Hi, Arvid
> 

[VOTE] FLIP-177: Extend Sink API

2021-07-30 Thread Arvid Heise
Hi all,

I'd like to start a vote on FLIP-177: Extend Sink API [1] which provides
small extensions to the Sink API introduced through FLIP-143.
The vote will be open for at least 72 hours unless there is an objection or
not enough votes.

Note that the FLIP was larger initially and I cut down all
advanced/breaking changes.

Best,

Arvid

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-177%3A+Extend+Sink+API


Re: [DISCUSS] FLIP-179: Expose Standardized Operator Metrics

2021-07-30 Thread Arvid Heise
Hi everyone,

I started the voting thread [1]. Please cast your vote there or ask
additional questions here.

Best,

Arvid

[1]
https://lists.apache.org/thread.html/r70d321b6aa62ab4e31c8b73552b2de7846c4d31ed6f08d6541a9b36e%40%3Cdev.flink.apache.org%3E

On Fri, Jul 30, 2021 at 10:46 AM Becket Qin  wrote:

> Hi Arvid,
>
> I think it is OK to leave eventTimeFetchLag out of the scope of this FLIP
> given that it may involve additional API changes.
>
> 5. RecordMetadata is currently not simplifying any code. By the current
> > design RecordMetadata is a read-only data structure that is constant for
> > all records in a batch. So in Kafka, we still need to pass Tuple3 because
> > offset and timestamp are per record.
>
> Does this depend on whether we will get the RecordMetadata per record or
> per batch? We can make the semantic of RecordsWithSplitIds.metadata() to be
> the metadata associated with the last record returned by
> RecordsWithSplitIds.nextRecordsFromSplit(). In this case, individual
> implementations can decide whether to return different metadata for each
> record or not. In case of Kafka, the Tuple3 can be replaced with three
> lists of records, timestamps and offsets respectively. It probably saves
> some object instantiation, assuming the RecordMetadata object itself can be
> reused.
>
> 6. We might rename and change the semantics into
>
> public interface RecordsWithSplitIds {
> > /**
> >  * Returns the record metadata. The metadata is shared for all
> > records in the current split.
> >  */
> > @Nullable
> > default RecordMetadata metadataOfCurrentSplit() {
> > return null;
> > }
> > ...
> > }
>
> Maybe we can move one step further to make it "metadataOfCurrentRecord()"
> as I mentioned above.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On Fri, Jul 30, 2021 at 3:00 PM Arvid Heise  wrote:
>
> > Hi folks,
> >
> > To move on with the FLIP, I will cut eventTimeFetchLag out of scope
> and
> > go ahead with the remainder.
> >
> > I will open a VOTE later today.
> >
> > Best,
> >
> > Arvid
> >
> > On Wed, Jul 28, 2021 at 8:44 AM Arvid Heise  wrote:
> >
> > > Hi Becket,
> > >
> > > I have updated the PR according to your suggestion (note that this
> commit
> > > contains the removal of the previous approach) [1]. Here are my
> > > observations:
> > > 1. Adding the type of RecordMetadata to emitRecord would require adding
> > > another type parameter to RecordEmitter and SourceReaderBase. So I left
> > > that out for now as it would break things completely.
> > > 2. RecordEmitter implementations that want to pass it to SourceOutput
> > need
> > > to be changed in a boilerplate fashion. (passing the metadata to the
> > > SourceOutput)
> > > 3. RecordMetadata as an interface (as in the commit) probably requires
> > > boilerplate implementations in using sources as well.
> > > 4. SourceOutput would also require an additional collect
> > >
> > > default void collect(T record, RecordMetadata metadata) {
> > > collect(record, TimestampAssigner.NO_TIMESTAMP, metadata);
> > > }
> > >
> > > 5. RecordMetadata is currently not simplifying any code. By the current
> > > design RecordMetadata is a read-only data structure that is constant
> for
> > > all records in a batch. So in Kafka, we still need to pass Tuple3
> because
> > > offset and timestamp are per record.
> > > 6. RecordMetadata is currently the same for all splits in
> > > RecordsWithSplitIds.
> > >
> > > Some ideas for the above points:
> > > 3. We should accompany it with a default implementation to avoid the
> > trivial
> > > POJO implementations as the KafkaRecordMetadata of my commit. Can we
> skip
> > > the interface and just have RecordMetadata as a base class?
> > > 1.,2.,4. We could also set the metadata only once in an orthogonal
> method
> > > that needs to be called before collect like
> > SourceOutput#setRecordMetadata.
> > > Then we can implement it entirely in SourceReaderBase without changing
> > any
> > > code. The clear downside is that it introduces some implicit state in
> > > SourceOutput (which we implement) and is harder to use in
> > > non-SourceReaderBase classes: Source devs need to remember to call
> > > setRecordMetadata before collect for a respective record.
> > > 6. We might rename and change the semantics into
> > >
> > > public i

[VOTE] FLIP-179: Expose Standardized Operator Metrics

2021-07-30 Thread Arvid Heise
Dear devs,

I'd like to open a vote on FLIP-179: Expose Standardized Operator Metrics
[1] which was discussed in this thread [2].
The vote will be open for at least 72 hours unless there is an objection
or not enough votes.

The proposal excludes the implementation for the currentFetchEventTimeLag
metric, which caused a bit of discussion without a clear convergence. We
will implement that metric in a generic way at a later point and encourage
sources to implement it themselves in the meantime.

Best,

Arvid

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-179%3A+Expose+Standardized+Operator+Metrics
[2]
https://lists.apache.org/thread.html/r856920cbfe6a262b521109c5bdb9e904e00a9b3f1825901759c24d85%40%3Cdev.flink.apache.org%3E


Re: [DISCUSS] FLIP-179: Expose Standardized Operator Metrics

2021-07-30 Thread Arvid Heise
Hi folks,

To move on with the FLIP, I will cut eventTimeFetchLag out of scope and
go ahead with the remainder.

I will open a VOTE later today.

Best,

Arvid

On Wed, Jul 28, 2021 at 8:44 AM Arvid Heise  wrote:

> Hi Becket,
>
> I have updated the PR according to your suggestion (note that this commit
> contains the removal of the previous approach) [1]. Here are my
> observations:
> 1. Adding the type of RecordMetadata to emitRecord would require adding
> another type parameter to RecordEmitter and SourceReaderBase. So I left
> that out for now as it would break things completely.
> 2. RecordEmitter implementations that want to pass it to SourceOutput need
> to be changed in a boilerplate fashion. (passing the metadata to the
> SourceOutput)
> 3. RecordMetadata as an interface (as in the commit) probably requires
> boilerplate implementations in using sources as well.
> 4. SourceOutput would also require an additional collect
>
> default void collect(T record, RecordMetadata metadata) {
> collect(record, TimestampAssigner.NO_TIMESTAMP, metadata);
> }
>
> 5. RecordMetadata is currently not simplifying any code. By the current
> design RecordMetadata is a read-only data structure that is constant for
> all records in a batch. So in Kafka, we still need to pass Tuple3 because
> offset and timestamp are per record.
> 6. RecordMetadata is currently the same for all splits in
> RecordsWithSplitIds.
>
> Some ideas for the above points:
> 3. We should accompany it with a default implementation to avoid the trivial
> POJO implementations as the KafkaRecordMetadata of my commit. Can we skip
> the interface and just have RecordMetadata as a base class?
> 1.,2.,4. We could also set the metadata only once in an orthogonal method
> that needs to be called before collect like SourceOutput#setRecordMetadata.
> Then we can implement it entirely in SourceReaderBase without changing any
> code. The clear downside is that it introduces some implicit state in
> SourceOutput (which we implement) and is harder to use in
> non-SourceReaderBase classes: Source devs need to remember to call
> setRecordMetadata before collect for a respective record.
> 6. We might rename and change the semantics into
>
> public interface RecordsWithSplitIds {
> /**
>  * Returns the record metadata. The metadata is shared for all records in 
> the current split.
>  */
> @Nullable
> default RecordMetadata metadataOfCurrentSplit() {
> return null;
> }
> ...
> }
>
>
> Re global variable
>
>> To explain a bit more on the metric being a global variable, I think in
>> general there are two ways to pass a value from one code block to another.
>> The first way is direct passing. That means the variable is explicitly
>> passed from one code block to another via arguments, be them in the
>> constructor or methods. Another way is indirect passing through context,
>> that means the information is stored in some kind of context or
>> environment, and everyone can have access to it. And there is no explicit
>> value passing from one code block to another because everyone just reads
>> from/writes to the context or environment. This is basically the "global
>> variable" pattern I am talking about.
>>
>> In general people would avoid having a mutable global value shared across
>> code blocks, because it is usually less deterministic and therefore more
>> difficult to understand or debug.
>>
> Since the first approach was using a Gauge, it's a callback and not a
> global value. The actual value is passed when invoking the callback. It's
> the same as a supplier. However, the gauge itself is stored in the context,
> so your argument holds on that level.
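
As a side note, a tiny sketch of what "the gauge is a callback" means in code;
the field and metric names are only illustrative:

import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.MetricGroup;

class FetchTimeReporting {
    private volatile long lastFetchTime = Long.MIN_VALUE;

    void register(MetricGroup metricGroup) {
        // The metric system only stores the callback; the value is read lazily
        // whenever a reporter polls the gauge.
        metricGroup.gauge("lastFetchTime", (Gauge<Long>) () -> lastFetchTime);
    }

    void onFetch(long fetchTimeMillis) {
        lastFetchTime = fetchTimeMillis;
    }
}
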
>
>
>> Moreover, generally speaking, the Metrics in systems are usually perceived
>> as a reporting mechanism. People usually think of it as a way to expose
>> some internal values to the external system, and don't expect the program
>> itself to read the reported values again in the main logic, which is
>> essentially using the MetricGroup as a context to pass values across code
>> block, i.e. the "global variable" pattern. Instead, people would usually
>> use the "direct passing" to do this.
>>
> Here I still don't see a difference in how we calculate the meter values
> from the byteIn/Out counters. We also need to read the counters
> periodically and calculate a secondary metric. So it can't be that
> unexpected to users.
>
> [1]
> https://github.com/apache/flink/commit/71212e6baf2906444987253d0cf13b5a5978a43b
>
> On Tue, Jul 27, 2021 at 3:19 AM Becket Qin  wrote:
>
>> Hi Arvid,
>>
>>

Re: [DISCUSS] FLIP-179: Expose Standardized Operator Metrics

2021-07-28 Thread Arvid Heise
om/writes to the context or environment. This is basically the "global
> variable" pattern I am talking about.
>
> In general people would avoid having a mutable global value shared across
> code blocks, because it is usually less deterministic and therefore more
> difficult to understand or debug.
>
> Moreover, generally speaking, the Metrics in systems are usually perceived
> as a reporting mechanism. People usually think of it as a way to expose
> some internal values to the external system, and don't expect the program
> itself to read the reported values again in the main logic, which is
> essentially using the MetricGroup as a context to pass values across code
> block, i.e. the "global variable" pattern. Instead, people would usually
> use the "direct passing" to do this.
>
> >Can we think of other use cases for the fetchTime parameter beyond metrics
> in the future? If so, it would make an even stronger case.
> At this point, I cannot think of other use cases for fetchTime, but I can
> see use cases where people want to get a per split fetch lag. So I am
> wondering if it makes sense to generalize the API a little bit by
> introducing collect(T Record, Long timestamp, Metadata metadata). This also
> makes a natural alignment because the RecordsWithSplitIds also returns a
> Metadata associated with the record, which can be used by RecordEmitter as
> well as the SourceOutput.
>
> What do you think?
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
>
> On Fri, Jul 23, 2021 at 7:58 PM Arvid Heise  wrote:
>
> > Hi Becket,
> >
> > I still can't follow your view on the metric being a global variable or
> > your concern that it is confusing to users. Nevertheless, I like your
> > proposal with having an additional collect method.
> >
> > I was thinking that
> > > SourceOutput is going to have an additional method of collect(T Record,
> > > Long timestamp, Long fetchTime). So people can just pass in the fetch
> > time
> > > directly when they emit a record, regardless of using SourceReaderBase
> or
> > > not.
> > >
> >
> > Can we think of other use cases for the fetchTime parameter beyond
> metrics
> > in the future? If so, it would make an even stronger case.
> >
> > I'll update the PR with your proposals.
> >
> > Best,
> >
> > Arvid
> >
> > On Fri, Jul 23, 2021 at 12:08 PM Becket Qin 
> wrote:
> >
> > > Regarding the generic type v.s. class/subclasses of Metadata.
> > >
> > > I think generic types usually make sense if the framework/abstract
> class
> > > itself does not look into the instances, but just pass them from one
> user
> > > logic to another. Otherwise, interfaces or class/subclasses would be
> > > preferred.
> > >
> > > In our case, it depends on whether we expect the SourceReaderBase to
> look
> > > into the MetaData. At this point, it does not. But it seems possible
> that
> > > in the future it may look into MetaData. Therefore I think the class /
> > > subclass pattern would be better.
> > >
> > > Thanks,
> > >
> > > Jiangjie (Becket) Qin
> > >
> > >
> > > On Fri, Jul 23, 2021 at 5:54 PM Becket Qin 
> wrote:
> > >
> > > > Hi Arvid,
> > > >
> > > > > I'm not sure if I follow the global variable argument, could you
> > > > elaborate? Are you referring specifically to the SettableGauge? How
> is
> > > that
> > > > different from a Counter or Meter?
> > > > What I meant is that the fetch lag computing logic can either get the
> > > > information required from method argument or something like a static
> > > global
> > > > variable. We are essentially trying to reuse the metric as a static
> > > global
> > > > variable. It seems not a common pattern in most systems. It is a
> little
> > > > counterintuitive that a gauge reported to the metric system would be
> > used
> > > > by the program main logic later on as a variable.
> > > >
> > > > > We could do that. That would remove the gauge from the MetricGroup,
> > > > right? The main downside is that sources that do not use
> > SourceReaderBase
> > > > cannot set the metric anymore. So I'd rather keep the current way and
> > > > extend it with the metadata extension.
> > > > Yes, that would remove the gauge from the MetricGroup. I was thinking
> > > that
> > > > SourceOutput is going to have an add

Re: [DISCUSS] FLIP-177: Extend Sink API

2021-07-24 Thread Arvid Heise
Just to reiterate on Piotr's point: MailboxExecutor is pretty much an
Executor [1] with named lambdas, except for the name MailboxExecutor
nothing is hinting at a specific threading model.

Currently, we expose it on StreamOperator API. Afaik the idea is to make
the StreamOperator internal and beef up ProcessFunction but for several use
cases (e.g., AsyncIO), we actually need to expose the executor anyways.

We could rename MailboxExecutor to avoid exposing the name of the threading
model. For example, we could rename it to TaskThreadExecutor (but that's
pretty much the same), to CooperativeExecutor (again implies Mailbox), to
o.a.f.Executor, to DeferredExecutor... Ideas are welcome.

We could also simply use Java's Executor interface, however, when working
with that interface, I found that the missing context of async executed
lambdas made debugging much much harder. So that's why I designed
MailboxExecutor to force the user to give some debug string to each
invocation. In the sink context, we could, however, use an adaptor from
MailboxExecutor to Java's Executor and simply bind the sink name to the
invocations.

[1]
https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/Executor.html
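
To make the adaptor idea a bit more concrete, here is a minimal sketch; the
MailboxExecutor import path and the execute(command, descriptionFormat, args)
signature are assumptions on my side and may differ between versions:

import java.util.concurrent.Executor;

import org.apache.flink.api.common.operators.MailboxExecutor;

public class SinkNamedExecutor implements Executor {

    private final MailboxExecutor mailboxExecutor;
    private final String sinkName;

    public SinkNamedExecutor(MailboxExecutor mailboxExecutor, String sinkName) {
        this.mailboxExecutor = mailboxExecutor;
        this.sinkName = sinkName;
    }

    @Override
    public void execute(Runnable command) {
        // Every submitted lambda is tagged with the sink name for debugging.
        mailboxExecutor.execute(command::run, "async action of sink %s", sinkName);
    }
}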

On Fri, Jul 23, 2021 at 5:36 PM Piotr Nowojski  wrote:

> Hi,
>
> Regarding the question whether to expose the MailboxExecutor or not:
> 1. We have plans on exposing it in the ProcessFunction (in short we want to
> make StreamOperator API private/internal only, and move all of its extra
> functionality in one way or another to the ProcessFunction). I don't
> remember and I'm not sure if *Dawid* had a different idea about this (do
> not expose Mailbox but wrap it somehow?)
> 2. If we provide a thin wrapper around MailboxExecutor, I'm not sure how
> helpful it will be for keeping backward compatibility in the future.
> `MailboxExecutor` is already a very generic interface that doesn't expose
> much about the current threading model. Note that the previous threading
> model (multi threaded with checkpoint lock), should be easy to implement
> using the `MailboxExecutor` interface (use a direct executor that acquires
> checkpoint lock).
>
> Having said that, I haven't spent too much time thinking about whether it's
> better to enrich AsyncIO or provide the AsyncSink. If we can just as
> efficiently provide the same functionality using the existing/enhanced
> AsyncIO API, that may be a good idea if it indeed reduces our
> maintenance costs.
>
> Piotrek
>
> On Fri, 23 Jul 2021 at 12:55, Guowei Ma  wrote:
>
> > Hi, Arvid
> >
> > >>>The main question here is what do you think is the harm of exposing
> > Mailbox? Is it the complexity or the maintenance overhead?
> >
> > I think that exposing the internal threading model might be risky. In
> case
> > the threading model changes, it will affect the user's api and bring the
> > burden of internal modification. (Of course, you may have more say in how
> > the MailBox model will develop in the future) Therefore, I think that if
> an
> > alternative solution can be found, these possible risks will be avoided:
> > for example, through AsyncIO.
> >
> > >>>>AsyncIO has no user state, so we would be quite limited in
> implementing
> > at-least-once sinks especially when it comes to retries. Chaining two
> > AsyncIO would make it even harder to reason about the built-in state. We
> > would also give up any chance to implement exactly once async sinks (even
> > though I'm not sure if it's possible at all).
> >
> > 1. Why would we need to use the state when retrying(maybe I miss
> > something)? If a batch of asynchronous requests fails, I think it is
> enough
> > to retry directly in the callback. Or extend AsyncIO to give it the
> ability
> > to retry (XXXFuture.fail(Exception)); in addition, in general, the
> client
> > has its own retry mechanism, at least the producer of Kinesis said in the
> > document. Of course I am not opposed to retrying, I just want to find a
> > more obvious example to support the need to do so.
> >
> > 2. I don't think using AsyncIO will prevent exactly once in the future.
> > Both solutions need to be rewritten unless Exactly Once is required from
> > the beginning.
> >
> > >>>Users will have a hard time to discover SinkUtil.sinkTo compared to
> the
> > expected stream.sinkTo. We have seen that on other occasions already
> (e.g.,
> > reinterpretAsKeyedStream).
> > In fact, I think this is the most important issue. We lack the function
> of
> > supporting sub-topologies at the API layer, which is very inconvenient. For
> > example stream.sinkTo(AsyncSinkTopologyBuilder), what do you think?
> > ```java
> 

Re: [VOTE] FLIP-182: Support watermark alignment of FLIP-27 Sources

2021-07-23 Thread Arvid Heise
+1 (binding)

On Wed, Jul 21, 2021 at 1:55 PM Piotr Nowojski  wrote:

> Hi everyone,
>
> I would like to start a vote on the FLIP-182 [1] which was discussed in
> this
> thread [2].
> The vote will be open for at least 72 hours unless there is an objection
> or not enough votes.
>
> Best,
> Piotrek
>
>
> [1]
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-182%3A+Support+watermark+alignment+of+FLIP-27+Sources
> [2]
>
> https://mail-archives.apache.org/mod_mbox/flink-dev/202107.mbox/%3CCABpD1RUHJURJg7Vkbq4Tjz2yecd8Wv8kJiT46M11F-ODSakgcw%40mail.gmail.com%3E
>


Re: [DISCUSS] FLIP-180: Adjust StreamStatus and Idleness definition

2021-07-23 Thread Arvid Heise
he generic
> watermark generator (TimestampsAndWatermarksOperator).  Should the
> generator ignore the upstream idleness signal?  I believe it propagates the
> signal, even though it also generates its own signals.   Given that
> source-based and generic watermark generation shouldn't be combined, one
> could argue that the generic watermark generator should activate only when
> its input channel's watermark status is idle.
>
> Thanks again for this effort!
> -Eron
>
>
> On Sun, Jul 18, 2021 at 11:53 PM Arvid Heise  wrote:
>
> > Dear devs,
> >
> > We recently discovered that StreamStatus and Idleness is insufficiently
> > defined [1], so I drafted a FLIP [3] to amend that situation. It would be
> > good to hear more opinions on that matter. Ideally, we can make the
> changes
> > to 1.14 as some newly added methods are affected.
> >
> > Best,
> >
> > Arvid
> >
> > [1]
> >
> >
> https://lists.apache.org/thread.html/r5194e1cf157d1fd5ba7ca9b567cb01723bd582aa12dda57d25bca37e%40%3Cdev.flink.apache.org%3E
> > [2]
> >
> >
> https://lists.apache.org/thread.html/rb871f5aecbca6e5d786303557a6cdb3d425954385cbdb1b777f2fcf5%40%3Cdev.flink.apache.org%3E
> > [3]
> >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-180%3A+Adjust+StreamStatus+and+Idleness+definition
> >
>


Re: [DISCUSS] FLIP-179: Expose Standardized Operator Metrics

2021-07-23 Thread Arvid Heise
Hi Becket,

I still can't follow your view on the metric being a global variable or
your concern that it is confusing to users. Nevertheless, I like your
proposal with having an additional collect method.

I was thinking that
> SourceOutput is going to have an additional method of collect(T Record,
> Long timestamp, Long fetchTime). So people can just pass in the fetch time
> directly when they emit a record, regardless of using SourceReaderBase or
> not.
>

Can we think of other use cases for the fetchTime parameter beyond metrics
in the future? If so, it would make an even stronger case.

I'll update the PR with your proposals.

Best,

Arvid

On Fri, Jul 23, 2021 at 12:08 PM Becket Qin  wrote:

> Regarding the generic type v.s. class/subclasses of Metadata.
>
> I think generic types usually make sense if the framework/abstract class
> itself does not look into the instances, but just pass them from one user
> logic to another. Otherwise, interfaces or class/subclasses would be
> preferred.
>
> In our case, it depends on whether we expect the SourceReaderBase to look
> into the MetaData. At this point, it does not. But it seems possible that
> in the future it may look into MetaData. Therefore I think the class /
> subclass pattern would be better.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
>
> On Fri, Jul 23, 2021 at 5:54 PM Becket Qin  wrote:
>
> > Hi Arvid,
> >
> > > I'm not sure if I follow the global variable argument, could you
> > elaborate? Are you referring specifically to the SettableGauge? How is
> that
> > different from a Counter or Meter?
> > What I meant is that the fetch lag computing logic can either get the
> > information required from method argument or something like a static
> global
> > variable. We are essentially trying to reuse the metric as a static
> global
> > variable. It seems not a common pattern in most systems. It is a little
> > counterintuitive that a gauge reported to the metric system would be used
> > by the program main logic later on as a variable.
> >
> > > We could do that. That would remove the gauge from the MetricGroup,
> > right? The main downside is that sources that do not use SourceReaderBase
> > cannot set the metric anymore. So I'd rather keep the current way and
> > extend it with the metadata extension.
> > Yes, that would remove the gauge from the MetricGroup. I was thinking
> that
> > SourceOutput is going to have an additional method of collect(T Record,
> > Long timestamp, Long fetchTime). So people can just pass in the fetch
> time
> > directly when they emit a record, regardless of using SourceReaderBase or
> > not.
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
> >
> > On Thu, Jul 22, 2021 at 3:46 PM Chesnay Schepler 
> > wrote:
> >
> >> The only histogram implementations available to use are those by
> >> dropwizard, and they do some lock-free synchronization stuff that so
> far we
> >> wanted to keep out of hot paths (this applies to both reading and
> writing);
> >> we have however never made benchmarks.
> >> But it is reasonable to assume that they are way more expensive than the
> >> alternatives (in the ideal case just being a getter).
> >> You'd pay that cost irrespective of whether a reporter is enabled or
> not,
> >> which is another thing we so far wanted to prevent.
> >> Finally, histograms are problematic because they are 10x more expensive
> >> on the metric backend (because they are effectively 10 metrics), and
> should
> >> be used with extreme caution, in particular because we lack any switch
> to
> >> disable/enable metrics (and I think we are getting to a point where the
> >> metric system becomes unusable for heavy users because of that, where
> >> another histogram isn't helping).
> >>
> >> Overall, at this time I'm against using Histograms.
> >> Furthermore, I believe that we should be able to derive a Histogram from
> >> that supplier if we later on decide differently. We'd just need to poll
> >> the supplier more often.
> >>
> >> On 22/07/2021 09:36, Arvid Heise wrote:
> >>
> >> Hi all,
> >>
> >> @Steven Wu 
> >>
> >>> Regarding "lastFetchTime" latency metric, I found Gauge to be less
> >>> informative as it only captures the last sampling value for each metric
> >>> publish interval (e.g. 60s).
> >>> * Can we make it a histogram? Histograms are more expensive though.
> >>> * Timer [1, 2] is cheaper as it just tracks min, max, avg, count. but

Re: [DISCUSS] FLIP-177: Extend Sink API

2021-07-22 Thread Arvid Heise
w
>> too large either [1, 2]. We also need to support use cases where the
>> destination can only ingest x messages per second or a total throughput of
>> y per second. We are also planning to support time outs so that data is
>> persisted into the destination at least every n seconds by means of the
>> `ProcessingTimeService`. The batching configuration will be part of the
>> constructor, which has only been indicated in the current prototype but is
>> not implemented, yet [3].
>>
>> I’m not sure whether I’m understanding who you are referring to by user.
>> People who are using a concrete sink, eg, to send messages into a Kinesis
>> stream, will not be exposed to the `MailboxExecutor`. They are just using
>> the sink and pass in the batching configuration from above [4]. The
>> `MailboxExecutor` and `ProcessingTimeService` are only relevant for sink
>> authors who want to create support for a new destination. I would expect
>> that there are only few experts who are adding support for new
>> destinations, who are capable to understand and use the advanced constructs
>> properly.
>>
>> Hope that helps to clarify our thinking.
>>
>> Cheers, Steffen
>>
>>
>> [1]
>> https://github.com/sthm/flink/blob/51614dc9371d6e352db768a404ba3cafddad08f0/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java#L118
>> [2]
>> https://github.com/sthm/flink/blob/51614dc9371d6e352db768a404ba3cafddad08f0/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java#L155
>> [3]
>> https://github.com/sthm/flink/blob/51614dc9371d6e352db768a404ba3cafddad08f0/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java#L43-L49
>> [4]
>> https://github.com/sthm/flink/blob/51614dc9371d6e352db768a404ba3cafddad08f0/flink-connectors/flink-connector-kinesis-171/src/main/java/software/amazon/flink/connectors/Test.java#L45
>>
>>
>>
>> From: Arvid Heise 
>> Date: Tuesday, 20. July 2021 at 23:03
>> To: dev , Steffen Hausmann 
>> Subject: RE: [EXTERNAL] [DISCUSS] FLIP-177: Extend Sink API
>>
>>
>>
>>
>> Hi Guowei,
>>
>> 1. your idea is quite similar to FLIP-171 [1]. The question is if we
>> implement FLIP-171 based on public interfaces (which would require exposing
>> MailboxExecutor as described here in FLIP-177) or if it's better to
>> implement it internally and hide it.
>> The first option is an abstract base class; your second option would be
>> an abstract interface that has matching implementation internally
>> (similarly to AsyncIO).
>> There is an example for option 1 in [2]; I think the idea was to
>> additionally specify the batch size and batch timeout in the ctor.
>> @Hausmann, Steffen<mailto:shau...@amazon.de> knows more.
>>
>> 2. I guess your question is if current AsyncIO is not sufficient already
>> if exactly-once is not needed? The answer is currently no, because AsyncIO
>> is not doing any batching. The user could do batching before that but
>> that's quite a bit of code. However, we should really think if AsyncIO
>> should also support batching.
>> I would also say that the scope of AsyncIO and AsyncSink is quite
>> different: the first one is for application developers and the second one
>> is for connector developers and would be deemed an implementation detail by
>> the application developer. Of course, advanced users may fall in both
>> categories, so the distinction does not always hold.
>>
>> Nevertheless, there is some overlap between both approaches and it's
>> important to think if the added complexity warrants the benefit. It would
>> be interesting to hear how other devs see that.
>>
>> [1]
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink
>> [2]
>> https://github.com/sthm/flink/blob/51614dc9371d6e352db768a404ba3cafddad08f0/flink-connectors/flink-connector-kinesis-171/src/main/java/software/amazon/flink/connectors/AmazonKinesisDataStreamSink.java
>>
>> On Tue, Jul 20, 2021 at 11:11 AM Guowei Ma  wrote:
>> Hi, Arvid
>> Thank you Arvid for perfecting Sink through this FLIP. I have two little
>> questions
>>
>> 1. What do you think of us directly providing an interface as follows? In
>> this way, there 

Re: [DISCUSS] FLIP-179: Expose Standardized Operator Metrics

2021-07-22 Thread Arvid Heise
Hi all,

@Steven Wu 

> Regarding "lastFetchTime" latency metric, I found Gauge to be less
> informative as it only captures the last sampling value for each metric
> publish interval (e.g. 60s).
> * Can we make it a histogram? Histograms are more expensive though.
> * Timer [1, 2] is cheaper as it just tracks min, max, avg, count. but there
> is no such metric type in Flink
> * Summary metric type [3] (from Prometheus) would be nice too
>
I'd also think that a histogram is much more expressive but the original
FLIP-33 decided against it because of its cost. @Chesnay Schepler
 could you shed some light on how much more expensive
it is in comparison to a simple gauge? Does it depend on whether a reporter
is actually using the metric?
The current interface of this FLIP-179 would actually allow switching the
type of the metric later. But since the metric type is user-facing, we need
to have an agreement now.

@Becket Qin 

> In that case, do we still need the metric here? It seems we are creating a
> "global variable" which users may potentially use. I am wondering how much
> additional convenience it provides because it seems easy for people to
> simply pass the fetch time by themselves if they have decided to not use
> SourceReaderBase. Also, it looks like we do not have an API pattern that
> lets users get the value of a metric and derive another metric. So I think
> it is easier for people to understand if LastFetchTimeGauge() is just an
> independent metric by itself, instead of being a part of the
> eventTimeFetchLag computation.
>
I'm not sure if I follow the global variable argument, could you elaborate?
Are you referring specifically to the SettableGauge? How is that different
from a Counter or Meter?

With the current design, we could very well add a LastFetchTime metric. The
key point of the current abstraction is that a user gets the much harder
eventTimeFetchLag metric for free, since we already need to extract the
event time for other metrics. I think the JavaDoc makes it clear what the
intent of the LastFetchTimeGauge is and if not we can improve it.
Btw we have derived metrics already. For example, we have Meters for
byteIn/Out and recordIn/Out. That's already part of FLIP-33.
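
As a concrete example of such a derived metric, a MeterView periodically
samples a counter and exposes its rate; the metric names below are
illustrative only:

import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.MeterView;
import org.apache.flink.metrics.MetricGroup;

class DerivedRateMetric {
    static void register(MetricGroup group) {
        Counter numRecordsIn = group.counter("numRecordsInExample");
        // The meter polls the counter periodically and reports a per-second rate.
        group.meter("numRecordsInPerSecondExample", new MeterView(numRecordsIn));
    }
}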

Would it make sense to have a more generic metadata type  associated
> with the records batch? In some cases, it may be useful to allow the Source
> implementation to carry some additional information of the batch to the
> RecordEmitter. For example, the split info of the batch, the sender of the
> batch etc. Because the RecordEmitter only takes one record at.a time,
> currently such information needs to be put into each record, which may
> involve a lot of wrapper object creation.
>
I like the idea of having more general metadata and I follow the example.
I'm wondering if we could avoid a generic type (since that adds a bit of
complexity to the mental model and usage) by simply encouraging to use a
more specific MetaData subclass as a return type of the method.

public interface RecordsWithSplitIds {
    @Nullable
    default RecordMetadata getMetadata() {
        return null;
    }
    ...
}

public interface RecordMetadata {
    long getLastFetchTime(); // mandatory?
}

And using it as

public class KafkaRecordMetadata implements RecordMetadata {}

private static class KafkaPartitionSplitRecords implements RecordsWithSplitIds {
    @Override
    public KafkaRecordMetadata getMetadata() {
        return metadata;
    }
}

Or do we want to keep the generic type so it can be passed explicitly to the
RecordEmitter? Would that metadata be a fourth parameter of
RecordEmitter#emitRecord?

It might be slightly better if we let the method accept a Supplier in this
> case. However, it seems to introduce a parallel channel or a sidepath
> between the user implementation and SourceOutput. I am not sure if this is
> the right way to go. Would it be more intuitive if we just add a new method
> to the SourceOutput, to allow the FetchTime to be passed in explicitly?
> This would work well with the change I suggested above, which adds a
> generic metadata type  to the RecordsWithSplits and passes that to the
> RecordEmitter.emitRecord() as an argument.
>

We could do that. That would remove the gauge from the MetricGroup, right?
The main downside is that sources that do not use SourceReaderBase cannot
set the metric anymore. So I'd rather keep the current way and extend it
with the metadata extension.

Best,

Arvid


On Wed, Jul 21, 2021 at 1:38 PM Becket Qin  wrote:

> Hey Chesnay,
>
> I think I got what that method was designed for now. Basically the
> motivation is to let the SourceOutput report the eventTimeFetchLag for
> users. At this point, the SourceOutput only has the EventTime, so this
> method provides a way for the users to pass the FetchTime to the
> SourceOutput. This is essentially a context associated with each record
> emitted to the SourceOutput.
>
> It might be slightly better if we let the method accept a Supplier in this

Re: [DISCUSS] FLIP-177: Extend Sink API

2021-07-20 Thread Arvid Heise
Hi Guowei,

1. your idea is quite similar to FLIP-171 [1]. The question is if we
implement FLIP-171 based on public interfaces (which would require exposing
MailboxExecutor as described here in FLIP-177) or if it's better to
implement it internally and hide it.
The first option is an abstract base class; your second option would be an
abstract interface that has matching implementation internally (similarly
to AsyncIO).
There is an example for option 1 in [2]; I think the idea was to
additionally specify the batch size and batch timeout in the ctor. @Hausmann,
Steffen  knows more.

2. I guess your question is if current AsyncIO is not sufficient already if
exactly-once is not needed? The answer is currently no, because AsyncIO is
not doing any batching. The user could do batching before that but that's
quite a bit of code. However, we should really think if AsyncIO should also
support batching.
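
To illustrate why manual pre-batching is "quite a bit of code", here is a rough
sketch; the batch size is an arbitrary example, and countWindowAll forces
parallelism 1, so a real job would rather batch per subtask:

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.util.Collector;

class PreAsyncBatching {
    static DataStream<List<String>> batch(DataStream<String> input) {
        return input
                .countWindowAll(100)
                .apply(new AllWindowFunction<String, List<String>, GlobalWindow>() {
                    @Override
                    public void apply(GlobalWindow window, Iterable<String> values, Collector<List<String>> out) {
                        List<String> batch = new ArrayList<>();
                        values.forEach(batch::add);
                        out.collect(batch);
                    }
                });
    }
}

The resulting DataStream<List<String>> could then be handed to an AsyncIO stage
that writes one batch per request.
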
I would also say that the scope of AsyncIO and AsyncSink is quite
different: the first one is for application developers and the second one
is for connector developers and would be deemed an implementation detail by
the application developer. Of course, advanced users may fall in both
categories, so the distinction does not always hold.

Nevertheless, there is some overlap between both approaches and it's
important to think if the added complexity warrants the benefit. It would
be interesting to hear how other devs see that.

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink
[2]
https://github.com/sthm/flink/blob/51614dc9371d6e352db768a404ba3cafddad08f0/flink-connectors/flink-connector-kinesis-171/src/main/java/software/amazon/flink/connectors/AmazonKinesisDataStreamSink.java

On Tue, Jul 20, 2021 at 11:11 AM Guowei Ma  wrote:

> Hi, Arvid
> Thank you Arvid for perfecting Sink through this FLIP. I have two little
> questions
>
> 1. What do you think of us directly providing an interface as follows? In
> this way, there may be no need to expose the Mailbox to the user. We can
> implement an `AsyncSinkWriterOperator` to control the length of the queue.
> If it is too long, do not call SinkWriter::write.
> public interface AsyncSinkWriter
> extends SinkWriter>, CommT,
> WriterStateT> {  please ignore the name of Tuple2 and XXXFuture at
> first.
> int getQueueLimit();
> }
>
> 2. Another question is: If users don't care about exactly once and the
> unification of stream and batch, how about letting users use
> `AsyncFunction` directly? I don’t have an answer either. I want to hear
> your suggestions.
>
> Best,
> Guowei
>
>
> On Mon, Jul 19, 2021 at 3:38 PM Arvid Heise  wrote:
>
> > Dear devs,
> >
> > today I'd like to start the discussion on the Sink API. I have drafted a
> > FLIP [1] with an accompanying PR [2].
> >
> > This FLIP is a bit special as it's actually a few smaller Amend-FLIPs in
> > one. In this discussion, we should decide on the scope and cut out too
> > invasive steps if we can't reach an agreement.
> >
> > Step 1 is to add a few more pieces of information to context objects.
> > That's non-breaking and needed for the async communication pattern in
> > FLIP-171 [3]. While we need to add a new Public API (MailboxExecutor), I
> > think that this should entail the least discussions.
> >
> > Step 2 is to also offer the same context information to committers. Here
> we
> > can offer some compatibility methods to not break existing sinks. The
> main
> > use case would be some async exactly-once sink but I'm not sure if we
> would
> > use async communication patterns at all here (or simply wait for all
> async
> > requests to finish in a sync way). It may also help with async cleanup
> > tasks though.
> >
> > While drafting Step 2, I noticed the big entanglement of the current API.
> > To figure out if there is a committer during the stream graph creation,
> we
> > actually need to create a committer which can have unforeseen
> consequences.
> > Thus, I spiked if we can disentangle the interface and have separate
> > interfaces for the different use cases. The resulting step 3 would be a
> > completely breaking change and thus is probably controversial. However,
> I'd
> > also see the disentanglement as a way to prepare to make Sinks more
> > expressive (write and commit coordinator) without completely overloading
> > the main interface.
> >
> > [1]
> >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-177%3A+Extend+Sink+API
> > [2] https://github.com/apache/flink/pull/16399
> > [3]
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink
> >
>


[DISCUSS] FLIP-177: Extend Sink API

2021-07-19 Thread Arvid Heise
Dear devs,

today I'd like to start the discussion on the Sink API. I have drafted a
FLIP [1] with an accompanying PR [2].

This FLIP is a bit special as it's actually a few smaller Amend-FLIPs in
one. In this discussion, we should decide on the scope and cut out too
invasive steps if we can't reach an agreement.

Step 1 is to add a few more pieces of information to context objects.
That's non-breaking and needed for the async communication pattern in
FLIP-171 [3]. While we need to add a new Public API (MailboxExecutor), I
think that this should entail the least discussions.

Step 2 is to also offer the same context information to committers. Here we
can offer some compatibility methods to not break existing sinks. The main
use case would be some async exactly-once sink but I'm not sure if we would
use async communication patterns at all here (or simply wait for all async
requests to finish in a sync way). It may also help with async cleanup
tasks though.

While drafting Step 2, I noticed the big entanglement of the current API.
To figure out if there is a committer during the stream graph creation, we
actually need to create a committer which can have unforeseen consequences.
Thus, I spiked if we can disentangle the interface and have separate
interfaces for the different use cases. The resulting step 3 would be a
completely breaking change and thus is probably controversial. However, I'd
also see the disentanglement as a way to prepare to make Sinks more
expressive (write and commit coordinator) without completely overloading
the main interface.

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-177%3A+Extend+Sink+API
[2] https://github.com/apache/flink/pull/16399
[3] https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink


Re: [DISCUSS] FLIP-171: Async Sink

2021-07-19 Thread Arvid Heise
Hi Guowei,

That's a good question. FLIP-171 was mainly motivated by at-least-once
sinks (Kinesis, DynamoDB, Timestream, Firehose). The idea is to expand that
interface in the future to also support exactly-once.

I'm currently drafting FLIP-177 (the FLIP that enables async pattern in the
first place) in a way that it would also support having an async pattern in
the committer. I'm just not sure if an async pattern and exactly once
really make sense. I'm opening the discussion soonish and will ping you.


On Mon, Jul 19, 2021 at 5:28 AM Guowei Ma  wrote:

> Hi,
> I'm very sorry to participate in this discussion so late. But I have a
> little question. I understand the goal of this FLIP is to make `Writer`
> support asynchronous. But my question is: why not let `Committer` support
> asynchronization? If only `Writer` supports asynchronization, ExactlyOnce
> is impossible. Since I am not an expert in Kineses, I would like everyone
> to point out if I missed anything.
> Best,
> Guowei
>
>
> On Sat, Jul 17, 2021 at 12:27 AM Till Rohrmann 
> wrote:
>
>> Sure, thanks for the pointers.
>>
>> Cheers,
>> Till
>>
>> On Fri, Jul 16, 2021 at 6:19 PM Hausmann, Steffen
>> 
>> wrote:
>>
>> > Hi Till,
>> >
>> > You are right, I’ve left out some implementation details, which have
>> > actually changed a couple of times as part of the ongoing discussion. You
>> > can find our current prototype here [1] and a sample implementation of
>> the
>> > KPL free Kinesis sink here [2].
>> >
>> > I plan to update the FLIP. But I think it would make sense to wait
>> > until the implementation has stabilized enough before we update the
>> FLIP to
>> > the final state.
>> >
>> > Does that make sense?
>> >
>> > Cheers, Steffen
>> >
>> > [1]
>> >
>> https://github.com/sthm/flink/tree/flip-171-177/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink
>> > [2]
>> >
>> https://github.com/sthm/flink/blob/flip-171-177/flink-connectors/flink-connector-kinesis-171/src/main/java/software/amazon/flink/connectors/AmazonKinesisDataStreamSink.java
>> >
>> > From: Till Rohrmann 
>> > Date: Friday, 16. July 2021 at 18:10
>> > To: Piotr Nowojski 
>> > Cc: Steffen Hausmann , "dev@flink.apache.org" <
>> > dev@flink.apache.org>, Arvid Heise 
>> > Subject: RE: [EXTERNAL] [DISCUSS] FLIP-171: Async Sink
>> >
>> >
>> >
>> >
>> > Hi Steffen,
>> >
>> > I've taken another look at the FLIP and I stumbled across a couple of
>> > inconsistencies. I think it is mainly because of the lacking code. For
>> > example, it is not fully clear to me based on the current FLIP how we
>> > ensure that there are no in-flight requests when
>> > AsyncSinkWriter.snapshotState is called. Also the concrete
>> implementation
>> > of the AsyncSinkCommitter could be helpful for understanding how the
>> > AsyncSinkWriter works in the end. Do you plan to update the FLIP
>> > accordingly?
>> >
>> > Cheers,
>> > Till
>> >
>> > On Wed, Jun 30, 2021 at 8:36 AM Piotr Nowojski  wrote:
>> > Thanks for addressing this issue :)
>> >
>> > Best, Piotrek
>> >
>> > On Tue, Jun 29, 2021 at 5:58 PM Hausmann, Steffen  wrote:
>> > Hey Piotr,
>> >
>> > I've just adapted the FLIP and changed the signature for the
>> > `submitRequestEntries` method:
>> >
>> > protected abstract void submitRequestEntries(List
>> > requestEntries, ResultFuture requestResult);
>> >
>> > In addition, we are likely to use an AtomicLong to track the number of
>> > outstanding requests, as you have proposed in 2b). I've already
>> indicated
>> > this in the FLIP, but it's not fully fleshed out. But as you have said,
>> > that seems to be an implementation detail and the important part is the
>> > change of the `submitRequestEntries` signature.
>> >
>> > Thanks for your feedback!
>> >
>> > Cheers, Steffen
>> >
>> >
>> > On 25.06.21, 17:05, "Hausmann, Steffen" 
>> wrote:
>> >

Re: [DISCUSS] FLIP-179: Expose Standardized Operator Metrics

2021-07-19 Thread Arvid Heise
Hi Steven,

I extended the FLIP and its draft PR to have a SourceReaderMetricGroup and
a SplitEnumeratorMetricGroup. I hope that it makes it clearer.
I'd like to address FLINK-21000 as part of the implementation but I'd keep
it out of the FLIP discussion.
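
For reference, a hedged sketch of how a reader built against the proposed API
might wire up one of the new gauges; metricGroup() and setPendingRecordsGauge()
follow the FLIP draft, their exact signatures may still change, and
currentConsumerLag is a hypothetical field of the reader:

import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.metrics.groups.SourceReaderMetricGroup;

class ReaderMetricsSetup {
    private volatile long currentConsumerLag;

    void register(SourceReaderContext readerContext) {
        SourceReaderMetricGroup metrics = readerContext.metricGroup();
        // The lag is read lazily whenever a reporter polls the gauge.
        metrics.setPendingRecordsGauge(() -> currentConsumerLag);
    }
}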

Question: should we rename SinkMetricGroup to SinkWriterMetricGroup? I can
see the same confusion arising on sink side. I have added a commit to the
draft PR (not updated FLIP yet).

Btw I'd like to start the vote soonish. @Becket Qin 
are you okay with the setLastFetchTimeGauge explanation or do you have
alternative ideas?

Best,

Arvid

On Fri, Jul 16, 2021 at 8:13 PM Steven Wu  wrote:

> To avoid confusion, can we either rename "SourceMetricGroup" to "
> SplitReaderMetricGroup" or add "Reader" to the setter method names?
>
> Yes, we should  add the "unassigned/pending splits" enumerator metric. I
> tried to publish those metrics for IcebergSourceEnumerator and ran into an
> issue [1]. I don't want to distract the discussion with the jira ticket.
>
> [1] https://issues.apache.org/jira/browse/FLINK-21000
>
> On Thu, Jul 15, 2021 at 1:01 PM Arvid Heise  wrote:
>
> > Hi Steven,
> >
> > The semantics are unchanged compared to FLIP-33 [1] but I see your point.
> >
> > In reality, pending records would be mostly for event storage systems
> > (Kafka, Kinesis, ...). Here, we would report the consumer lag
> effectively.
> > If consumer lag is more prominent, we could also rename it.
> >
> > For pending bytes, this is mostly related to file source or any kind of
> > byte streams. At this point, we can only capture the assigned splits on
> > reader levels. I don't think it makes sense to add the same metric to the
> > enumerator as that might induce too much I/O on the job master. I could
> > rather envision another metric that captures how many unassigned splits
> > there are. In general, I think it would be a good idea to add another
> type
> > of top-level metric group for SplitEnumerator called
> > SplitEnumeratorMetricGroup in SplitEnumeratorContext. There we could add
> > unassigned/pending splits metric. WDYT?
> >
> > [1]
> >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-33%3A+Standardize+Connector+Metrics
> >
> > On Wed, Jul 14, 2021 at 9:00 AM Steven Wu  wrote:
> >
> > > I am trying to understand what those two metrics really capture
> > >
> > > > G setPendingBytesGauge(G pendingBytesGauge);
> > >
> > >-  use file source as an example, it captures the remaining bytes
> for
> > >the current file split that the reader is processing? How would
> users
> > >    interpret or use this metric? enumerator keeps track of the
> > >pending/unassigned splits, which is an indication of the size of the
> > >backlog. that would be very useful
> > >
> > >
> > > > G setPendingRecordsGauge(G
> pendingRecordsGauge);
> > >
> > >- In the Kafka source case, this is intended to capture the consumer
> > lag
> > >(log head offset from broker - current record offset)? that could be
> > > used
> > >to capture the size of the backlog
> > >
> > >
> > >
> > > On Tue, Jul 13, 2021 at 3:01 PM Arvid Heise  wrote:
> > >
> > > > Hi Becket,
> > > >
> > > > I believe 1+2 has been answered by Chesnay already. Just to add to 2:
> > I'm
> > > > not the biggest fan of reusing task metrics but that's what FLIP-33
> and
> > > > different folks suggested. I'd probably keep task I/O metrics only
> for
> > > > internal things and add a new metric for external calls. Then, we
> could
> > > > even allow users to track I/O in AsyncIO (which would currently be a
> > > mess).
> > > > However, with the current abstraction, it would be relatively easy to
> > add
> > > > separate metrics later.
> > > >
> > > > 3. As outlined in the JavaDoc and in the draft PR [1], it's up to the
> > > user
> > > > to implement it in a way that fetch time always corresponds to the
> > latest
> > > > polled record. For SourceReaderBase, I have added a new
> > > > RecordsWithSplitIds#lastFetchTime (with default return value null)
> that
> > > > sets the last fetch time automatically whenever the next batch is
> > > selected.
> > > > Tbh this metric is a bit more challenging to implement for
> > > > non-SourceReaderBase sources but I have not found a better,
> thread-safe

Re: [DISCUSS] Watermark propagation with Sink API

2021-07-19 Thread Arvid Heise
Hi everyone,

I created a FLIP and started a discussion around that topic [1].

Best,

Arvid

[1]
https://lists.apache.org/thread.html/r8357d64b9cfdf5a233c53a20d9ac62b75c07c925ce2c43e162f1e39c%40%3Cdev.flink.apache.org%3E

On Tue, Jun 8, 2021 at 11:46 PM Eron Wright 
wrote:

> Thanks, the narrowed FLIP-167 is fine for now.  I'll re-activate the vote
> process.  Thanks!
>
> On Tue, Jun 8, 2021 at 3:01 AM Till Rohrmann  wrote:
>
> > Hi everyone,
> >
> > I do agree that Flink's definition of idleness is not fully thought
> through
> > yet. Consequently, I would feel a bit uneasy to make it part of Flink's
> API
> > right now. Instead, defining the proper semantics first and then exposing
> > it sounds like a good approach forward. Hence, +1 for option number 1,
> > which will also allow FLIP-167 to make progress.
> >
> > Concerning subtasks with no partitions assigned, would it make sense to
> > terminate these tasks at some point? That way, the stream would be closed
> > and there is no need to maintain a stream status. Of course, this also
> > requires at some point that Flink can start new sources when new
> partitions
> > appear.
> >
> > Cheers,
> > Till
> >
> > On Tue, Jun 8, 2021 at 9:26 AM Piotr Nowojski 
> > wrote:
> >
> > > Hi Eron,
> > >
> > > The FLIP-167 is narrow, but we recently discovered some problems with
> > > current idleness semantics as Arvid explained. We are planning to
> > present a
> > > new proposal to redefine them. Probably as a part of it, we would need
> to
> > > rename them. Given that, I think it doesn't make sense to expose
> idleness
> > > to the sinks before we rename and define it properly. In other words:
> > >
> > > > 2. When the sink operator is idled, tell the sink function.
> > >
> > > We shouldn't expose stream status as a part of public API until it's
> > > properly defined.
> > >
> > > I would propose one of the two things:
> > > 1. Proceed with FLIP-167, without exposing idleness in the sinks YET.
> > > Exposing idleness could be part of this next/future FLIP that would
> > define
> > > idleness in the first place.
> > > 2. Block FLIP-167, until the idleness is fixed.
> > >
> > > I would vote for option number 1.
> > >
> > > Piotrek
> > >
> > > pon., 7 cze 2021 o 18:08 Eron Wright 
> > > napisał(a):
> > >
> > > > Piotr, David, and Arvid, we've had an expansive discussion but
> > ultimately
> > > > the proposal is narrow.  It is:
> > > > 1. When a watermark arrives at the sink operator, tell the sink
> > function.
> > > > 2. When the sink operator is idled, tell the sink function.
> > > >
> > > > With these enhancements, we will significantly improve correctness in
> > > > multi-stage flows, and facilitate an exciting project in the Pulsar
> > > > community.  Would you please lend your support to FLIP-167 so that we
> > can
> > > > land this enhancement for 1.14?  My deepest thanks!
> > > >
> > > > -Eron
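
A minimal sketch of what point 1 above could look like from a connector
author's perspective; the writeWatermark hook is the method proposed by
FLIP-167, everything else (the class name and the downstream call) is a
made-up placeholder, not an existing API:

    // Hypothetical sink that forwards Flink watermarks to the external system.
    public class WatermarkForwardingSink<T> extends RichSinkFunction<T> {

        @Override
        public void invoke(T value, Context context) {
            // regular record path, unchanged
        }

        @Override
        public void writeWatermark(Watermark watermark) {
            // callback proposed by FLIP-167: propagate event-time progress to
            // the external system, e.g. as a marker message
            emitWatermarkDownstream(watermark.getTimestamp());
        }

        private void emitWatermarkDownstream(long timestamp) {
            // placeholder for the connector-specific client call
        }
    }
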
> > > >
> > > >
> > > >
> > > >
> > > > On Mon, Jun 7, 2021 at 4:45 AM Arvid Heise  wrote:
> > > >
> > > > > Hi Eron,
> > > > >
> > > > > you either have very specific use cases in mind or have a
> > misconception
> > > > > about idleness in Flink with the new sources. The basic idea is
> that
> > > you
> > > > > have watermark generators only at the sources and the user supplies
> > > them.
> > > > > As a source author, you have no option to limit that. Here a bit of
> > > > > background:
> > > > >
> > > > > We observed that many users that read from Kafka were confused
> about
> > no
> > > > > visible progress in their Flink applications because of some idle
> > > > partition
> > > > > and we introduced idleness subsequently. Idleness was always
> > considered
> > > > as
> > > > > a means to achieve progress at the risk of losing a bit of
> > correctness.
> > > > > So especially in the case that you describe with a Pulsar partition
> > > that
> > > > is
> > > > > empty but indefinitely active, the user needs to be able to use
> > > idleness
> > > > > such that downstream window operators progress.
> > > > >
>

Re: [DISCUSS] Definition of idle partitions

2021-07-19 Thread Arvid Heise
some of the roots of this implementation [1],
> >> > initially
> >> > > the StreamStatus was actually defined to mark "watermark idleness",
> >> and
> >> > not
> >> > > "record idleness" (in fact, the alternative name "WatermarkStatus"
> was
> >> > > considered at the time).
> >> > >
> >> > > The concern at the time causing us to alter the definition to be
> >> "record
> >> > > idleness" in the final implementation was due to the existence of
> >> > periodic
> >> > > timestamp / watermark generators within the pipeline. Those would
> >> > continue
> >> > > to generate non-increasing watermarks in the absence of any input
> >> records
> >> > > from upstream. In this scenario, downstream operators would not be
> >> able
> >> > to
> >> > > consider that channel as idle and therefore watermark progress is
> >> locked.
> >> > > We could consider a timeout-based approach on those specific
> >> operators to
> >> > > toggle watermark idleness if the values remain constant for a period
> >> of
> >> > > time, but then again, this is very ill-defined and most likely
> wrong.
> >> > >
> >> > > I have not followed the newest changes to the watermark generator
> >> > > operators and am not sure if this issue is still relevant.
> >> > > Otherwise, I don't see other problems with changing the definition
> >> here.
> >> > >
> >> > > Thanks,
> >> > > Gordon
> >> > >
> >> > > On Wed, Jun 9, 2021 at 3:06 PM Arvid Heise 
> wrote:
> >> > >
> >> > >> Hi Eron,
> >> > >>
> >> > >> again to recap from the other thread:
> >> > >> - You are right that idleness is correct with static assignment and
> >> > fully
> >> > >> active partitions. In this case, the source defines idleness. (case
> >> A)
> >> > >> - For the more pressing use cases of idle, assigned partitions, the
> >> user
> >> > >> defines an idleness threshold, and it becomes potentially
> incorrect,
> >> > when
> >> > >> the partition becomes active again. (case B)
> >> > >> - Same holds for dynamic assignment of splits. If a source without
> a
> >> > split
> >> > >> gets a split assigned dynamically, there is a realistic chance that
> >> the
> >> > >> watermark advanced past the first record of the newly assigned
> split.
> >> > >> (case
> >> > >> C)
> >> > >> You can certainly insist that only the first case is valid (as it's
> >> > >> correct) but we know that users use it in other ways and that was
> >> also
> >> > the
> >> > >> intent of the devs.
> >> > >>
> >> > >> Now the question could be if it makes sense to distinguish these
> >> cases.
> >> > >> Would you treat the idleness information differently (especially in
> >> the
> >> > >> sink/source that motivated FLIP-167) if you knew that the idleness
> is
> >> > >> guaranteed correct?
> >> > >> We could have some WatermarkStatus with ACTIVE, IDLE (case A),
> >> TIMEOUT
> >> > >> (case B).
> >> > >>
> >> > >> However, that would still leave case C, which probably would need
> to
> >> be
> >> > >> solved completely differently. I could imagine that a source with
> >> > dynamic
> >> > >> assignments should never have IDLE subtasks and rather manage the
> >> > idleness
> >> > >> itself. For example, it could emit a watermark per second/minute
> >> that is
> >> > >> directly fetched from the source system. I'm just not sure if the
> >> > current
> >> > >> WatermarkAssigner interface suffices in that regard...
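
A minimal sketch of the ACTIVE/IDLE/TIMEOUT distinction above, purely
illustrative (neither the enum nor its values exist in Flink at this point):

    // Hypothetical refinement of today's two-valued StreamStatus.
    public enum WatermarkStatus {
        ACTIVE,   // watermarks are flowing normally
        IDLE,     // idleness declared by the source itself; guaranteed correct (case A)
        TIMEOUT   // idleness inferred from a user-defined threshold; may turn out
                  // to be incorrect when the partition becomes active again (case B)
    }
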
> >> > >>
> >> > >>
> >> > >> On Wed, Jun 9, 2021 at 7:31 AM Piotr Nowojski <
> >> piotr.nowoj...@gmail.com
> >> > >
> >> > >> wrote:
> >> > >>
> >> > >> > Hi Eron,
> >> > >> >
> >> > >> > Can you elaborate a bit more what do you 

[DISCUSS] FLIP-180: Adjust StreamStatus and Idleness definition

2021-07-19 Thread Arvid Heise
Dear devs,

We recently discovered that StreamStatus and Idleness is insufficiently
defined [1], so I drafted a FLIP [3] to amend that situation. It would be
good to hear more opinions on that matter. Ideally, we can make the changes
to 1.14 as some newly added methods are affected.

Best,

Arvid

[1]
https://lists.apache.org/thread.html/r5194e1cf157d1fd5ba7ca9b567cb01723bd582aa12dda57d25bca37e%40%3Cdev.flink.apache.org%3E
[2]
https://lists.apache.org/thread.html/rb871f5aecbca6e5d786303557a6cdb3d425954385cbdb1b777f2fcf5%40%3Cdev.flink.apache.org%3E
[3]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-180%3A+Adjust+StreamStatus+and+Idleness+definition


Re: [DISCUSS] FLIP-179: Expose Standardized Operator Metrics

2021-07-15 Thread Arvid Heise
Hi Steven,

The semantics are unchanged compared to FLIP-33 [1] but I see your point.

In reality, pending records would be mostly for event storage systems
(Kafka, Kinesis, ...). Here, we would report the consumer lag effectively.
If consumer lag is more prominent, we could also rename it.

For pending bytes, this is mostly related to file source or any kind of
byte streams. At this point, we can only capture the assigned splits on
reader levels. I don't think it makes sense to add the same metric to the
enumerator as that might induce too much I/O on the job master. I could
rather envision another metric that captures how many unassigned splits
there are. In general, I think it would be a good idea to add another type
of top-level metric group for SplitEnumerator called
SplitEnumeratorMetricGroup in SplitEnumeratorContext. There we could add
unassigned/pending splits metric. WDYT?

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-33%3A+Standardize+Connector+Metrics
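
To make the enumerator-side idea a bit more concrete, a sketch of what such an
addition could look like; neither SplitEnumeratorMetricGroup nor the gauge
setter exist yet, so all names below are tentative:

    // Tentative shape of the proposed metric group (illustrative only).
    public interface SplitEnumeratorMetricGroup extends MetricGroup {
        // backlog of splits that have not been handed to any reader yet
        <G extends Gauge<Long>> G setUnassignedSplitsGauge(G unassignedSplitsGauge);
    }

    // Registration inside a SplitEnumerator implementation could then be:
    // context.metricGroup().setUnassignedSplitsGauge(() -> (long) unassignedSplits.size());
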

On Wed, Jul 14, 2021 at 9:00 AM Steven Wu  wrote:

> I am trying to understand what those two metrics really capture
>
> > G setPendingBytesGauge(G pendingBytesGauge);
>
>-  use file source as an example, it captures the remaining bytes for
>the current file split that the reader is processing? How would users
>interpret or use this metric? The enumerator keeps track of the
>pending/unassigned splits, which is an indication of the size of the
>backlog. that would be very useful
>
>
> > G setPendingRecordsGauge(G pendingRecordsGauge);
>
>- In the Kafka source case, this is intended to capture the consumer lag
>(log head offset from broker - current record offset)? that could be
> used
>to capture the size of the backlog
>
>
>
> On Tue, Jul 13, 2021 at 3:01 PM Arvid Heise  wrote:
>
> > Hi Becket,
> >
> > I believe 1+2 has been answered by Chesnay already. Just to add to 2: I'm
> > not the biggest fan of reusing task metrics but that's what FLIP-33 and
> > different folks suggested. I'd probably keep task I/O metrics only for
> > internal things and add a new metric for external calls. Then, we could
> > even allow users to track I/O in AsyncIO (which would currently be a
> mess).
> > However, with the current abstraction, it would be relatively easy to add
> > separate metrics later.
> >
> > 3. As outlined in the JavaDoc and in the draft PR [1], it's up to the
> user
> > to implement it in a way that fetch time always corresponds to the latest
> > polled record. For SourceReaderBase, I have added a new
> > RecordsWithSplitIds#lastFetchTime (with default return value null) that
> > sets the last fetch time automatically whenever the next batch is
> selected.
> > Tbh this metric is a bit more challenging to implement for
> > non-SourceReaderBase sources but I have not found a better, thread-safe
> > way. Of course, we could shift the complete calculation into user-land
> but
> > I'm not sure that this is easier.
> > For your scenarios:
> > - in A, you assume SourceReaderBase. In that case, we could eagerly
> report
> > the metric as sketched by you. It depends on the definition of "last
> > processed record" in FLIP-33, whether this eager reporting is more
> correct
> > than the lazy reporting that I have proposed. The former case assumes
> "last
> > processed record" = last fetched record, while the latter case assumes
> > "last processed record" = "last polled record". For the proposed
> solution,
> > the user would just need to implement RecordsWithSplitIds#lastFetchTime,
> > which typically corresponds to the creation time of the
> RecordsWithSplitIds
> > instance.
> > - B is not assuming SourceReaderBase.
> > If it's SourceReaderBase, the same proposed solution works out of the
> box:
> > SourceOperator intercepts the emitted event time and uses the fetch time
> of
> > the current batch.
> > If it's not SourceReaderBase, the user would need to attach the timestamp
> > to the handover protocol if multi-threaded and set the lastFetchTimeGauge
> > when a value in the handover protocol is selected (typically a batch).
> > If it's a single threaded source, the user could directly set the current
> > timestamp after fetching the records in a sync fashion.
> > The bad case is if the user is fetching individual records (either sync
> or
> > async), then the fetch time would be updated with every record. However,
> > I'm assuming that the required system call is dwarfed by involved I/O.
> >
> > [1] https://github.com/apache/flink/pull/15972
> >
> > On Tue, Jul 13, 2021 at 12:58 PM Chesnay Schepler 

Re: [DISCUSS] FLIP-179: Expose Standardized Operator Metrics

2021-07-13 Thread Arvid Heise
Hi Becket,

I believe 1+2 has been answered by Chesnay already. Just to add to 2: I'm
not the biggest fan of reusing task metrics but that's what FLIP-33 and
different folks suggested. I'd probably keep task I/O metrics only for
internal things and add a new metric for external calls. Then, we could
even allow users to track I/O in AsyncIO (which would currently be a mess).
However, with the current abstraction, it would be relatively easy to add
separate metrics later.

3. As outlined in the JavaDoc and in the draft PR [1], it's up to the user
to implement it in a way that fetch time always corresponds to the latest
polled record. For SourceReaderBase, I have added a new
RecordsWithSplitIds#lastFetchTime (with default return value null) that
sets the last fetch time automatically whenever the next batch is selected.
Tbh this metric is a bit more challenging to implement for
non-SourceReaderBase sources but I have not found a better, thread-safe
way. Of course, we could shift the complete calculation into user-land but
I'm not sure that this is easier.
For your scenarios:
- in A, you assume SourceReaderBase. In that case, we could eagerly report
the metric as sketched by you. It depends on the definition of "last
processed record" in FLIP-33, whether this eager reporting is more correct
than the lazy reporting that I have proposed. The former case assumes "last
processed record" = last fetched record, while the latter case assumes
"last processed record" = "last polled record". For the proposed solution,
the user would just need to implement RecordsWithSplitIds#lastFetchTime,
which typically corresponds to the creation time of the RecordsWithSplitIds
instance.
- B is not assuming SourceReaderBase.
If it's SourceReaderBase, the same proposed solution works out of the box:
SourceOperator intercepts the emitted event time and uses the fetch time of
the current batch.
If it's not SourceReaderBase, the user would need to attach the timestamp
to the handover protocol if multi-threaded and set the lastFetchTimeGauge
when a value in the handover protocol is selected (typically a batch).
If it's a single threaded source, the user could directly set the current
timestamp after fetching the records in a sync fashion.
The bad case is if the user is fetching individual records (either sync or
async), then the fetch time would be updated with every record. However,
I'm assuming that the required system call is dwarfed by involved I/O.

[1] https://github.com/apache/flink/pull/15972
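
For the SourceReaderBase path described above, a small sketch of the intended
hook; RecordsWithSplitIds#lastFetchTime only exists in the draft PR, so the
exact signature is tentative and the class below is just an illustration:

    // A RecordsWithSplitIds implementation that remembers when its batch was
    // fetched, so the source operator can derive the fetch-time lag for every
    // record it later emits from this batch.
    public class TimedRecordsBatch<E> implements RecordsWithSplitIds<E> {

        private final long fetchTimeMillis = System.currentTimeMillis();

        @Override
        public Long lastFetchTime() {   // tentative hook; the default would return null
            return fetchTimeMillis;
        }

        // nextSplit(), nextRecordFromSplit(), finishedSplits() omitted for brevity
    }
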

On Tue, Jul 13, 2021 at 12:58 PM Chesnay Schepler 
wrote:

> Re 1: We don't expose the reuse* methods, because the proposed
> OperatorIOMetricGroup is a separate interface from the existing
> implementations (which will be renamed and implement the new interface).
>
> Re 2: Currently the plan is to re-use the "new" numByesIn/Out counters
> for tasks ("new" because all we are doing is exposing already existing
> metrics). We may however change this in the future if we want to report
> the byte metrics on an operator level, which is primarily interesting
> for async IO or other external connectivity outside of sinks/sources.
>
> On 13/07/2021 12:38, Becket Qin wrote:
> > Hi Arvid,
> >
> > Thanks for the proposal. I like the idea of exposing concrete metric
> group
> > class so that users can access the predefined metrics.
> >
> > A few questions are following:
> >
> > 1. When exposing the OperatorIOMetrics to the users, we are also exposing
> > the reuseInputMetricsForTask to the users. Should we hide these two
> methods
> > because users won't have enough information to decide whether the records
> > IO metrics should be reused by the task or not.
> >
> > 2. Similar to question 1, in the OperatorIOMetricGroup, we are adding
> > numBytesInCounter and numBytesOutCounter. Should these metrics be reusing
> > the task level metrics by default?
> >
> > 3. Regarding SourceMetricGroup#setLastFetchTimeGauge(), I am not sure how
> > it works with the FetchLag. Typically there are two cases when reporting
> > the fetch lag.
> >  A. The EventTime is known at the point when the record is fetched in
> > the SplitFetcher, so the fetch lag can be derived and reported
> immediately.
> >  B. The EventTime is known only after the fetched record was parsed
> in
> > the RecordEmitter. In this case, the RecordEmitter needs to get the fetch
> > time of that particular record.
> > I am not sure when users set the LastFetchTime in the above two cases.
> Can
> > you help elaborate on how users should use it?
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
> >
> >
> >
> > On Thu, Jul 8, 2021 at 10:25 PM Arvid Heise  wrote:
> >
> >> Dea

[DISCUSS] FLIP-179: Expose Standardized Operator Metrics

2021-07-08 Thread Arvid Heise
Dear devs,

As a continuation and generalization of FLIP-33 (Standardize Connector
Metrics) [1], we'd like to discuss how we actually expose the standardized
operator metrics to users in terms of changes to the API.

Please check out the FLIP [2] and provide feedback.

Best,

Arvid

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-33%3A+Standardize+Connector+Metrics
[2]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-179%3A+Expose+Standardized+Operator+Metrics


Re: [ANNOUNCE] New Apache Flink Committer - Yuan Mei

2021-07-08 Thread Arvid Heise
Yay!

On Thu, Jul 8, 2021 at 10:02 AM Jiayi Liao  wrote:

> Congratulations Yuan!
>
> Best,
> Jiayi Liao
>
> On Thu, Jul 8, 2021 at 3:55 PM Roman Khachatryan  wrote:
>
> > Congratulations Yuan!
> >
> > Regards,
> > Roman
> >
> > On Thu, Jul 8, 2021 at 6:02 AM Yang Wang  wrote:
> > >
> > > Congratulations Yuan!
> > >
> > > Best,
> > > Yang
> > >
> > > XING JIN  于2021年7月8日周四 上午11:46写道:
> > >
> > > > Congratulations Yuan~!
> > > >
> > > > Roc Marshal  于2021年7月8日周四 上午11:28写道:
> > > >
> > > > > Congratulations, Yuan!
> > > > >
> > > > > At 2021-07-08 01:21:40, "Yu Li"  wrote:
> > > > > >Hi all,
> > > > > >
> > > > > >On behalf of the PMC, I’m very happy to announce Yuan Mei as a new
> > Flink
> > > > > >committer.
> > > > > >
> > > > > >Yuan has been an active contributor for more than two years, with
> > code
> > > > > >contributions on multiple components including kafka connectors,
> > > > > >checkpointing, state backends, etc. Besides, she has been actively
> > > > > involved
> > > > > >in community activities such as helping manage releases,
> discussing
> > > > > >questions on dev@list, supporting users and giving talks at
> > > > conferences.
> > > > > >
> > > > > >Please join me in congratulating Yuan for becoming a Flink
> > committer!
> > > > > >
> > > > > >Cheers,
> > > > > >Yu
> > > > >
> > > >
> >
>


Re: [ANNOUNCE] New Apache Flink Committer - Yang Wang

2021-07-07 Thread Arvid Heise
Congratulations!

On Wed, Jul 7, 2021 at 12:17 PM godfrey he  wrote:

> Congratulations, Yang!
>
> Best,
> Godfrey
>
> Lijie Wang  于2021年7月7日周三 下午5:59写道:
>
> > Congratulations Yang!
> >
> > Till Rohrmann  于2021年7月7日周三 下午5:29写道:
> >
> > > Congratulations, Yang!
> > >
> > > Cheers,
> > > Till
> > >
> > > On Wed, Jul 7, 2021 at 9:41 AM Roman Khachatryan 
> > wrote:
> > >
> > > > Congrats!
> > > >
> > > > Regards,
> > > > Roman
> > > >
> > > >
> > > > On Wed, Jul 7, 2021 at 8:28 AM Qingsheng Ren 
> > wrote:
> > > > >
> > > > > Congratulations Yang!
> > > > >
> > > > > --
> > > > > Best Regards,
> > > > >
> > > > > Qingsheng Ren
> > > > > Email: renqs...@gmail.com
> > > > > On Jul 7, 2021, 2:26 PM +0800, Rui Li ,
> > wrote:
> > > > > > Congratulations Yang ~
> > > > > >
> > > > > > On Wed, Jul 7, 2021 at 1:01 PM Benchao Li 
> > > > wrote:
> > > > > >
> > > > > > > Congratulations!
> > > > > > >
> > > > > > > Peter Huang  于2021年7月7日周三
> 下午12:54写道:
> > > > > > >
> > > > > > > > Congratulations, Yang.
> > > > > > > >
> > > > > > > > Best Regards
> > > > > > > > Peter Huang
> > > > > > > >
> > > > > > > > On Tue, Jul 6, 2021 at 9:48 PM Dian Fu <
> dian0511...@gmail.com>
> > > > wrote:
> > > > > > > >
> > > > > > > > > Congratulations, Yang,
> > > > > > > > >
> > > > > > > > > Regards,
> > > > > > > > > Dian
> > > > > > > > >
> > > > > > > > > > 2021年7月7日 上午10:46,Jary Zhen  写道:
> > > > > > > > > >
> > > > > > > > > > Congratulations, Yang Wang.
> > > > > > > > > >
> > > > > > > > > > Best
> > > > > > > > > > Jary
> > > > > > > > > >
> > > > > > > > > > Yun Gao  于2021年7月7日周三
> > > 上午10:38写道:
> > > > > > > > > >
> > > > > > > > > > > Congratulations Yang!
> > > > > > > > > > >
> > > > > > > > > > > Best,
> > > > > > > > > > > Yun
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > >
> > > > --
> > > > > > > > > > > Sender:Jark Wu
> > > > > > > > > > > Date:2021/07/07 10:20:27
> > > > > > > > > > > Recipient:dev
> > > > > > > > > > > Cc:Yang Wang; <
> > > > wangyang0...@apache.org>
> > > > > > > > > > > Theme:Re: [ANNOUNCE] New Apache Flink Committer - Yang
> > Wang
> > > > > > > > > > >
> > > > > > > > > > > Congratulations Yang Wang!
> > > > > > > > > > >
> > > > > > > > > > > Best,
> > > > > > > > > > > Jark
> > > > > > > > > > >
> > > > > > > > > > > On Wed, 7 Jul 2021 at 10:09, Xintong Song <
> > > > tonysong...@gmail.com>
> > > > > > > > > wrote:
> > > > > > > > > > >
> > > > > > > > > > > > Hi everyone,
> > > > > > > > > > > >
> > > > > > > > > > > > On behalf of the PMC, I'm very happy to announce Yang
> > > Wang
> > > > as a new
> > > > > > > > > Flink
> > > > > > > > > > > > committer.
> > > > > > > > > > > >
> > > > > > > > > > > > Yang has been a very active contributor for more than
> > two
> > > > years,
> > > > > > > > mainly
> > > > > > > > > > > > focusing on Flink's deployment components. He's a
> main
> > > > contributor
> > > > > > > > and
> > > > > > > > > > > > maintainer of Flink's native Kubernetes deployment
> and
> > > > native
> > > > > > > > > Kubernetes
> > > > > > > > > > > > HA. He's also very active on the mailing lists,
> > > > participating in
> > > > > > > > > > > > discussions and helping with user questions.
> > > > > > > > > > > >
> > > > > > > > > > > > Please join me in congratulating Yang Wang for
> > becoming a
> > > > Flink
> > > > > > > > > > > committer!
> > > > > > > > > > > >
> > > > > > > > > > > > Thank you~
> > > > > > > > > > > >
> > > > > > > > > > > > Xintong Song
> > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > --
> > > > > > >
> > > > > > > Best,
> > > > > > > Benchao Li
> > > > > > >
> > > > > >
> > > > > >
> > > > > > --
> > > > > > Best regards!
> > > > > > Rui Li
> > > >
> > >
> >
>


Re: [ANNOUNCE] New PMC member: Guowei Ma

2021-07-07 Thread Arvid Heise
Congratulations!

On Wed, Jul 7, 2021 at 11:30 AM Till Rohrmann  wrote:

> Congratulations, Guowei!
>
> Cheers,
> Till
>
> On Wed, Jul 7, 2021 at 9:41 AM Roman Khachatryan  wrote:
>
> > Congratulations!
> >
> > Regards,
> > Roman
> >
> > On Wed, Jul 7, 2021 at 8:24 AM Rui Li  wrote:
> > >
> > > Congratulations Guowei!
> > >
> > > On Wed, Jul 7, 2021 at 1:01 PM Benchao Li 
> wrote:
> > >
> > > > Congratulations!
> > > >
> > > > Dian Fu  于2021年7月7日周三 下午12:46写道:
> > > >
> > > > > Congratulations, Guowei!
> > > > >
> > > > > Regards,
> > > > > Dian
> > > > >
> > > > > > 2021年7月7日 上午10:37,Yun Gao  写道:
> > > > > >
> > > > > > Congratulations Guowei!
> > > > > >
> > > > > >
> > > > > > Best,
> > > > > > Yun
> > > > > >
> > > > > >
> > > > > >
> --
> > > > > > Sender:JING ZHANG
> > > > > > Date:2021/07/07 10:33:51
> > > > > > Recipient:dev
> > > > > > Theme:Re: [ANNOUNCE] New PMC member: Guowei Ma
> > > > > >
> > > > > > Congratulations,  Guowei Ma!
> > > > > >
> > > > > > Best regards,
> > > > > > JING ZHANG
> > > > > >
> > > > > > Zakelly Lan  于2021年7月7日周三 上午10:30写道:
> > > > > >
> > > > > >> Congratulations, Guowei!
> > > > > >>
> > > > > >> Best,
> > > > > >> Zakelly
> > > > > >>
> > > > > >> On Wed, Jul 7, 2021 at 10:24 AM tison 
> > wrote:
> > > > > >>
> > > > > >>> Congrats! NB.
> > > > > >>>
> > > > > >>> Best,
> > > > > >>> tison.
> > > > > >>>
> > > > > >>>
> > > > > >>> Jark Wu  于2021年7月7日周三 上午10:20写道:
> > > > > >>>
> > > > >  Congratulations Guowei!
> > > > > 
> > > > >  Best,
> > > > >  Jark
> > > > > 
> > > > >  On Wed, 7 Jul 2021 at 09:54, XING JIN <
> jinxing.co...@gmail.com>
> > > > > wrote:
> > > > > 
> > > > > > Congratulations, Guowei~ !
> > > > > >
> > > > > > Best,
> > > > > > Jin
> > > > > >
> > > > > > Xintong Song  于2021年7月7日周三 上午9:37写道:
> > > > > >
> > > > > >> Congratulations, Guowei~!
> > > > > >>
> > > > > >> Thank you~
> > > > > >>
> > > > > >> Xintong Song
> > > > > >>
> > > > > >>
> > > > > >>
> > > > > >> On Wed, Jul 7, 2021 at 9:31 AM Qingsheng Ren <
> > renqs...@gmail.com>
> > > > >  wrote:
> > > > > >>
> > > > > >>> Congratulations Guowei!
> > > > > >>>
> > > > > >>> --
> > > > > >>> Best Regards,
> > > > > >>>
> > > > > >>> Qingsheng Ren
> > > > > >>> Email: renqs...@gmail.com
> > > > > >>> 2021年7月7日 +0800 09:30 Leonard Xu ,写道:
> > > > >  Congratulations! Guowei Ma
> > > > > 
> > > > >  Best,
> > > > >  Leonard
> > > > > 
> > > > > > On Jul 6, 2021, at 21:56, Kurt Young  wrote:
> > > > > >
> > > > > > Hi all!
> > > > > >
> > > > > > I'm very happy to announce that Guowei Ma has joined the
> > > > > >> Flink
> > > > >  PMC!
> > > > > >
> > > > > > Congratulations and welcome Guowei!
> > > > > >
> > > > > > Best,
> > > > > > Kurt
> > > > > 
> > > > > >>>
> > > > > >>
> > > > > >
> > > > > 
> > > > > >>>
> > > > > >>
> > > > > >
> > > > >
> > > > >
> > > >
> > > > --
> > > >
> > > > Best,
> > > > Benchao Li
> > > >
> > >
> > >
> > > --
> > > Best regards!
> > > Rui Li
> >
>


Re: [VOTE] Migrating Test Framework to JUnit 5

2021-07-05 Thread Arvid Heise
+1 (binding)

On Wed, Jun 30, 2021 at 5:56 PM Qingsheng Ren  wrote:

> Dear devs,
>
>
> As discussed in the thread[1], I’d like to start a vote on migrating the
> test framework of Flink project to JUnit 5.
>
>
> JUnit 5[2] provides many exciting new features that can simplify the
> development of test cases and make our test structure cleaner, such as
> pluggable extension models (replacing rules such as
> TestLogger/MiniCluster), improved parameterized test, annotation support
> and nested/dynamic test.
>
>
> The migration path towards JUnit 5 would be:
>
>
> 1. Remove JUnit 4 dependency and introduce junit5-vintage-engine in the
> project
>
>
> The vintage engine will keep all existing JUnit4-style cases still
> valid. Since classes of JUnit 4 and 5 are located under different packages,
> there won’t be conflict having JUnit 4 cases in the project.
>
>
> 2. Rewrite JUnit 4 rules in JUnit 5 extension style (~10 rules)
>
>
> 3. Migrate all existing tests to JUnit 5
>
>
> This would be a giant commit similar to code reformatting and could be
> merged after cutting the 1.14 release branch. There are many migration
> examples and experiences to refer to, also Intellij IDEA provides tools for
> refactoring.
>
>
> 4. Ban JUnit 4 imports in CheckStyle
>
>
> Some modules like Testcontainers still require JUnit 4 in the
> classpath, and JUnit 4 could still appear as transitive dependency. Banning
> JUnit 4 imports can avoid developers mistakenly using JUnit 4 and split the
> project into 4 & 5 again.
>
>
> 5. Remove vintage runner and some cleanup
>
>
>
> This vote will last for at least 72 hours, following the consensus voting
> process.
>
>
> Thanks!
>
>
> [1]
>
> https://lists.apache.org/thread.html/r6c8047c7265b8a9f2cb3ef6d6153dd80b94d36ebb03daccf36ab4940%40%3Cdev.flink.apache.org%3E
>
> [2] https://junit.org/junit5
>
> --
> Best Regards,
>
> *Qingsheng Ren*
>
> Email: renqs...@gmail.com
>


Re: [VOTE] FLIP-147: Support Checkpoint After Tasks Finished

2021-07-01 Thread Arvid Heise
Looks good: +1 (binding)

On Tue, Jun 29, 2021 at 5:06 AM 刘建刚  wrote:

> +1 (binding)
>
> Best
> liujiangang
>
> Piotr Nowojski  于2021年6月29日周二 上午2:05写道:
>
> > +1 (binding)
> >
> > Piotrek
> >
> > pon., 28 cze 2021 o 12:48 Dawid Wysakowicz 
> > napisał(a):
> >
> > > +1 (binding)
> > >
> > > Best,
> > >
> > > Dawid
> > >
> > > On 28/06/2021 10:45, Yun Gao wrote:
> > > > Hi all,
> > > >
> > > > For FLIP-147[1] which targets at supports checkpoints after tasks
> > > finished and modify operator
> > > > API and implementation to ensures the commit of last piece of data,
> > > since after the last vote
> > > > we have more discussions[2][3] and a few updates, including changes
> to
> > > PublicEvolving API,
> > > > I'd like to have another VOTE on the current state of the FLIP.
> > > >
> > > > The vote will last at least 72 hours (Jul 1st), following the
> consensus
> > > > voting process.
> > > >
> > > > thanks,
> > > >  Yun
> > > >
> > > >
> > > > [1] https://cwiki.apache.org/confluence/x/mw-ZCQ
> > > > [2]
> > >
> >
> https://lists.apache.org/thread.html/r400da9898ff66fd613c25efea15de440a86f14758ceeae4950ea25cf%40%3Cdev.flink.apache.org
> > > > [3]
> > >
> >
> https://lists.apache.org/thread.html/r3953df796ef5ac67d5be9f2251a95ad72efbca31f1d1555d13e71197%40%3Cdev.flink.apache.org%3E
> > >
> > >
> >
>


Re: [VOTE] FLIP-150: Introduce Hybrid Source

2021-07-01 Thread Arvid Heise
+1 (binding)

Thank you and Thomas for driving this

On Thu, Jul 1, 2021 at 7:50 AM 蒋晓峰  wrote:

> Hi everyone,
>
>
>
>
> Thanks for all the feedback to Hybrid Source so far. Based on the
> discussion[1] we seem to have consensus, so I would like to start a vote on
> FLIP-150 for which the FLIP has also been updated[2].
>
>
>
>
> The vote will last for at least 72 hours (Sun, Jul 4th 12:00 GMT) unless
> there is an objection or insufficient votes.
>
>
>
>
> Thanks,
>
> Nicholas Jiang
>
>
>
>
> [1]
> https://lists.apache.org/thread.html/r94057d19f0df2a211695820375502d60cddeeab5ad27057c1ca988d6%40%3Cdev.flink.apache.org%3E
>
> [2]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-150%3A+Introduce+Hybrid+Source


Re: [DISCUSS] Moving to JUnit5

2021-06-29 Thread Arvid Heise
Hi Qingsheng,

I like the idea of enforcing JUnit5 tests with checkstyle. I'm assuming
JUnit4 will bleed from time to time into the test classpath.

Obviously, we can only do that after all tests are migrated and we are
confident that no small change would require a contributor to do the
migration of a test in an unrelated change.

For the big commit, I'd propose to have it after branch cut for 1.14
release. So for 1.14 we would just have the coexistence PR with the vintage
engine. In that way, the least possible number of contributors should be
affected. Of course, the big commit can be prepared beforehand.
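
To give a feeling for the rule-to-extension work listed in the quoted
migration plan below, here is a minimal sketch of how an ExternalResource-style
JUnit 4 rule could be ported to a JUnit 5 extension; the EmbeddedServer
resource is a made-up placeholder, only the JUnit 5 API usage is real:

    import org.junit.jupiter.api.extension.AfterEachCallback;
    import org.junit.jupiter.api.extension.BeforeEachCallback;
    import org.junit.jupiter.api.extension.ExtensionContext;

    // The before()/after() pair of an ExternalResource rule becomes a pair of
    // lifecycle callbacks on an extension.
    public class EmbeddedServerExtension implements BeforeEachCallback, AfterEachCallback {

        private EmbeddedServer server;   // hypothetical resource under test

        @Override
        public void beforeEach(ExtensionContext context) throws Exception {
            server = new EmbeddedServer();
            server.start();
        }

        @Override
        public void afterEach(ExtensionContext context) throws Exception {
            if (server != null) {
                server.stop();
            }
        }

        public EmbeddedServer getServer() {
            return server;
        }
    }

    // Usage in a test class:
    // @RegisterExtension
    // static final EmbeddedServerExtension SERVER = new EmbeddedServerExtension();
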

On Tue, Jun 29, 2021 at 11:44 AM Qingsheng Ren  wrote:

> Thanks for wrapping things up and effort on the migration Arvid!
>
> I’m +1 for the migration plan.
>
> To summarize the migration path proposed by Arvid:
>
> 1. Remove JUnit 4 dependency and introduce junit5-vintage-engine in the
> project (all existing cases will still work)
> 2. Rewrite JUnit 4 rules in JUnit 5 extension style (~10 rules)
> 3. Migrate all existing tests to JUnit 5 (This is a giant commit similar
> to code formatting)
> 4. Remove vintage runner and some cleanup
>
> One issue is that we cannot totally get rid of JUnit 4 dependency from the
> project because:
>
> 1. Testcontainers. As mentioned in their official docs[1], Testcontainers
> still depends on JUnit 4, and the problem might not be solved until
> Testcontainer 2, which still has no roadmap[2].
> 2. It’s still possible to appear as transitive dependency
>
> Since classes of JUnit 4 and 5 are under different packages, there won’t
> be conflict having JUnit 4 in the project. To prevent the project splitting
> into 4 & 5 again, we can ban JUnit 4 imports in CheckStyle to prevent
> developers from writing test cases in JUnit 4 style intentionally or mistakenly.
>
> I’m happy and willing to take over the migration work. This migration
> indeed takes some efforts, but it will help with test case developing in
> the future.
>
> [1] https://www.testcontainers.org/test_framework_integration/junit_5/
> [2]
> https://github.com/testcontainers/testcontainers-java/issues/970#issuecomment-437273363
>
> --
> Best Regards,
>
> Qingsheng Ren
> Email: renqs...@gmail.com
> On Jun 16, 2021, 3:13 AM +0800, Arvid Heise , wrote:
> > Sorry for following up so late. A while ago, I spiked a junit 5
> migration.
> >
> > To recap: here is the migration plan.
> >
> > 0. (There is a way to use JUnit4 + 5 at the same time in a project -
> you'd
> > > use a specific JUnit4 runner to execute JUnit5. I'd like to skip this
> > > step as it would slow down migration significantly)
> > > 1. Use JUnit5 with vintage runner. JUnit4 tests run mostly out of the
> > > box. The most important difference is that only 3 base rules are
> supported
> > > and the remainder needs to be migrated. Luckily, most of our rules
> derive
> > > from the supported ExternalResource. So in this step, we would need to
> > > migrate the rules.
> > > 2. Implement new tests in JUnit5.
> > > 3. Soft-migrate old tests in JUnit5. This is mostly a renaming of
> > > annotation (@Before -> @BeforeEach, etc.). Adjust parameterized tests
> > > (~400), replace rule usages (~670) with extensions, exception handling
> > > (~1600 tests), and timeouts (~200). This can be done on a test class by
> > > test class base and there is no hurry.
> > > 4. Remove vintage runner, once most tests are migrated by doing a final
> > > push for lesser used modules.
> > >
> >
> > Here are my insights:
> > 0. works but I don't see the benefit
> > 1. works well [1] with a small diff [2]. Note that the branch is based
> on a
> > quite old master.
> > 2. works well as well [3].
> > 2a. However, we should be aware that we need to port quite a few rules to
> > extensions before we can implement more complex JUnit5 tests, especially
> > ITCases (I'd probably skip junit-jupiter-migrationsupport that allows us
> to
> > reuse _some_ rules using specific base classes). We have ~10-15 rules
> that
> > need to be ported.
> > 3. Soft migration will take forever and probably never finish. Many tests
> > can be automatically ported with some (I used 8) simple regexes. I'd
> rather
> > do a hard migration of all tests at a particular point (no freeze) and
> have
> > that git commit excluded from blame, similar to the spotless commit.
> > 3a. A huge chunk of changes (>90%) comes from the optional message in
> > assertX being moved from the first to the last position. @Chesnay
> Schepler
> >  proposed to rather implement our own Assertion

Re: [VOTE] FLIP-172: Support custom transactional.id prefix in FlinkKafkaProducer

2021-06-28 Thread Arvid Heise
+1 (binding)

On Mon, Jun 28, 2021 at 8:04 PM Piotr Nowojski  wrote:

> +1 (binding)
>
> Piotrek
>
> pon., 28 cze 2021 o 16:01 Wenhao Ji  napisał(a):
>
> > Hi everyone,
> >
> > I would like to start a vote on FLIP-172 [1] which was discussed in
> > this thread [2].
> > The vote will be open for at least 72 hours until July 1 unless there
> > is an objection or not enough votes.
> >
> > Thanks,
> > Wenhao
> >
> > [1]
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-172%3A+Support+custom+transactional.id+prefix+in+FlinkKafkaProducer
> > [2]
> >
> https://lists.apache.org/thread.html/r67610aa2d4dfdaf3b027b82edd1a3f46771f0d58902a4258d931e5a5%40%3Cdev.flink.apache.org%3E
> >
>


Re: [VOTE] FLIP-171: Async Sink

2021-06-24 Thread Arvid Heise
Thanks for preparing the FLIP. +1 (binding) from my side.

On Tue, Jun 22, 2021 at 2:28 PM Hausmann, Steffen 
wrote:

> Hi there,
>
> After the discussion in [1], I’d like to open a voting thread for FLIP-171
> [2], which proposes a base implementation for sinks that support async
> requests.
>
> The vote will be open until June 25 (72h), unless there is an objection or
> not enough votes.
>
> Cheers, Steffen
>
> [1]
> https://mail-archives.apache.org/mod_mbox/flink-dev/202106.mbox/%3CC83F4222-4D07-412D-9BD5-DB92D59DDF03%40amazon.de%3E
> [2]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink
>
>
>
>
> Amazon Web Services EMEA SARL
> 38 avenue John F. Kennedy, L-1855 Luxembourg
> Sitz der Gesellschaft: L-1855 Luxemburg
> eingetragen im Luxemburgischen Handelsregister unter R.C.S. B186284
>
> Amazon Web Services EMEA SARL, Niederlassung Deutschland
> Marcel-Breuer-Str. 12, D-80807 Muenchen
> Sitz der Zweigniederlassung: Muenchen
> eingetragen im Handelsregister des Amtsgerichts Muenchen unter HRB 242240,
> USt-ID DE317013094
>
>
>
>


Re: [DISCUSS] Drop Mesos in 1.14

2021-06-22 Thread Arvid Heise
+1 for dropping. Frankly speaking, I don't see it having any future (and D2iQ
agrees).

If there is a surprisingly huge demand, I'd try to evaluate plugins for it.

On Tue, Jun 22, 2021 at 11:46 AM Till Rohrmann  wrote:

> I'd be ok with dropping support for Mesos if it helps us to clear our
> dependencies in the flink-runtime module. If we do it, then we should
> probably update our documentation with a pointer to the latest Flink
> version that supports Mesos in case of users strictly need Mesos.
>
> Cheers,
> Till
>
> On Tue, Jun 22, 2021 at 10:29 AM Chesnay Schepler 
> wrote:
>
> > Last week I spent some time looking into making flink-runtime scala
> > free, which effectively means to move the Akka-reliant classes to
> > another module, and load that module along with Akka and all of its
> > dependencies (including Scala) through a separate classloader.
> >
> > This would finally decouple the Scala versions required by the runtime
> > and API, and would allow us to upgrade Akka as we'd no longer be limited
> > to Scala 2.11. It would rid the classpath of a few dependencies, and
> > remove the need for scala suffixes on quite a few modules.
> >
> > However, our Mesos support has unfortunately a hard dependency on Akka,
> > which naturally does not play well with the goal of isolating Akka in
> > it's own ClassLoader.
> >
> > To solve this issue I was thinking of simply dropping flink-mesos in
> > 1.14 (it was deprecated in 1.13).
> >
> > Truth be told, I picked this option because it is the easiest to do. We
> > _could_ probably make things work somehow (likely by shipping a second
> > Akka version just for flink-mesos), but it doesn't seem worth the hassle
> > and would void some of the benefits. So far we kept flink-mesos around,
> > despite not really developing it further, because it didn't hurt to still
> > have it in Flink, but this has now changed.
> >
> > Please tell me what you think.
> >
> >
>


Re: [DISCUSS] Do not merge PRs with "unrelated" test failures.

2021-06-22 Thread Arvid Heise
I think this is overall a good idea. So +1 from my side.
However, I'd like to put a higher priority on infrastructure then, in
particular docker image/artifact caches.

On Tue, Jun 22, 2021 at 11:50 AM Till Rohrmann  wrote:

> Thanks for bringing this topic to our attention Xintong. I think your
> proposal makes a lot of sense and we should follow it. It will give us
> confidence that our changes are working and it might be a good incentive to
> quickly fix build instabilities. Hence, +1.
>
> Cheers,
> Till
>
> On Tue, Jun 22, 2021 at 11:12 AM Xintong Song 
> wrote:
>
> > Hi everyone,
> >
> > In the past a couple of weeks, I've observed several times that PRs are
> > merged without a green light from the CI tests, where failure cases are
> > considered *unrelated*. This may not always cause problems, but would
> > increase the chance of breaking our code base. In fact, it has occurred
> to
> > me twice in the past few weeks that I had to revert a commit which breaks
> > the master branch due to this.
> >
> > I think it would be nicer to enforce a stricter rule, that no PRs should
> be
> > merged without passing CI.
> >
> > The problems of merging PRs with "unrelated" test failures are:
> > - It's not always straightforward to tell whether test failures are
> > related or not.
> > - It prevents subsequent test cases from being executed, which may fail
> > because of the PR changes.
> >
> > To make things easier for the committers, the following exceptions might
> be
> > considered acceptable.
> > - The PR has passed CI in the contributor's personal workspace. Please
> post
> > the link in such cases.
> > - The CI tests have been triggered multiple times, on the same commit,
> and
> > each stage has at least passed for once. Please also comment in such
> cases.
> >
> > If we all agree on this, I'd update the community guidelines for merging
> > PRs wrt. this proposal. [1]
> >
> > Please let me know what do you think.
> >
> > Thank you~
> >
> > Xintong Song
> >
> >
> > [1]
> > https://cwiki.apache.org/confluence/display/FLINK/Merging+Pull+Requests
> >
>


Re: PR: "Propagate watermarks to sink API"

2021-06-21 Thread Arvid Heise
Hi Eron,

I will check it tomorrow. Sorry for the delay. If someone beats me to that,
please go ahead.

On Mon, Jun 21, 2021 at 8:18 PM Eron Wright 
wrote:

> Would someone be willing and able to review the PR which adds watermarks to
> the sink API?
>
> https://github.com/apache/flink/pull/15950
>
> Thanks!
> Eron
>


Re: [DISCUSS] FLIP-171: Async Sink

2021-06-21 Thread Arvid Heise
Hi Piotr,

to pick up this discussion thread again:
- This FLIP is about providing some base implementation for FLIP-143 sinks
that make adding new implementations easier, similar to the
SourceReaderBase.
- The whole availability topic will most likely be a separate FLIP. The
basic issue just popped up here because we currently have no way to signal
backpressure in sinks except by blocking `write`. This feels quite natural
in sinks with sync communication but quite unnatural in async sinks.

Now we have a couple of options. In all cases, we would have some WIP limit
on the number of records/requests that can be processed asynchronously in
parallel (similar to asyncIO).
1. We use some blocking queue in `write`, then we need to handle
interruptions. In the easiest case, we extend `write` to throw the
`InterruptedException`, which is a small API change.
2. We use a blocking queue, but handle interrupts and swallow/translate
them. No API change.
Both solutions block the task thread, so any RPC message / unaligned
checkpoint would be processed only after the backpressure is temporarily
lifted. That's similar to the discussions that you linked. Cancellation may
also be a tad harder on 2.
3. We could also add some `wakeUp` to the `SinkWriter` similar to
`SplitFetcher` [1]. Basically, you use a normal queue with a completeable
future on which you block. Wakeup would be a clean way to complete it next
to the natural completion through finished requests.
4. We add availability to the sink. However, this API change also requires
that we allow operators to be available so it may be a bigger change with
undesired side-effects. On the other hand, we could also use the same
mechanism for asyncIO.

For users of FLIP-171, none of the options are exposed. So we could also
start with a simple solution (add `InterruptedException`) and later try to
add availability. Option 1+2 would also not require an additional FLIP; we
could add it as part of this FLIP.

Best,

Arvid

[1]
https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcher.java#L258-L258
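
To spell out what options 1/2 boil down to inside a writer, a minimal sketch
with a fixed in-flight limit, using a semaphore rather than a queue; AsyncClient
is a made-up stand-in for whatever async destination client the sink wraps,
not a proposed interface:

    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.Semaphore;

    interface AsyncClient<T> {
        CompletableFuture<Void> sendAsync(T element);
    }

    // Cap the number of in-flight async requests and block the task thread in
    // write() once the cap is reached.
    class BoundedAsyncWriter<T> {

        private static final int MAX_IN_FLIGHT_REQUESTS = 100;  // WIP limit
        private final Semaphore inFlight = new Semaphore(MAX_IN_FLIGHT_REQUESTS);
        private final AsyncClient<T> client;

        BoundedAsyncWriter(AsyncClient<T> client) {
            this.client = client;
        }

        void write(T element) throws InterruptedException {
            // option 1 surfaces the InterruptedException; option 2 would catch
            // and translate it instead of changing the signature
            inFlight.acquire();
            client.sendAsync(element)
                  .whenComplete((result, error) -> inFlight.release());
        }
    }
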
On Thu, Jun 10, 2021 at 10:09 AM Hausmann, Steffen
 wrote:

> Hey Piotrek,
>
> Thanks for your comments on the FLIP. I'll address your second question
> first, as I think it's more central to this FLIP. Just looking at the AWS
> ecosystem, there are several sinks with overlapping functionality. I've
> chosen AWS sinks here because I'm most familiar with those, but a similar
> argument applies more generically for destinations that support async ingest.
>
> There is, for instance, a sink for Amazon Kinesis Data Streams that is
> part of Apache Flink [1], a sink for Amazon Kinesis Data Firehose [2], a
> sink for Amazon DynamoDB [3], and a sink for Amazon Timestream [4]. All
> these sinks have implemented their own mechanisms for batching, persisting,
> and retrying events. And I'm not sure if all of them properly participate
> in checkpointing. [3] even seems to closely mirror [1] as it contains
> references to the Kinesis Producer Library, which is unrelated to Amazon
> DynamoDB.
>
> These sinks predate FLIP-143. But as batching, persisting, and retrying
> capabilities do not seem to be part of FLIP-143, I'd argue that we would
> end up with similar duplication, even if these sinks were rewritten today
> based on FLIP-143. And that's the idea of FLIP-171: abstract away these
> commonly required capabilities so that it becomes easy to create support
> for a wide range of destination without having to think about batching,
> retries, checkpointing, etc. I've included an example in the FLIP [5] that
> shows that it only takes a couple of lines of code to implement a sink with
> exactly-once semantics. To be fair, the example is lacking robust failure
> handling and some more advanced capabilities of [1], but I think it still
> supports this point.
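
As a rough illustration of what "a couple of lines" could mean here, a sketch
of the shape such a base class might give connector authors; none of the class
or method names below are the actual FLIP-171 interface, they merely mirror
the batching/submission split described in this thread:

    // Hypothetical connector built on the proposed async sink base: the base
    // handles batching, retries and checkpoint integration, the subclass only
    // converts elements and submits batches to its async client.
    class MyServiceSinkWriter extends HypotheticalAsyncSinkWriterBase<String, PutRequest> {

        private final ServiceAsyncClient serviceClient;   // made-up client

        MyServiceSinkWriter(ServiceAsyncClient serviceClient) {
            this.serviceClient = serviceClient;
        }

        @Override
        protected PutRequest toRequestEntry(String element) {
            return new PutRequest(element);
        }

        @Override
        protected void submitRequestEntries(List<PutRequest> batch,
                                            Consumer<List<PutRequest>> requeue) {
            // submit asynchronously and hand back the entries that must be retried
            serviceClient.putBatchAsync(batch)
                    .whenComplete((failedEntries, error) ->
                            requeue.accept(error != null ? batch : failedEntries));
        }
    }
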
>
> Regarding your point on the isAvailable pattern. We need some way for the
> sink to propagate backpressure and we would also like to support time based
> buffering hints. There are two options I currently see and would need
> additional input on which one is the better or more desirable one. The
> first option is to use the non-blocking isAvailable pattern. Internally,
> the sink persists buffered events in the snapshot state which avoids having
> to flush buffered records on a checkpoint. This seems to align well with the
> non-blocking isAvailable pattern. The second option is to make calls to
> `write` blocking and leverage an internal thread to trigger flushes based
> on time based buffering hints. We've discussed these options with Arvid and
> suggested to assume that the `isAvailable` pattern will become available
> for sinks through an additional FLIP.
>
> I think it is an important discussion to have. My understanding of the
> implications for Flink in general is very naïve, so I'd be happy to get
> further guidance. 

Re: [DISCUSS] FLIP-171: Async Sink

2021-06-18 Thread Arvid Heise
Hi Danny,

to add I'd propose to use the flink-connector-base package which has the
rough equivalent on source-side SourceReaderBase [1]. Since it's such a
handy base implementation, I'd like to see it directly in the main flink
repository.

For the actual connectors, I'm currently working on a proposal for a common
connector repository under Flink umbrella.

[1]
https://github.com/AHeise/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java#L58-L58

On Wed, Jun 16, 2021 at 6:06 PM Hausmann, Steffen 
wrote:

> Hi Danny,
>
> Right now, I'd expect the core of the Async Sink (without third party
> dependencies) to live in its own submodule. For instance
> `flink-connector-async` as part of `flink-connectors`.
>
> I'm currently planning to implement three different sinks to verify that
> the design of the sink is flexible enough to support different services:
> Amazon Kinesis Data Streams, Amazon Kinesis Data Firehose, and Amazon
> DynamoDB. But I'm not sure where to actually put them. To keep it simple,
> I'd start with a module that contains all AWS specific connectors. However,
> it has the obvious disadvantage that if someone wants to use a single sink,
> they would need to pull in all dependencies for all supported services that
> are included in this module (mainly the AWS SDK for these services). But I
> don't know how much of a problem that's going to be in practice. If the
> respective jar grows too big because of all the included dependencies, that's
> certainly not going to work. But for now I'd just give it a try and then
> start a discussion once I have more data to share.
>
> What's more interesting is whether that module should be part of the Flink
> code base or live somewhere else. It'd be great to get some feedback from
> the community on this.
>
> Regarding the Kinesis Data Streams sink, I fully agree that it would be
> nice to remove the dependency to the KPL. So it seems to be desirable to
> keep the existing and the new FLIP-171  based implementation in separate
> modules. Otherwise people would be forced to pull in the KPL dependencies,
> even if they are only using the new implementation. In addition, the new
> implementation will not support the exact same functionality as the
> existing one: the KPL implements a very optimized form of aggregation on a
> shard level [1] by maintaining a mapping of shards and their respective key
> spaces. The new implementation can in principle support aggregation as
> well, but only on a partition key level, which may lead to less efficient
> aggregation and higher latencies.
>
> Cheers, Steffen
>
> [1]
> https://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-concepts.html#kinesis-kpl-concepts-aggretation
>
>
>
> On 15.06.21, 19:52, "Cranmer, Danny" 
> wrote:
>
> CAUTION: This email originated from outside of the organization. Do
> not click links or open attachments unless you can confirm the sender and
> know the content is safe.
>
>
>
> Hey Steffen,
>
> I have a few questions regarding the FLIP:
> 1. Where do you expect the core code to live, would it be in an
> existing module (say flink-clients) or would you introduce a new module?
> 2. Which destination implementations do you intend to ship with this
> FLIP? I see an example with Kinesis but you also list a bunch of other
> candidates.
> 3. For the Kinesis implementation, would you add the Sink to the
> existing flink-connector-kinesis repo, or create a new module? Reason I ask
> is that the existing Kinesis Sink depends on KPL and has a heavy transitive
> dependency chain, removing this would substantially reduce application size
> and clean the dependency chain
>
> Thanks,
>
> On 10/06/2021, 09:09, "Hausmann, Steffen" 
> wrote:
>
> CAUTION: This email originated from outside of the organization.
> Do not click links or open attachments unless you can confirm the sender
> and know the content is safe.
>
>
>
> Hey Piotrek,
>
> Thanks for your comments on the FLIP. I'll address your second
> question first, as I think it's more central to this FLIP. Just looking at
> the AWS ecosystem, there are several sinks with overlapping functionality.
> I've chosen AWS sinks here because I'm most familiar with those, but a
> similar argument applies more generically for destinations that support
> async ingest.
>
> There is, for instance, a sink for Amazon Kinesis Data Streams
> that is part of Apache Flink [1], a sink for Amazon Kinesis Data Firehose
> [2], a sink for Amazon DynamoDB [3], and a sink for Amazon Timestream [4].
> All these sinks have implemented their own mechanisms for batching,
> persisting, and retrying events. And I'm not sure if all of them properly
> participate in checkpointing. [3] even seems to closely mirror [1] as it
> contains references to the Kinesis Producer Library, which is unrelated to
> Amazon DynamoDB.
>
> These sinks 
