Thank Dawid for the reminder on FLIP-182. Sorry I did miss it.
I don't have other concerns then.

Best,
Jark

On Mon, 25 Apr 2022 at 15:40, Dawid Wysakowicz <dwysakow...@apache.org>
wrote:

> @Jark:
>
> 1. Will the framework always align with watermarks when the source
> implements the interface?
> I'm afraid not every case needs watermark alignment even if Kafka
> implements the interface,
> and this will affect the throughput somehow. I agree with Becket
> we may need a
> `supportSplitsAlignment()` method for users to configure the source to
> enable/disable the alignment.
>
> 2. How does the framework calculate maxDesiredWatermark?
> I think the algorithm of maxDesiredWatermark will greatly affect throughput
> if the reader is constantly
>  switching between pause and resume. Can users configure the alignment
> offset?
>
>
> This is covered in the previous FLIP[1] which has been already implemented
> in 1.15. In short, it must be enabled with the watermark strategy which
> also configures drift and update interval.
>
> If we don't plan to extend this interface to support align other things, I
> suggest explicitly declaring
> the purpose of the methods, such as `alignWatermarksForSplits` instead of
> `alignSplits`.
>
>
> Sure let's rename it.
>
> @Becket:
>
> I understand your point. On the other hand putting all methods, even with
> "supportsXXX" methods for enabling certain features, makes the entry
> threshold for writing a new source higher. Instead of focusing on the basic
> and required properties of the Source, the person implementing a source
> must bother with and need to figure out what all of the extra features are
> about and how to deal with them. It makes it also harder to organize
> methods in coupled groups as Jark said.
>
> Having said that, as I don't have a preference and I agree most of the
> sources will support the alignment I am fine following your suggestion to
> have the SourceReader extending from WithWatermarksSplitsAlignment, but
> would put the "supportsXXX" there, not in the Source to keep the two
> methods together.
>
> Lastly, I agree it is really unfortunate the "alignSplits" methods differ
> slightly for SourceReader and SpitReader. The reason for that is
> SourceReaderBase deals only with SplitIds, whereas SplitReader needs the
> actual splits to pause them. I found the discrepancy acceptable for the
> sake of simplifying changes significantly, especially as they would highly
> likely impact performance as we would have to perform additional lookups.
> Moreover the SplitReader is a secondary interface.
>
> Best,
>
> Dawid
>
> [1] https://cwiki.apache.org/confluence/x/hQYBCw
>
> On 24/04/2022 17:15, Jark Wu wrote:
>
> Thanks for the effort, Dawid and Sebastian!
>
> I just have some minor questions (maybe I missed something).
>
> 1. Will the framework always align with watermarks when the source
> implements the interface?
> I'm afraid not every case needs watermark alignment even if Kafka
> implements the interface,
> and this will affect the throughput somehow. I agree with Becket
> we may need a
> `supportSplitsAlignment()` method for users to configure the source to
> enable/disable the alignment.
>
> 2. How does the framework calculate maxDesiredWatermark?
> I think the algorithm of maxDesiredWatermark will greatly affect throughput
> if the reader is constantly
>  switching between pause and resume. Can users configure the alignment
> offset?
>
> 3. Interface/Method Name.
> Can the interface be used to align other things in the future? For example,
> align read speed, I have
> seen users requesting global rate limits. This feature may also need an
> interface like this.
> If we don't plan to extend this interface to support align other things, I
> suggest explicitly declaring
> the purpose of the methods, such as `alignWatermarksForSplits` instead of
> `alignSplits`.
>
> 4. Interface or Method.
> I don't have a strong opinion on this. I think they have their own
> advantages.
> In Flink SQL, we heavily use Interfaces for extending abilities
> (SupportsXxxx) for TableSource/TableSink,
> and I prefer Interfaces rather than methods in this case. When you have a
> bunch of abilities and each ability
> has more than one method, Interfaces can help to organize them and make
> users clear which methods
> need to implement when you want to have an ability.
>
>
> Best,
> Jark
>
> On Sun, 24 Apr 2022 at 18:13, Becket Qin <becket....@gmail.com> 
> <becket....@gmail.com> wrote:
>
>
> Hi Dawid,
>
> Thanks for the explanation. Apologies that I somehow misread a bunch of
> "align" and thought they were "assign".
>
> Regarding 1, by default implementation, I was thinking of the default no-op
> implementation. I am a little worried about the proliferation of decorative
> interfaces. I think the most important thing about interfaces is that they
> are easy to understand. In this case, I prefer adding new method to the
> existing interface for the following reasons:
>
> a) I feel the biggest drawback of decorative interfaces is which interface
> they can decorate and which combinations of multiple decorative interfaces
> are valid. In the current FLIP, the withSplitsAlignment interface is only
> applicable to the SourceReader which means it can't decorate any other
> interface. From an interface design perspective, a natural question is why
> not let "AlignedSplitReader" extend "withSplitsAlignment"? And it is also
> natural to assume that a split reader implementing both SplitReader and
> WithSplitAlignment would work, because a source reader implementing
> SourceReader and withSplitsAlignment works. So why isn't there an interface
> of AlignedSourceReader? In the future, if there is a new feature added
> (e.g. sorted or pre-partitioned data aware), are we going to create another
> interface of SplitReader such as SortedSplitReader or PrePartitionedAware?
> Can they be combined? So I think the additional decorative interface like
> withSplitsAlignment actually increases the understanding cost of users
> because they have to know what decorative interfaces are there, which
> interface they can decorate and which combinations of the decorative
> interfaces are valid and which are not. Ideally we want to avoid that. To
> be clear, I am not opposing having an interface of withSplitsAlignment, it
> is completely OK to have it as an internal interface and let SourceReader
> and SplitReader both extend it.
>
> b) Adding a new method to the SourceReader with a default implementation of
> no-op would help avoid logic branching in the source logic, especially
> given that we agree that the vast majority of the SourceReader
> implementations, if not all, would just extend from the SourceReaderBase.
> That means adding a new method to the interface would effectively give the
> same user experience, but simpler.
>
> c) A related design principle that may be worth discussing is how do we let
> the Source implementations tell Flink what capability is supported and what
> is not. Personally speaking I feel the most intuitive place to me is in the
> Source itself, because that is the entrance of the entire Source connector
> logic.
>
> Based on the above thoughts, I am wondering if the following interface
> would be easier to understand by the users.
>
> - Change "withSplitsAlignment" to internal interface, let both SourceReader
> and SplitReader extend from it, with a default no-op implementation.
> - Add a new method "boolean supportSplitsAlignment()" to the Source
> interface, with a default implementation returning false. Sources that have
> implemented the alignment logic can change this to return true, and
> override the alignSplits() methods in the SourceReader / SplitReader if
> needed.
> - In the future, if a new optional feature is going to be added to the
> Source, and that feature requires the awareness from Flink, we can add more
> such methods to the Source.
>
> What do you think?
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
>
>
>
>
> On Fri, Apr 22, 2022 at 4:05 PM Dawid Wysakowicz <dwysakow...@apache.org> 
> <dwysakow...@apache.org>
> wrote:
>
>
> @Konstantin:
>
> As part of this FLIP, the `AlignedSplitReader` interface (aka the stop &
> resume behavior) will be implemented for Kafka and Pulsar only, correct?
>
> Correct, as far as I know though, those are the only sources which
>
> consume
>
> concurrently from multiple splits and thus alignment applies.
>
> @Thomas:
>
> I wonder if "supporting" split alignment in SourceReaderBase and then
>
> doing
>
> nothing if the split reader does not implement AlignedSplitReader could
>
> be
>
> misleading? Perhaps WithSplitsAlignment can instead be added to the
> specific source reader (i.e. KafkaSourceReader) to make it explicit that
> the source actually supports it.
>
> I understand your concern. Hmm, I think we could actually do that. Given
> the actual implementation of the SourceReaderBase#alignSplits is rather
> short (just a forward to the corresponding method of SplitFetcher), we
> could reimplement it in the actual source implementations. This solution
> has the downside though. Authors of new sources would have to do two
> things: extend from AlignedSplitReader and implement
>
> WithSplitsAssignment,
>
> instead of just extending AlignedSplitReader. I would be fine with such a
> tradeoff though. What others think?
>
> @Steven:
>
> For this part from the motivation section, is it accurate? Let's assume
>
> one
>
> source task consumes from 3 partitions and one of the partition is
> significantly slower. In this situation, watermark for this source task
> won't hold back as it is reading recent data from other two Kafka
> partitions. As a result, it won't hold back the overall watermark. I
> thought the problem is that we may have late data for this slow
>
> partition.
>
> It will hold back the watermark. Watermark of an operator is the minimum
> of watermarks of all splits[1]
>
> I have another question about the restart. Say split alignment is
> triggered. checkpoint is completed. job failed and restored from the last
> checkpoint. because alignment decision is not checkpointed, initially
> alignment won't be enforced until we get a cycle of watermark aggregation
> and propagation, right? Not saying this corner is a problem. Just want to
> understand it more.
>
> Your understanding is correct.
>
> @Becket:
>
> 1. I think watermark alignment is sort of a general use case, so should
>
> we
>
> just add the related methods to SourceReader directly instead of
> introducing the new interface of WithSplitAssignment? We can provide
> default implementations, so backwards compatibility won't be an issue.
>
> I don't think we can provide a default implementation. How would we do
> that? Would it be just a no-op? Is it better than having an opt-in
> interface? The default implementation would have to be added exclusively
>
> in
>
> a *Public* SourceReader interface. By the way notice SourceReaderBase
> does extend from WithSplitsAlignment, so effectively all implementations
>
> do
>
> handle the alignment case. To be honest I think it is impossible to
> implement the SourceReader interface directly by end users.
>
> 2. As you mentioned, the SplitReader interface probably also needs some
> change to support throttling at the split granularity. Can you add that
> interface change into the public interface section as well?
>
> It has been added from the beginning. See *AlignedSplitReader.*
>
> 3. Nit, can we avoid using the method name assignSplits here, given that
>
> it
>
> is not actually changing the split assignments? It seems something like
> pauseOrResumeSplits(), or adjustSplitsThrottling() is more accurate.
>
> The method's called *alignSplits*, not assign. Do you still prefer a
> different name for that? Personally, I am open for suggestions here.
>
> Best,
>
> Dawid
>
> [1]
>
>
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/sources/#watermark-generation
>
> On 22/04/2022 05:59, Becket Qin wrote:
>
> Thanks for driving the effort, Sebastion. I think the motivation makes a
> lot of sense. Just a few suggestions / questions.
>
> 1. I think watermark alignment is sort of a general use case, so should
>
> we
>
> just add the related methods to SourceReader directly instead of
> introducing the new interface of WithSplitAssignment? We can provide
> default implementations, so backwards compatibility won't be an issue.
>
> 2. As you mentioned, the SplitReader interface probably also needs some
> change to support throttling at the split granularity. Can you add that
> interface change into the public interface section as well?
>
> 3. Nit, can we avoid using the method name assignSplits here, given that
>
> it
>
> is not actually changing the split assignments? It seems something like
> pauseOrResumeSplits(), or adjustSplitsThrottling() is more accurate.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On Thu, Apr 21, 2022 at 11:39 PM Steven Wu <stevenz...@gmail.com> 
> <stevenz...@gmail.com> <
>
> stevenz...@gmail.com> wrote:
>
> However, a single source operator may read data from multiple
>
> splits/partitions, e.g., multiple Kafka partitions, such that even with
> watermark alignment the source operator may need to buffer excessive
>
> amount
>
> of data if one split emits data faster than another.
>
> For this part from the motivation section, is it accurate? Let's assume
>
> one
>
> source task consumes from 3 partitions and one of the partition is
> significantly slower. In this situation, watermark for this source task
> won't hold back as it is reading recent data from other two Kafka
> partitions. As a result, it won't hold back the overall watermark. I
> thought the problem is that we may have late data for this slow
>
> partition.
>
> I have another question about the restart. Say split alignment is
> triggered. checkpoint is completed. job failed and restored from the last
> checkpoint. because alignment decision is not checkpointed, initially
> alignment won't be enforced until we get a cycle of watermark aggregation
> and propagation, right? Not saying this corner is a problem. Just want to
> understand it more.
>
>
>
> On Thu, Apr 21, 2022 at 8:20 AM Thomas Weise <t...@apache.org> 
> <t...@apache.org> <
>
> t...@apache.org> wrote:
>
> Thanks for working on this!
>
> I wonder if "supporting" split alignment in SourceReaderBase and then
>
> doing
>
> nothing if the split reader does not implement AlignedSplitReader could
>
> be
>
> misleading? Perhaps WithSplitsAlignment can instead be added to the
> specific source reader (i.e. KafkaSourceReader) to make it explicit that
> the source actually supports it.
>
> Thanks,
> Thomas
>
>
> On Thu, Apr 21, 2022 at 4:57 AM Konstantin Knauf <kna...@apache.org> 
> <kna...@apache.org> <
>
> kna...@apache.org>
>
> wrote:
>
>
> Hi Sebastian, Hi Dawid,
>
> As part of this FLIP, the `AlignedSplitReader` interface (aka the stop
>
> &
>
> resume behavior) will be implemented for Kafka and Pulsar only,
>
> correct?
>
> +1 in general. I believe it is valuable to complete the watermark
>
> aligned
>
> story with this FLIP.
>
> Cheers,
>
> Konstantin
>
>
>
>
>
>
>
> On Thu, Apr 21, 2022 at 12:36 PM Dawid Wysakowicz <
> dwysakow...@apache.org>
>
> wrote:
>
>
> To be explicit, having worked on it, I support it ;) I think we can
> start a vote thread soonish, as there are no concerns so far.
>
> Best,
>
> Dawid
>
> On 13/04/2022 11:27, Sebastian Mattheis wrote:
>
> Dear Flink developers,
>
> I would like to open a discussion on FLIP 217 [1] for an extension
>
> of
>
> Watermark Alignment to perform alignment also in SplitReaders. To
>
> do
>
> so,
>
> SplitReaders must be able to suspend and resume reading from split
>
> sources
>
> where the SourceOperator coordinates and controlls suspend and
>
> resume.
>
> To
>
> gather information about current watermarks of the SplitReaders, we
>
> extend
>
> the internal WatermarkOutputMulitplexer and report watermarks to
>
> the
>
> SourceOperator.
>
> There is a PoC for this FLIP [2], prototyped by Arvid Heise and
>
> revised
>
> and
>
> reworked by Dawid Wysakowicz (He did most of the work.) and me. The
>
> changes
>
> are backwards compatible in a way that if affected components do
>
> not
>
> support split alignment the behavior is as before.
>
> Best,
> Sebastian
>
> [1]
>
>
>
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-217+Support+watermark+alignment+of+source+splits
>
> [2] https://github.com/dawidwys/flink/tree/aligned-splits
>
> --
>
> Konstantin Knaufhttps://twitter.com/snntrablehttps://github.com/knaufk
>
>

Reply via email to