Thanks, Dawid, for the reminder about FLIP-182. Sorry, I did miss it. I don't have any other concerns, then.
Best,
Jark

On Mon, 25 Apr 2022 at 15:40, Dawid Wysakowicz <dwysakow...@apache.org> wrote:

> @Jark:
>
> > 1. Will the framework always align with watermarks when the source implements the interface? I'm afraid not every case needs watermark alignment even if Kafka implements the interface, and this will affect the throughput somehow. I agree with Becket that we may need a `supportSplitsAlignment()` method for users to configure the source to enable/disable the alignment.
> >
> > 2. How does the framework calculate maxDesiredWatermark? I think the algorithm of maxDesiredWatermark will greatly affect throughput if the reader is constantly switching between pause and resume. Can users configure the alignment offset?
>
> This is covered in the previous FLIP [1], which has already been implemented in 1.15. In short, it must be enabled with the watermark strategy, which also configures the drift and the update interval.
>
> > If we don't plan to extend this interface to support aligning other things, I suggest explicitly declaring the purpose of the methods, such as `alignWatermarksForSplits` instead of `alignSplits`.
>
> Sure, let's rename it.
>
> @Becket:
>
> I understand your point. On the other hand, putting all methods, even with "supportsXXX" methods for enabling certain features, makes the entry threshold for writing a new source higher. Instead of focusing on the basic and required properties of the Source, the person implementing a source must bother with and figure out what all of the extra features are about and how to deal with them. It also makes it harder to organize methods in coupled groups, as Jark said.
>
> Having said that, as I don't have a preference and I agree most of the sources will support the alignment, I am fine with following your suggestion to have the SourceReader extend WithWatermarksSplitsAlignment, but I would put the "supportsXXX" method there, not in the Source, to keep the two methods together.
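Dawid's compromise (the reader interface extends the alignment interface, and the "supportsXXX" flag sits next to the alignment method rather than in the Source) can be sketched in plain Java. This is a minimal, self-contained illustration under assumptions: `SimpleSourceReader`, `PausingReader`, `LegacyReader` and `requestAlignment` are hypothetical names, not Flink classes, the `WithSplitsAlignment` shown here is a simplified stand-in rather than the FLIP's actual signature, and the default no-op `alignSplits` is assumed so that readers written before the FLIP keep compiling.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical opt-in interface: the capability flag and the alignment hook live together. */
interface WithSplitsAlignment {
    /** Readers that really pause and resume splits override this to return true. */
    default boolean supportsSplitsAlignment() {
        return false;
    }

    /** Pause splitsToPause and resume splitsToResume; splits are identified by their IDs. */
    default void alignSplits(Collection<String> splitsToPause, Collection<String> splitsToResume) {
        // Assumed default: a no-op, so readers written before alignment keep compiling.
    }
}

/** Simplified stand-in for a source reader; NOT Flink's real SourceReader interface. */
interface SimpleSourceReader extends WithSplitsAlignment {
    List<String> assignedSplits();
}

/** A Kafka-like reader that tracks which of its splits are currently paused. */
class PausingReader implements SimpleSourceReader {
    private final List<String> splits;
    private final Set<String> paused = new HashSet<>();

    PausingReader(List<String> splits) {
        this.splits = new ArrayList<>(splits);
    }

    @Override
    public List<String> assignedSplits() {
        return splits;
    }

    @Override
    public boolean supportsSplitsAlignment() {
        return true;
    }

    @Override
    public void alignSplits(Collection<String> splitsToPause, Collection<String> splitsToResume) {
        paused.addAll(splitsToPause);
        paused.removeAll(splitsToResume);
    }

    Set<String> pausedSplits() {
        return paused;
    }
}

/** A reader written before alignment existed: it inherits both defaults unchanged. */
class LegacyReader implements SimpleSourceReader {
    @Override
    public List<String> assignedSplits() {
        return List.of("orders-0");
    }
}

public class AlignmentOptIn {
    /** Framework side: only readers that opted in are asked to pause or resume splits. */
    public static boolean requestAlignment(
            SimpleSourceReader reader, Collection<String> toPause, Collection<String> toResume) {
        if (!reader.supportsSplitsAlignment()) {
            return false; // behave as before the FLIP: no per-split throttling
        }
        reader.alignSplits(toPause, toResume);
        return true;
    }

    public static void main(String[] args) {
        PausingReader kafkaLike = new PausingReader(List.of("p-0", "p-1", "p-2"));
        requestAlignment(kafkaLike, List.of("p-1"), List.of());
        System.out.println(kafkaLike.pausedSplits()); // [p-1]
        System.out.println(requestAlignment(new LegacyReader(), List.of("orders-0"), List.of())); // false
    }
}
```

Becket's variant keeps the same default no-op method on the readers but moves the flag to the Source (`supportSplitsAlignment()` returning false by default), so a check like `requestAlignment` would consult the Source instead of the reader. The disagreement is about where the flag lives, not whether it exists.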
>
> Lastly, I agree it is really unfortunate that the "alignSplits" methods differ slightly for SourceReader and SplitReader. The reason is that SourceReaderBase deals only with split IDs, whereas SplitReader needs the actual splits to pause them. I found the discrepancy acceptable for the sake of simplifying the changes significantly, especially as those changes would very likely impact performance, since we would have to perform additional lookups. Moreover, the SplitReader is a secondary interface.
>
> Best,
>
> Dawid
>
> [1] https://cwiki.apache.org/confluence/x/hQYBCw
>
> On 24/04/2022 17:15, Jark Wu wrote:
>
> Thanks for the effort, Dawid and Sebastian!
>
> I just have some minor questions (maybe I missed something).
>
> 1. Will the framework always align with watermarks when the source implements the interface? I'm afraid not every case needs watermark alignment even if Kafka implements the interface, and this will affect the throughput somehow. I agree with Becket that we may need a `supportSplitsAlignment()` method for users to configure the source to enable/disable the alignment.
>
> 2. How does the framework calculate maxDesiredWatermark? I think the algorithm of maxDesiredWatermark will greatly affect throughput if the reader is constantly switching between pause and resume. Can users configure the alignment offset?
>
> 3. Interface/Method Name.
> Can the interface be used to align other things in the future? For example, read speed: I have seen users requesting global rate limits. This feature may also need an interface like this. If we don't plan to extend this interface to support aligning other things, I suggest explicitly declaring the purpose of the methods, such as `alignWatermarksForSplits` instead of `alignSplits`.
>
> 4. Interface or Method.
> I don't have a strong opinion on this. I think they have their own advantages.
> In Flink SQL, we heavily use interfaces for extending abilities (SupportsXxxx) for TableSource/TableSink, and I prefer interfaces rather than methods in this case. When you have a bunch of abilities and each ability has more than one method, interfaces can help to organize them and make it clear to users which methods they need to implement when they want to have an ability.
>
> Best,
> Jark
>
> On Sun, 24 Apr 2022 at 18:13, Becket Qin <becket....@gmail.com> wrote:
>
> Hi Dawid,
>
> Thanks for the explanation. Apologies that I somehow misread a bunch of "align" and thought they were "assign".
>
> Regarding 1, by "default implementation" I meant a default no-op implementation. I am a little worried about the proliferation of decorative interfaces. I think the most important thing about interfaces is that they are easy to understand. In this case, I prefer adding a new method to the existing interface for the following reasons:
>
> a) I feel the biggest drawback of decorative interfaces is the question of which interfaces they can decorate and which combinations of multiple decorative interfaces are valid. In the current FLIP, the WithSplitsAlignment interface is only applicable to the SourceReader, which means it can't decorate any other interface. From an interface design perspective, a natural question is: why not let "AlignedSplitReader" extend "WithSplitsAlignment"? It is also natural to assume that a split reader implementing both SplitReader and WithSplitsAlignment would work, because a source reader implementing SourceReader and WithSplitsAlignment works. So why isn't there an interface of AlignedSourceReader? In the future, if a new feature is added (e.g. sorted or pre-partitioned data awareness), are we going to create another interface of SplitReader such as SortedSplitReader or PrePartitionedAware? Can they be combined?
> So I think an additional decorative interface like WithSplitsAlignment actually increases the understanding cost for users, because they have to know what decorative interfaces there are, which interfaces they can decorate, and which combinations of the decorative interfaces are valid and which are not. Ideally we want to avoid that. To be clear, I am not opposed to having a WithSplitsAlignment interface; it is completely OK to have it as an internal interface and let SourceReader and SplitReader both extend it.
>
> b) Adding a new method to the SourceReader with a default no-op implementation would help avoid logic branching in the source logic, especially given that we agree that the vast majority of the SourceReader implementations, if not all, would just extend from the SourceReaderBase. That means adding a new method to the interface would effectively give the same user experience, but simpler.
>
> c) A related design principle that may be worth discussing is how we let Source implementations tell Flink which capabilities are supported and which are not. Personally, I feel the most intuitive place is the Source itself, because that is the entry point of the entire Source connector logic.
>
> Based on the above thoughts, I am wondering if the following interface would be easier for users to understand:
>
> - Change "WithSplitsAlignment" to an internal interface, and let both SourceReader and SplitReader extend from it, with a default no-op implementation.
> - Add a new method "boolean supportSplitsAlignment()" to the Source interface, with a default implementation returning false. Sources that have implemented the alignment logic can change this to return true, and override the alignSplits() methods in the SourceReader / SplitReader if needed.
> - In the future, if a new optional feature is going to be added to the Source, and that feature requires awareness from Flink, we can add more such methods to the Source.
>
> What do you think?
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On Fri, Apr 22, 2022 at 4:05 PM Dawid Wysakowicz <dwysakow...@apache.org> wrote:
>
> @Konstantin:
>
> > As part of this FLIP, the `AlignedSplitReader` interface (aka the stop & resume behavior) will be implemented for Kafka and Pulsar only, correct?
>
> Correct. As far as I know, though, those are the only sources which consume concurrently from multiple splits, and thus the only ones to which alignment applies.
>
> @Thomas:
>
> > I wonder if "supporting" split alignment in SourceReaderBase and then doing nothing if the split reader does not implement AlignedSplitReader could be misleading? Perhaps WithSplitsAlignment can instead be added to the specific source reader (e.g. KafkaSourceReader) to make it explicit that the source actually supports it.
>
> I understand your concern. Hmm, I think we could actually do that. Given that the actual implementation of SourceReaderBase#alignSplits is rather short (just a forward to the corresponding method of SplitFetcher), we could reimplement it in the actual source implementations. This solution has a downside, though: authors of new sources would have to do two things, extend from AlignedSplitReader and implement WithSplitsAlignment, instead of just extending AlignedSplitReader. I would be fine with such a trade-off, though. What do others think?
>
> @Steven:
>
> > For this part from the motivation section, is it accurate? Let's assume one source task consumes from 3 partitions and one of the partitions is significantly slower. In this situation, the watermark for this source task won't hold back, as it is reading recent data from the other two Kafka partitions. As a result, it won't hold back the overall watermark.
> > I thought the problem is that we may have late data for this slow partition.
>
> It will hold back the watermark. The watermark of an operator is the minimum of the watermarks of all its splits [1].
>
> > I have another question about the restart. Say split alignment is triggered, a checkpoint is completed, and the job fails and is restored from the last checkpoint. Because the alignment decision is not checkpointed, alignment initially won't be enforced until we get a cycle of watermark aggregation and propagation, right? Not saying this corner case is a problem. I just want to understand it more.
>
> Your understanding is correct.
>
> @Becket:
>
> > 1. I think watermark alignment is sort of a general use case, so should we just add the related methods to SourceReader directly instead of introducing the new interface of WithSplitAssignment? We can provide default implementations, so backwards compatibility won't be an issue.
>
> I don't think we can provide a default implementation. How would we do that? Would it be just a no-op? Is that better than having an opt-in interface? The default implementation would have to be added exclusively in a *Public* SourceReader interface. By the way, notice that SourceReaderBase does extend from WithSplitsAlignment, so effectively all implementations do handle the alignment case. To be honest, I think it is impossible for end users to implement the SourceReader interface directly.
>
> > 2. As you mentioned, the SplitReader interface probably also needs some change to support throttling at the split granularity. Can you add that interface change into the public interface section as well?
>
> It has been there from the beginning. See *AlignedSplitReader*.
>
> > 3. Nit, can we avoid using the method name assignSplits here, given that it is not actually changing the split assignments? It seems something like pauseOrResumeSplits(), or adjustSplitsThrottling() is more accurate.
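Dawid's point in this reply, that an operator's watermark is the minimum over its splits, together with the pause/resume behaviour that Becket's naming suggestion describes, can be illustrated as a per-split decision. This is a hypothetical Java sketch, not Flink's implementation: the threshold formula (smallest watermark in the alignment group plus the allowed drift) is an assumption standing in for the FLIP-182 mechanism Dawid refers to, where the drift is configured through the watermark strategy, and `Decision`, `decide` and `maxDesiredWatermark` are illustrative names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical sketch of a per-split pause/resume decision; not Flink's implementation. */
public class SplitAlignmentDecision {

    /** Outcome of one alignment round: split IDs to pause and split IDs that may keep reading. */
    static final class Decision {
        final List<String> toPause = new ArrayList<>();
        final List<String> toResume = new ArrayList<>();
    }

    /** An operator's watermark is the minimum of the watermarks of all its splits. */
    static long operatorWatermark(Map<String, Long> splitWatermarks) {
        return splitWatermarks.values().stream()
                .mapToLong(Long::longValue)
                .min()
                .orElse(Long.MIN_VALUE); // no splits yet, so no progress
    }

    /** Assumed threshold: smallest watermark in the alignment group plus the allowed drift. */
    static long maxDesiredWatermark(long groupMinWatermark, long maxAllowedDriftMillis) {
        return groupMinWatermark + maxAllowedDriftMillis;
    }

    /** Pause every split whose watermark ran ahead of the threshold; let the others read. */
    static Decision decide(Map<String, Long> splitWatermarks, long threshold) {
        Decision decision = new Decision();
        splitWatermarks.forEach((split, watermark) -> {
            if (watermark > threshold) {
                decision.toPause.add(split);
            } else {
                decision.toResume.add(split);
            }
        });
        return decision;
    }

    public static void main(String[] args) {
        Map<String, Long> watermarks = new TreeMap<>(); // sorted, so output order is stable
        watermarks.put("p-0", 1_000L);
        watermarks.put("p-1", 5_000L); // this partition is far ahead of the other two
        watermarks.put("p-2", 1_200L);

        // Single-operator group in this toy example: the group minimum is the operator minimum.
        long groupMin = operatorWatermark(watermarks);
        Decision d = decide(watermarks, maxDesiredWatermark(groupMin, 2_000L));
        System.out.println(groupMin);                     // 1000
        System.out.println(d.toPause + " " + d.toResume); // [p-1] [p-0, p-2]
    }
}
```

Until the first round of watermark aggregation completes there is no `Decision` at all, so every split keeps reading; that matches Dawid's confirmation of Steven's restart scenario. A larger drift means fewer pause/resume switches, which is the throughput trade-off Jark raised.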
>
> The method's called *alignSplits*, not assign. Do you still prefer a different name for that? Personally, I am open to suggestions here.
>
> Best,
>
> Dawid
>
> [1] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/sources/#watermark-generation
>
> On 22/04/2022 05:59, Becket Qin wrote:
>
> Thanks for driving the effort, Sebastian. I think the motivation makes a lot of sense. Just a few suggestions / questions.
>
> 1. I think watermark alignment is sort of a general use case, so should we just add the related methods to SourceReader directly instead of introducing the new interface of WithSplitAssignment? We can provide default implementations, so backwards compatibility won't be an issue.
>
> 2. As you mentioned, the SplitReader interface probably also needs some change to support throttling at the split granularity. Can you add that interface change into the public interface section as well?
>
> 3. Nit, can we avoid using the method name assignSplits here, given that it is not actually changing the split assignments? It seems something like pauseOrResumeSplits(), or adjustSplitsThrottling() is more accurate.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On Thu, Apr 21, 2022 at 11:39 PM Steven Wu <stevenz...@gmail.com> wrote:
>
> > However, a single source operator may read data from multiple splits/partitions, e.g., multiple Kafka partitions, such that even with watermark alignment the source operator may need to buffer an excessive amount of data if one split emits data faster than another.
>
> For this part from the motivation section, is it accurate? Let's assume one source task consumes from 3 partitions and one of the partitions is significantly slower. In this situation, the watermark for this source task won't hold back, as it is reading recent data from the other two Kafka partitions.
> As a result, it won't hold back the overall watermark. I thought the problem is that we may have late data for this slow partition.
>
> I have another question about the restart. Say split alignment is triggered, a checkpoint is completed, and the job fails and is restored from the last checkpoint. Because the alignment decision is not checkpointed, alignment initially won't be enforced until we get a cycle of watermark aggregation and propagation, right? Not saying this corner case is a problem. I just want to understand it more.
>
> On Thu, Apr 21, 2022 at 8:20 AM Thomas Weise <t...@apache.org> wrote:
>
> Thanks for working on this!
>
> I wonder if "supporting" split alignment in SourceReaderBase and then doing nothing if the split reader does not implement AlignedSplitReader could be misleading? Perhaps WithSplitsAlignment can instead be added to the specific source reader (e.g. KafkaSourceReader) to make it explicit that the source actually supports it.
>
> Thanks,
> Thomas
>
> On Thu, Apr 21, 2022 at 4:57 AM Konstantin Knauf <kna...@apache.org> wrote:
>
> Hi Sebastian, Hi Dawid,
>
> As part of this FLIP, the `AlignedSplitReader` interface (aka the stop & resume behavior) will be implemented for Kafka and Pulsar only, correct?
>
> +1 in general. I believe it is valuable to complete the watermark alignment story with this FLIP.
>
> Cheers,
>
> Konstantin
>
> On Thu, Apr 21, 2022 at 12:36 PM Dawid Wysakowicz <dwysakow...@apache.org> wrote:
>
> To be explicit, having worked on it, I support it ;) I think we can start a vote thread soonish, as there are no concerns so far.
>
> Best,
>
> Dawid
>
> On 13/04/2022 11:27, Sebastian Mattheis wrote:
>
> Dear Flink developers,
>
> I would like to open a discussion on FLIP 217 [1] for an extension of Watermark Alignment to perform alignment also in SplitReaders.
> To do so, SplitReaders must be able to suspend and resume reading from split sources, where the SourceOperator coordinates and controls suspend and resume. To gather information about the current watermarks of the SplitReaders, we extend the internal WatermarkOutputMultiplexer and report watermarks to the SourceOperator.
>
> There is a PoC for this FLIP [2], prototyped by Arvid Heise and revised and reworked by Dawid Wysakowicz (he did most of the work) and me. The changes are backwards compatible in the sense that, if affected components do not support split alignment, the behavior is as before.
>
> Best,
> Sebastian
>
> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-217+Support+watermark+alignment+of+source+splits
> [2] https://github.com/dawidwys/flink/tree/aligned-splits
>
> --
>
> Konstantin Knauf
> https://twitter.com/snntrable
> https://github.com/knaufk
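Sebastian's opening proposal has SplitReaders suspend and resume individual splits while the SourceOperator coordinates, and Dawid notes in the thread that SourceReaderBase works with split IDs whereas SplitReader needs the split objects themselves to pause them. The minimal Java sketch below shows both sides under stated assumptions: `DemoSplit`, `PausableSplitReader` and `SplitIdTranslation` are hypothetical stand-ins, not Flink classes, and the ID-to-split lookup inside `SplitIdTranslation` is only one possible place to do that translation; the thread does not pin it down.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Deque;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical split: an ID plus the records still to be read from it. */
final class DemoSplit {
    final String id;
    final Deque<String> records;

    DemoSplit(String id, List<String> records) {
        this.id = id;
        this.records = new ArrayDeque<>(records);
    }
}

/** Split-reader side: works with the split objects themselves and skips paused ones. */
class PausableSplitReader {
    private final List<DemoSplit> splits = new ArrayList<>();
    private final Set<DemoSplit> paused = new HashSet<>();

    void addSplit(DemoSplit split) {
        splits.add(split);
    }

    void alignSplits(Collection<DemoSplit> toPause, Collection<DemoSplit> toResume) {
        paused.addAll(toPause);
        paused.removeAll(toResume);
    }

    /** Reads at most one record from each non-paused split that still has data. */
    List<String> fetch() {
        List<String> out = new ArrayList<>();
        for (DemoSplit split : splits) {
            if (!paused.contains(split) && !split.records.isEmpty()) {
                out.add(split.records.poll());
            }
        }
        return out;
    }
}

/** Source-reader side: only knows split IDs, so it translates them before delegating. */
public class SplitIdTranslation {
    private final Map<String, DemoSplit> splitsById = new LinkedHashMap<>();
    private final PausableSplitReader splitReader = new PausableSplitReader();

    void addSplit(DemoSplit split) {
        splitsById.put(split.id, split);
        splitReader.addSplit(split);
    }

    void alignSplits(Collection<String> idsToPause, Collection<String> idsToResume) {
        splitReader.alignSplits(lookup(idsToPause), lookup(idsToResume));
    }

    /** Maps IDs to split objects; unknown IDs (e.g. splits already finished) are ignored. */
    private List<DemoSplit> lookup(Collection<String> ids) {
        List<DemoSplit> result = new ArrayList<>();
        for (String id : ids) {
            DemoSplit split = splitsById.get(id);
            if (split != null) {
                result.add(split);
            }
        }
        return result;
    }

    List<String> fetch() {
        return splitReader.fetch();
    }

    public static void main(String[] args) {
        SplitIdTranslation reader = new SplitIdTranslation();
        reader.addSplit(new DemoSplit("p-0", List.of("a1", "a2")));
        reader.addSplit(new DemoSplit("p-1", List.of("b1", "b2")));
        reader.alignSplits(List.of("p-1"), List.of()); // p-1 ran ahead: pause it
        System.out.println(reader.fetch()); // [a1]
        reader.alignSplits(List.of(), List.of("p-1")); // p-0 caught up: resume p-1
        System.out.println(reader.fetch()); // [a2, b1]
    }
}
```

Because a paused split is simply skipped rather than drained into a buffer, the operator does not accumulate the "excessive amount of data" the FLIP's motivation section describes for a split that emits faster than the others.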