Hi folks,

To move on with the FLIP, I will cut out eventTimeFetchLag out of scope and
go ahead with the remainder.

I will open a VOTE later to today.

Best,

Arvid

On Wed, Jul 28, 2021 at 8:44 AM Arvid Heise <ar...@apache.org> wrote:

> Hi Becket,
>
> I have updated the PR according to your suggestion (note that this commit
> contains the removal of the previous approach) [1]. Here are my
> observations:
> 1. Adding the type of RecordMetadata to emitRecord would require adding
> another type parameter to RecordEmitter and SourceReaderBase. So I left
> that out for now as it would break things completely.
> 2. RecordEmitter implementations that want to pass it to SourceOutput need
> to be changed in a boilerplate fashion. (passing the metadata to the
> SourceOutput)
> 3. RecordMetadata as an interface (as in the commit) probably requires
> boilerplate implementations in using sources as well.
> 4. SourceOutput would also require an additional collect
>
> default void collect(T record, RecordMetadata metadata) {
>     collect(record, TimestampAssigner.NO_TIMESTAMP, metadata);
> }
>
> 5. RecordMetadata is currently not simplifying any code. By the current
> design RecordMetadata is a read-only data structure that is constant for
> all records in a batch. So in Kafka, we still need to pass Tuple3 because
> offset and timestamp are per record.
> 6. RecordMetadata is currently the same for all splits in
> RecordsWithSplitIds.
>
> Some ideas for the above points:
> 3. We should accompy it with a default implementation to avoid the trivial
> POJO implementations as the KafkaRecordMetadata of my commit. Can we skip
> the interface and just have RecordMetadata as a base class?
> 1.,2.,4. We could also set the metadata only once in an orthogonal method
> that need to be called before collect like SourceOutput#setRecordMetadata.
> Then we can implement it entirely in SourceReaderBase without changing any
> code. The clear downside is that it introduces some implicit state in
> SourceOutput (which we implement) and is harder to use in
> non-SourceReaderBase classes: Source devs need to remember to call
> setRecordMetadata before collect for a respective record.
> 6. We might rename and change the semantics into
>
> public interface RecordsWithSplitIds<E> {
>     /**
>      * Returns the record metadata. The metadata is shared for all records in 
> the current split.
>      */
>     @Nullable
>     default RecordMetadata metadataOfCurrentSplit() {
>         return null;
>     }
> ...
> }
>
>
> Re global variable
>
>> To explain a bit more on the metric being a global variable, I think in
>> general there are two ways to pass a value from one code block to another.
>> The first way is direct passing. That means the variable is explicitly
>> passed from one code block to another via arguments, be them in the
>> constructor or methods. Another way is indirect passing through context,
>> that means the information is stored in some kind of context or
>> environment, and everyone can have access to it. And there is no explicit
>> value passing from one code block to another because everyone just reads
>> from/writes to the context or environment. This is basically the "global
>> variable" pattern I am talking about.
>>
>> In general people would avoid having a mutable global value shared across
>> code blocks, because it is usually less deterministic and therefore more
>> difficult to understand or debug.
>>
> Since the first approach was using a Gauge, it's a callback and not a
> global value. The actual value is passed when invoking the callback. It's
> the same as a supplier. However, the gauge itself is stored in the context,
> so your argument holds on that level.
>
>
>> Moreover, generally speaking, the Metrics in systems are usually perceived
>> as a reporting mechanism. People usually think of it as a way to expose
>> some internal values to the external system, and don't expect the program
>> itself to read the reported values again in the main logic, which is
>> essentially using the MetricGroup as a context to pass values across code
>> block, i.e. the "global variable" pattern. Instead, people would usually
>> use the "direct passing" to do this.
>>
> Here I still don't see a difference on how we calculate the meter values
> from the byteIn/Out counters. We also need to read the counters
> periodically and calculate a secondary metric. So it can't be that
> unexpected to users.
>
> [1]
> https://github.com/apache/flink/commit/71212e6baf2906444987253d0cf13b5a5978a43b
>
> On Tue, Jul 27, 2021 at 3:19 AM Becket Qin <becket....@gmail.com> wrote:
>
>> Hi Arvid,
>>
>> Thanks for the patient discussion.
>>
>> To explain a bit more on the metric being a global variable, I think in
>> general there are two ways to pass a value from one code block to another.
>> The first way is direct passing. That means the variable is explicitly
>> passed from one code block to another via arguments, be them in the
>> constructor or methods. Another way is indirect passing through context,
>> that means the information is stored in some kind of context or
>> environment, and everyone can have access to it. And there is no explicit
>> value passing from one code block to another because everyone just reads
>> from/writes to the context or environment. This is basically the "global
>> variable" pattern I am talking about.
>>
>> In general people would avoid having a mutable global value shared across
>> code blocks, because it is usually less deterministic and therefore more
>> difficult to understand or debug.
>>
>> Moreover, generally speaking, the Metrics in systems are usually perceived
>> as a reporting mechanism. People usually think of it as a way to expose
>> some internal values to the external system, and don't expect the program
>> itself to read the reported values again in the main logic, which is
>> essentially using the MetricGroup as a context to pass values across code
>> block, i.e. the "global variable" pattern. Instead, people would usually
>> use the "direct passing" to do this.
>>
>> >Can we think of other use cases for the fetchTime parameter beyond
>> metrics
>> in the future? If so, it would make an even stronger case.
>> At this point, I cannot think of other use cases for fetchTime, but I can
>> see use cases where people want to get a per split fetch lag. So I am
>> wondering if it makes sense to generalize the API a little bit by
>> introducing collect(T Record, Long timestamp, Metadata metadata). This
>> also
>> makes a natural alignment because the RecordsWithSplitIds also returns a
>> Metadata associated with the record, which can be used by RecordEmitter as
>> well as the SourceOutput.
>>
>> What do you think?
>>
>> Thanks,
>>
>> Jiangjie (Becket) Qin
>>
>>
>> On Fri, Jul 23, 2021 at 7:58 PM Arvid Heise <ar...@apache.org> wrote:
>>
>> > Hi Becket,
>> >
>> > I still can't follow your view on the metric being a global variable or
>> > your concern that it is confusing to users. Nevertheless, I like your
>> > proposal with having an additional collect method.
>> >
>> > I was thinking that
>> > > SourceOutput is going to have an additional method of collect(T
>> Record,
>> > > Long timestamp, Long fetchTime). So people can just pass in the fetch
>> > time
>> > > directly when they emit a record, regardless of using
>> SourceReaderBase or
>> > > not.
>> > >
>> >
>> > Can we think of other use cases for the fetchTime parameter beyond
>> metrics
>> > in the future? If so, it would make an even stronger case.
>> >
>> > I'll update the PR with your proposals.
>> >
>> > Best,
>> >
>> > Arvid
>> >
>> > On Fri, Jul 23, 2021 at 12:08 PM Becket Qin <becket....@gmail.com>
>> wrote:
>> >
>> > > Regarding the generic type v.s. class/subclasses of Metadata.
>> > >
>> > > I think generic types usually make sense if the framework/abstract
>> class
>> > > itself does not look into the instances, but just pass them from one
>> user
>> > > logic to another. Otherwise, interfaces or class/subclasses would be
>> > > preferred.
>> > >
>> > > In our case, it depends on whether we expect the SourceReaderBase to
>> look
>> > > into the MetaData. At this point, it does not. But it seems possible
>> that
>> > > in the future it may look into MetaData. Therefore I think the class /
>> > > subclass pattern would be better.
>> > >
>> > > Thanks,
>> > >
>> > > Jiangjie (Becket) Qin
>> > >
>> > >
>> > > On Fri, Jul 23, 2021 at 5:54 PM Becket Qin <becket....@gmail.com>
>> wrote:
>> > >
>> > > > Hi Arvid,
>> > > >
>> > > > > I'm not sure if I follow the global variable argument, could you
>> > > > elaborate? Are you referring specifically to the SettableGauge? How
>> is
>> > > that
>> > > > different from a Counter or Meter?
>> > > > What I meant is that the fetch lag computing logic can either get
>> the
>> > > > information required from method argument or something like a static
>> > > global
>> > > > variable. We are essentially trying to reuse the metric as a static
>> > > global
>> > > > variable. It seems not a common pattern in most systems. It is a
>> little
>> > > > counterintuitive that a gauge reported to the metric system would be
>> > used
>> > > > by the program main logic later on as a variable.
>> > > >
>> > > > > We could do that. That would remove the gauge from the
>> MetricGroup,
>> > > > right? The main downside is that sources that do not use
>> > SourceReaderBase
>> > > > cannot set the metric anymore. So I'd rather keep the current way
>> and
>> > > > extend it with the metadata extension.
>> > > > Yes, that would remove the gauge from the MetricGroup. I was
>> thinking
>> > > that
>> > > > SourceOutput is going to have an additional method of collect(T
>> Record,
>> > > > Long timestamp, Long fetchTime). So people can just pass in the
>> fetch
>> > > time
>> > > > directly when they emit a record, regardless of using
>> SourceReaderBase
>> > or
>> > > > not.
>> > > >
>> > > > Thanks,
>> > > >
>> > > > Jiangjie (Becket) Qin
>> > > >
>> > > > On Thu, Jul 22, 2021 at 3:46 PM Chesnay Schepler <
>> ches...@apache.org>
>> > > > wrote:
>> > > >
>> > > >> The only histogram implementation available to use are those by
>> > > >> dropwizard, and they do some lock-free synchronization stuff that
>> so
>> > > far we
>> > > >> wanted to keep out of hot paths (this applis to both reading and
>> > > writing);
>> > > >> we have however never made benchmarks.
>> > > >> But it is reasonable to assume that they are way more expensive
>> than
>> > the
>> > > >> alternatives (in the ideal case just being a getter).
>> > > >> You'd pay that cost irrespective of whether a reporter is enabled
>> or
>> > > not,
>> > > >> which is another thing we so far wanted to prevent.
>> > > >> Finally, histograms are problematic because they are 10x more
>> > expensive
>> > > >> on the metric backend (because they are effectively 10 metrics),
>> and
>> > > should
>> > > >> be used with extreme caution, in particular because we lack any
>> switch
>> > > to
>> > > >> disable/enable metrics (and I think we are getting to a point where
>> > the
>> > > >> metric system becomes unusable for heavy users because of that,
>> where
>> > > >> another histogram isn't helping).
>> > > >>
>> > > >> Overall, at this time I'm against using Histograms.
>> > > >> Furthermore, I believe that we should be able to derive a Histogram
>> > from
>> > > >> that supplier if we later one decide differently. We'd just need to
>> > poll
>> > > >> the supplier more often.
>> > > >>
>> > > >> On 22/07/2021 09:36, Arvid Heise wrote:
>> > > >>
>> > > >> Hi all,
>> > > >>
>> > > >> @Steven Wu <stevenz...@gmail.com>
>> > > >>
>> > > >>> Regarding "lastFetchTime" latency metric, I found Gauge to be less
>> > > >>> informative as it only captures the last sampling value for each
>> > metric
>> > > >>> publish interval (e.g. 60s).
>> > > >>> * Can we make it a histogram? Histograms are more expensive
>> though.
>> > > >>> * Timer [1, 2] is cheaper as it just tracks min, max, avg, count.
>> but
>> > > >>> there
>> > > >>> is no such metric type in Flink
>> > > >>> * Summary metric type [3] (from Prometheus) would be nice too
>> > > >>>
>> > > >> I'd also think that a histogram is much more expressive but the
>> > original
>> > > >> FLIP-33 decided against it because of it's cost. @Chesnay Schepler
>> > > >> <ches...@apache.org> could you shed some light on how much more
>> > > >> expensive it is in comparison to a simple gauge? Does it depend on
>> > > whether
>> > > >> a reporter is actually using the metric?
>> > > >> The current interface of this FLIP-179 would actually allow to
>> switch
>> > > the
>> > > >> type of the metric later. But since the metric type is
>> user-facing, we
>> > > need
>> > > >> to have an agreement now.
>> > > >>
>> > > >> @Becket Qin <becket....@gmail.com>
>> > > >>
>> > > >>> In that case, do we still need the metric here? It seems we are
>> > > creating
>> > > >>> a
>> > > >>> "global variable" which users may potentially use. I am wondering
>> how
>> > > >>> much
>> > > >>> additional convenience it provides because it seems easy for
>> people
>> > to
>> > > >>> simply pass the fetch time by themselves if they have decided to
>> not
>> > > use
>> > > >>> SourceReaderBase. Also, it looks like we do not have an API
>> pattern
>> > > that
>> > > >>> lets users get the value of a metric and derive another metric.
>> So I
>> > > >>> think
>> > > >>> it is easier for people to understand if LastFetchTimeGauge() is
>> just
>> > > an
>> > > >>> independent metric by itself, instead of being a part of the
>> > > >>> eventTimeFetchLag computation.
>> > > >>>
>> > > >> I'm not sure if I follow the global variable argument, could you
>> > > >> elaborate? Are you referring specifically to the SettableGauge?
>> How is
>> > > that
>> > > >> different from a Counter or Meter?
>> > > >>
>> > > >> With the current design, we could very well add a LastFetchTime
>> > metric.
>> > > >> The key point of the current abstraction is that a user gets the
>> much
>> > > >> harder eventTimeFetchLag metric for free, since we already need to
>> > > extract
>> > > >> the event time for other metrics. I think the JavaDoc makes it
>> clear
>> > > what
>> > > >> the intent of the LastFetchTimeGauge is and if not we can improve
>> it.
>> > > >> Btw we have derived metrics already. For example, we have Meters
>> for
>> > > >> byteIn/Out and recordIn/Out. That's already part of FLIP-33.
>> > > >>
>> > > >> Would it make sense to have a more generic metadata type <T>
>> > associated
>> > > >>> with the records batch? In some cases, it may be useful to allow
>> the
>> > > >>> Source
>> > > >>> implementation to carry some additional information of the batch
>> to
>> > the
>> > > >>> RecordEmitter. For example, the split info of the batch, the
>> sender
>> > of
>> > > >>> the
>> > > >>> batch etc. Because the RecordEmitter only takes one record at.a
>> time,
>> > > >>> currently such information needs to be put into each record, which
>> > may
>> > > >>> involve a lot of wrapper object creation.
>> > > >>>
>> > > >> I like the idea of having more general metadata and I follow the
>> > > example.
>> > > >> I'm wondering if we could avoid a generic type (since that adds a
>> bit
>> > of
>> > > >> complexity to the mental model and usage) by simply encouraging to
>> > use a
>> > > >> more specific MetaData subclass as a return type of the method.
>> > > >>
>> > > >> public interface RecordsWithSplitIds<E> {
>> > > >>     @Nullable    default RecordMetadata getMetadata() {
>> > > >>         return null;    }
>> > > >>     ...
>> > > >> }
>> > > >>
>> > > >> public interface RecordMetadata {
>> > > >>     long getLastFetchTime(); // mandatory?}
>> > > >>
>> > > >> And using it as
>> > > >>
>> > > >> public class KafkaRecordMetadata implements RecordMetadata {}
>> > > >>
>> > > >> private static class KafkaPartitionSplitRecords<T> implements
>> > > RecordsWithSplitIds<T> {
>> > > >>     @Override    public KafkaRecordMetadata getMetadata() {
>> > > >>         return metadata;    }
>> > > >> }
>> > > >>
>> > > >> Or do we want to have the generic to explicitly pass it to the
>> > > >> RecordEmitter? Would that metadata be a fourth parameter of
>> > > >> RecordEmitter#emitRecord?
>> > > >>
>> > > >> It might be slightly better if we let the method accept a Supplier
>> in
>> > > this
>> > > >>> case. However, it seems to introduce a parallel channel or a
>> sidepath
>> > > >>> between the user implementation and SourceOutput. I am not sure if
>> > this
>> > > >>> is
>> > > >>> the right way to go. Would it be more intuitive if we just add a
>> new
>> > > >>> method
>> > > >>> to the SourceOutput, to allow the FetchTime to be passed in
>> > explicitly?
>> > > >>> This would work well with the change I suggested above, which
>> adds a
>> > > >>> generic metadata type <T> to the RecordsWithSplits and passes
>> that to
>> > > the
>> > > >>> RecordEmitter.emitRecord() as an argument.
>> > > >>>
>> > > >>
>> > > >> We could do that. That would remove the gauge from the MetricGroup,
>> > > >> right? The main downside is that sources that do not use
>> > > SourceReaderBase
>> > > >> cannot set the metric anymore. So I'd rather keep the current way
>> and
>> > > >> extend it with the metadata extension.
>> > > >>
>> > > >> Best,
>> > > >>
>> > > >> Arvid
>> > > >>
>> > > >>
>> > > >> On Wed, Jul 21, 2021 at 1:38 PM Becket Qin <becket....@gmail.com>
>> > > wrote:
>> > > >>
>> > > >>> Hey Chesnay,
>> > > >>>
>> > > >>> I think I got what that method was designed for now. Basically the
>> > > >>> motivation is to let the SourceOutput to report the
>> eventTimeFetchLag
>> > > for
>> > > >>> users. At this point, the SourceOutput only has the EventTime, so
>> > this
>> > > >>> method provides a way for the users to pass the FetchTime to the
>> > > >>> SourceOutput. This is essentially a context associated with each
>> > record
>> > > >>> emitted to the SourceOutput.
>> > > >>>
>> > > >>> It might be slightly better if we let the method accept a
>> Supplier in
>> > > >>> this
>> > > >>> case. However, it seems to introduce a parallel channel or a
>> sidepath
>> > > >>> between the user implementation and SourceOutput. I am not sure if
>> > this
>> > > >>> is
>> > > >>> the right way to go. Would it be more intuitive if we just add a
>> new
>> > > >>> method
>> > > >>> to the SourceOutput, to allow the FetchTime to be passed in
>> > explicitly?
>> > > >>> This would work well with the change I suggested above, which
>> adds a
>> > > >>> generic metadata type <T> to the RecordsWithSplits and passes
>> that to
>> > > the
>> > > >>> RecordEmitter.emitRecord() as an argument.
>> > > >>>
>> > > >>> What do you think?
>> > > >>>
>> > > >>> Thanks,
>> > > >>>
>> > > >>> Jiangjie (Becket) Qin
>> > > >>>
>> > > >>> On Tue, Jul 20, 2021 at 2:50 PM Chesnay Schepler <
>> ches...@apache.org
>> > >
>> > > >>> wrote:
>> > > >>>
>> > > >>> > Would it be easier to understand if the method would accept a
>> > > Supplier
>> > > >>> > instead?
>> > > >>> >
>> > > >>> > On 20/07/2021 05:36, Becket Qin wrote:
>> > > >>> > > In that case, do we still need the metric here? It seems we
>> are
>> > > >>> creating
>> > > >>> > a
>> > > >>> > > "global variable" which users may potentially use. I am
>> wondering
>> > > how
>> > > >>> > much
>> > > >>> > > additional convenience it provides because it seems easy for
>> > people
>> > > >>> to
>> > > >>> > > simply pass the fetch time by themselves if they have decided
>> to
>> > > not
>> > > >>> use
>> > > >>> > > SourceReaderBase. Also, it looks like we do not have an API
>> > pattern
>> > > >>> that
>> > > >>> > > lets users get the value of a metric and derive another
>> metric.
>> > So
>> > > I
>> > > >>> > think
>> > > >>> > > it is easier for people to understand if LastFetchTimeGauge()
>> is
>> > > >>> just an
>> > > >>> > > independent metric by itself, instead of being a part of the
>> > > >>> > > eventTimeFetchLag computation.
>> > > >>> >
>> > > >>> >
>> > > >>> >
>> > > >>>
>> > > >>
>> > > >>
>> > >
>> >
>>
>

Reply via email to