Hi folks,

To move on with the FLIP, I will cut eventTimeFetchLag out of scope and go ahead with the remainder.
I will open a VOTE later today.

Best,
Arvid

On Wed, Jul 28, 2021 at 8:44 AM Arvid Heise <ar...@apache.org> wrote:
> Hi Becket,
>
> I have updated the PR according to your suggestion (note that this commit contains the removal of the previous approach) [1]. Here are my observations:
> 1. Adding the type of RecordMetadata to emitRecord would require adding another type parameter to RecordEmitter and SourceReaderBase. So I left that out for now, as it would break things completely.
> 2. RecordEmitter implementations that want to pass it to SourceOutput need to be changed in a boilerplate fashion (passing the metadata to the SourceOutput).
> 3. RecordMetadata as an interface (as in the commit) probably requires boilerplate implementations in the sources using it as well.
> 4. SourceOutput would also require an additional collect:
>
> default void collect(T record, RecordMetadata metadata) {
>     collect(record, TimestampAssigner.NO_TIMESTAMP, metadata);
> }
>
> 5. RecordMetadata is currently not simplifying any code. By the current design, RecordMetadata is a read-only data structure that is constant for all records in a batch. So in Kafka, we still need to pass Tuple3 because offset and timestamp are per record.
> 6. RecordMetadata is currently the same for all splits in RecordsWithSplitIds.
>
> Some ideas for the above points:
> 3. We should accompany it with a default implementation to avoid trivial POJO implementations like the KafkaRecordMetadata of my commit. Can we skip the interface and just have RecordMetadata as a base class?
> 1., 2., 4. We could also set the metadata only once in an orthogonal method that needs to be called before collect, like SourceOutput#setRecordMetadata. Then we can implement it entirely in SourceReaderBase without changing any code.
> The clear downside is that it introduces some implicit state in SourceOutput (which we implement) and is harder to use in non-SourceReaderBase classes: source devs need to remember to call setRecordMetadata before collect for the respective record.
> 6. We might rename it and change the semantics into
>
> public interface RecordsWithSplitIds<E> {
>     /**
>      * Returns the record metadata. The metadata is shared for all records in the current split.
>      */
>     @Nullable
>     default RecordMetadata metadataOfCurrentSplit() {
>         return null;
>     }
>     ...
> }
>
> Re global variable
>
>> To explain a bit more on the metric being a global variable, I think in general there are two ways to pass a value from one code block to another. The first way is direct passing. That means the variable is explicitly passed from one code block to another via arguments, be they in the constructor or methods. Another way is indirect passing through context, which means the information is stored in some kind of context or environment, and everyone can have access to it. And there is no explicit value passing from one code block to another because everyone just reads from/writes to the context or environment. This is basically the "global variable" pattern I am talking about.
>>
>> In general people would avoid having a mutable global value shared across code blocks, because it is usually less deterministic and therefore more difficult to understand or debug.
>>
> Since the first approach was using a Gauge, it's a callback and not a global value. The actual value is passed when invoking the callback. It's the same as a supplier. However, the gauge itself is stored in the context, so your argument holds on that level.
>
>> Moreover, generally speaking, metrics in systems are usually perceived as a reporting mechanism.
>> People usually think of it as a way to expose some internal values to the external system, and don't expect the program itself to read the reported values again in the main logic, which is essentially using the MetricGroup as a context to pass values across code blocks, i.e. the "global variable" pattern. Instead, people would usually use "direct passing" to do this.
>>
> Here I still don't see a difference from how we calculate the meter values from the byteIn/Out counters. We also need to read the counters periodically and calculate a secondary metric. So it can't be that unexpected to users.
>
> [1] https://github.com/apache/flink/commit/71212e6baf2906444987253d0cf13b5a5978a43b
>
> On Tue, Jul 27, 2021 at 3:19 AM Becket Qin <becket....@gmail.com> wrote:
>
>> Hi Arvid,
>>
>> Thanks for the patient discussion.
>>
>> To explain a bit more on the metric being a global variable, I think in general there are two ways to pass a value from one code block to another. The first way is direct passing. That means the variable is explicitly passed from one code block to another via arguments, be they in the constructor or methods. Another way is indirect passing through context, which means the information is stored in some kind of context or environment, and everyone can have access to it. And there is no explicit value passing from one code block to another because everyone just reads from/writes to the context or environment. This is basically the "global variable" pattern I am talking about.
>>
>> In general people would avoid having a mutable global value shared across code blocks, because it is usually less deterministic and therefore more difficult to understand or debug.
>>
>> Moreover, generally speaking, metrics in systems are usually perceived as a reporting mechanism.
>> People usually think of it as a way to expose some internal values to the external system, and don't expect the program itself to read the reported values again in the main logic, which is essentially using the MetricGroup as a context to pass values across code blocks, i.e. the "global variable" pattern. Instead, people would usually use "direct passing" to do this.
>>
>> > Can we think of other use cases for the fetchTime parameter beyond metrics in the future? If so, it would make an even stronger case.
>> At this point, I cannot think of other use cases for fetchTime, but I can see use cases where people want to get a per-split fetch lag. So I am wondering if it makes sense to generalize the API a little bit by introducing collect(T record, Long timestamp, Metadata metadata). This also makes a natural alignment because the RecordsWithSplitIds also returns a Metadata associated with the record, which can be used by the RecordEmitter as well as the SourceOutput.
>>
>> What do you think?
>>
>> Thanks,
>>
>> Jiangjie (Becket) Qin
>>
>>
>> On Fri, Jul 23, 2021 at 7:58 PM Arvid Heise <ar...@apache.org> wrote:
>>
>> > Hi Becket,
>> >
>> > I still can't follow your view on the metric being a global variable or your concern that it is confusing to users. Nevertheless, I like your proposal of having an additional collect method.
>> >
>> > > I was thinking that SourceOutput is going to have an additional method of collect(T Record, Long timestamp, Long fetchTime). So people can just pass in the fetch time directly when they emit a record, regardless of using SourceReaderBase or not.
>> >
>> > Can we think of other use cases for the fetchTime parameter beyond metrics in the future? If so, it would make an even stronger case.
>> >
>> > I'll update the PR with your proposals.
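
A minimal sketch of the "direct passing" variant discussed here, where the fetch time travels with each record as an explicit collect argument instead of going through a gauge. SketchOutput and LagTrackingOutput are hypothetical stand-ins, not Flink's SourceOutput. The sketch also assumes the FLIP-33 notion of fetch lag as fetch time minus event time, and uses Long.MIN_VALUE as an assumed "no timestamp" sentinel.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical sketch of "direct passing": the fetch time is an explicit
 * argument of collect() instead of a value published to a gauge and read
 * back later. These are illustrative stand-ins, not Flink's SourceOutput.
 */
public class DirectFetchTimeSketch {

    /** Stand-in for a SourceOutput extended with the proposed overload. */
    interface SketchOutput<T> {
        // Assumed sentinel for "no event time".
        long NO_TIMESTAMP = Long.MIN_VALUE;

        /** Proposed overload: fetchTime may be null if the source does not know it. */
        void collect(T record, long timestamp, Long fetchTime);

        /** Existing-style overload delegates with an unknown fetch time. */
        default void collect(T record, long timestamp) {
            collect(record, timestamp, null);
        }
    }

    /** Output that derives a per-record fetch lag (fetchTime - eventTime). */
    static final class LagTrackingOutput<T> implements SketchOutput<T> {
        final List<T> records = new ArrayList<>();
        final List<Long> fetchLags = new ArrayList<>();

        @Override
        public void collect(T record, long timestamp, Long fetchTime) {
            records.add(record);
            // Only compute a lag when both the fetch time and the event time are known.
            if (fetchTime != null && timestamp != NO_TIMESTAMP) {
                fetchLags.add(fetchTime - timestamp);
            }
        }
    }

    public static void main(String[] args) {
        LagTrackingOutput<String> out = new LagTrackingOutput<>();
        out.collect("a", 1_000L, 1_250L); // fetched 250 ms after its event time
        out.collect("b", 2_000L);         // fetch time unknown: no lag sample
        System.out.println(out.records + " " + out.fetchLags); // [a, b] [250]
    }
}
```

No shared context is involved here: whoever calls collect decides which fetch time to pass, whether or not SourceReaderBase is used.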
>> >
>> > Best,
>> >
>> > Arvid
>> >
>> > On Fri, Jul 23, 2021 at 12:08 PM Becket Qin <becket....@gmail.com> wrote:
>> >
>> > > Regarding the generic type vs. class/subclasses of Metadata.
>> > >
>> > > I think generic types usually make sense if the framework/abstract class itself does not look into the instances, but just passes them from one piece of user logic to another. Otherwise, interfaces or class/subclasses would be preferred.
>> > >
>> > > In our case, it depends on whether we expect the SourceReaderBase to look into the MetaData. At this point, it does not. But it seems possible that in the future it may look into MetaData. Therefore I think the class/subclass pattern would be better.
>> > >
>> > > Thanks,
>> > >
>> > > Jiangjie (Becket) Qin
>> > >
>> > >
>> > > On Fri, Jul 23, 2021 at 5:54 PM Becket Qin <becket....@gmail.com> wrote:
>> > >
>> > > > Hi Arvid,
>> > > >
>> > > > > I'm not sure if I follow the global variable argument, could you elaborate? Are you referring specifically to the SettableGauge? How is that different from a Counter or Meter?
>> > > > What I meant is that the fetch lag computing logic can either get the information required from a method argument or from something like a static global variable. We are essentially trying to reuse the metric as a static global variable. This does not seem to be a common pattern in most systems. It is a little counterintuitive that a gauge reported to the metric system would be used by the program's main logic later on as a variable.
>> > > >
>> > > > > We could do that. That would remove the gauge from the MetricGroup, right? The main downside is that sources that do not use SourceReaderBase cannot set the metric anymore. So I'd rather keep the current way and extend it with the metadata extension.
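
To make the class/subclass argument concrete, here is a hedged sketch in the spirit of the KafkaRecordMetadata example from this thread. Framework-side code (fetchTimeOr) depends only on a base RecordMetadata class, while a connector subclass adds its own field and is exposed through a covariant getMetadata() override. RecordBatch, KafkaBatch, topicPartition, and fetchTimeOr are hypothetical names for illustration; this is not the API that ended up in Flink.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

/**
 * Hypothetical sketch of the class/subclass approach to batch metadata:
 * framework code depends only on the base class, connectors subclass it.
 */
public class MetadataSubclassSketch {

    /** Base class the framework may inspect (here: the last fetch time). */
    static class RecordMetadata {
        private final long lastFetchTime;

        RecordMetadata(long lastFetchTime) {
            this.lastFetchTime = lastFetchTime;
        }

        long getLastFetchTime() {
            return lastFetchTime;
        }
    }

    /** Connector-specific subclass carrying extra batch-level information. */
    static final class KafkaRecordMetadata extends RecordMetadata {
        private final String topicPartition; // illustrative extra field

        KafkaRecordMetadata(long lastFetchTime, String topicPartition) {
            super(lastFetchTime);
            this.topicPartition = topicPartition;
        }

        String getTopicPartition() {
            return topicPartition;
        }
    }

    /** Simplified stand-in for a batch of records sharing one metadata object. */
    interface RecordBatch<E> {
        Iterator<E> records();

        /** Null means the source provides no metadata. */
        default RecordMetadata getMetadata() {
            return null;
        }
    }

    static final class KafkaBatch implements RecordBatch<String> {
        private final List<String> records;
        private final KafkaRecordMetadata metadata;

        KafkaBatch(List<String> records, KafkaRecordMetadata metadata) {
            this.records = records;
            this.metadata = metadata;
        }

        @Override
        public Iterator<String> records() {
            return records.iterator();
        }

        // Covariant return type: callers holding a KafkaBatch see the subclass.
        @Override
        public KafkaRecordMetadata getMetadata() {
            return metadata;
        }
    }

    /** Framework-side logic that reads metadata without knowing the subclass. */
    static long fetchTimeOr(RecordBatch<?> batch, long fallback) {
        RecordMetadata metadata = batch.getMetadata();
        return metadata == null ? fallback : metadata.getLastFetchTime();
    }

    public static void main(String[] args) {
        KafkaBatch batch = new KafkaBatch(
                Arrays.asList("r1", "r2"), new KafkaRecordMetadata(42L, "topic-0"));
        System.out.println(fetchTimeOr(batch, -1L) + " " + batch.getMetadata().getTopicPartition()); // 42 topic-0
    }
}
```

With an unbounded generic metadata type instead, fetchTimeOr could not call getLastFetchTime() without a cast or an extra type bound. That is the case Becket describes as favoring subclasses.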
>> > > > Yes, that would remove the gauge from the MetricGroup. I was thinking that SourceOutput is going to have an additional method of collect(T Record, Long timestamp, Long fetchTime). So people can just pass in the fetch time directly when they emit a record, regardless of using SourceReaderBase or not.
>> > > >
>> > > > Thanks,
>> > > >
>> > > > Jiangjie (Becket) Qin
>> > > >
>> > > > On Thu, Jul 22, 2021 at 3:46 PM Chesnay Schepler <ches...@apache.org> wrote:
>> > > >
>> > > >> The only histogram implementations available to use are those by dropwizard, and they do some lock-free synchronization stuff that so far we wanted to keep out of hot paths (this applies to both reading and writing); we have, however, never made benchmarks.
>> > > >> But it is reasonable to assume that they are way more expensive than the alternatives (in the ideal case just being a getter).
>> > > >> You'd pay that cost irrespective of whether a reporter is enabled or not, which is another thing we so far wanted to prevent.
>> > > >> Finally, histograms are problematic because they are 10x more expensive on the metric backend (because they are effectively 10 metrics), and should be used with extreme caution, in particular because we lack any switch to disable/enable metrics (and I think we are getting to a point where the metric system becomes unusable for heavy users because of that, where another histogram isn't helping).
>> > > >>
>> > > >> Overall, at this time I'm against using Histograms.
>> > > >> Furthermore, I believe that we should be able to derive a Histogram from that supplier if we later on decide differently. We'd just need to poll the supplier more often.
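
Chesnay's closing point, that a distribution could be derived later by polling the supplier more often, can be sketched without any metrics library. The sketch assumes a LongSupplier that returns the current lag, with a fixed polling loop standing in for a timer. The Summary class is hypothetical and tracks only min, max, count, and mean, which is closer to the Timer Steven mentions than to a bucketed histogram.

```java
import java.util.function.LongSupplier;

/**
 * Hypothetical sketch: deriving distribution statistics by polling a supplier
 * more often than a gauge would be reported. Tracks min/max/count/mean only.
 */
public class PolledSupplierSummary {

    static final class Summary {
        long count;
        long min = Long.MAX_VALUE;
        long max = Long.MIN_VALUE;
        long sum;

        void add(long value) {
            count++;
            min = Math.min(min, value);
            max = Math.max(max, value);
            sum += value;
        }

        double mean() {
            return count == 0 ? Double.NaN : (double) sum / count;
        }
    }

    /** Polls the supplier a fixed number of times; a real system would use a timer. */
    static Summary poll(LongSupplier supplier, int polls) {
        Summary summary = new Summary();
        for (int i = 0; i < polls; i++) {
            summary.add(supplier.getAsLong());
        }
        return summary;
    }

    public static void main(String[] args) {
        long[] lagsMs = {120, 80, 300, 100}; // made-up sample values for illustration
        int[] next = {0};
        Summary s = poll(() -> lagsMs[next[0]++], lagsMs.length);
        System.out.println(s.count + " " + s.min + " " + s.max + " " + s.mean()); // 4 80 300 150.0
    }
}
```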
>> > > >>
>> > > >> On 22/07/2021 09:36, Arvid Heise wrote:
>> > > >>
>> > > >> Hi all,
>> > > >>
>> > > >> @Steven Wu <stevenz...@gmail.com>
>> > > >>
>> > > >>> Regarding the "lastFetchTime" latency metric, I found Gauge to be less informative as it only captures the last sampled value for each metric publish interval (e.g. 60s).
>> > > >>> * Can we make it a histogram? Histograms are more expensive, though.
>> > > >>> * Timer [1, 2] is cheaper as it just tracks min, max, avg, count, but there is no such metric type in Flink.
>> > > >>> * Summary metric type [3] (from Prometheus) would be nice too.
>> > > >>>
>> > > >> I'd also think that a histogram is much more expressive, but the original FLIP-33 decided against it because of its cost. @Chesnay Schepler <ches...@apache.org> could you shed some light on how much more expensive it is in comparison to a simple gauge? Does it depend on whether a reporter is actually using the metric?
>> > > >> The current interface of this FLIP-179 would actually allow switching the type of the metric later. But since the metric type is user-facing, we need to have an agreement now.
>> > > >>
>> > > >> @Becket Qin <becket....@gmail.com>
>> > > >>
>> > > >>> In that case, do we still need the metric here? It seems we are creating a "global variable" which users may potentially use. I am wondering how much additional convenience it provides because it seems easy for people to simply pass the fetch time by themselves if they have decided not to use SourceReaderBase. Also, it looks like we do not have an API pattern that lets users get the value of a metric and derive another metric.
>> > > >>> So I think it is easier for people to understand if LastFetchTimeGauge() is just an independent metric by itself, instead of being a part of the eventTimeFetchLag computation.
>> > > >>>
>> > > >> I'm not sure if I follow the global variable argument, could you elaborate? Are you referring specifically to the SettableGauge? How is that different from a Counter or Meter?
>> > > >>
>> > > >> With the current design, we could very well add a LastFetchTime metric. The key point of the current abstraction is that a user gets the much harder eventTimeFetchLag metric for free, since we already need to extract the event time for other metrics. I think the JavaDoc makes it clear what the intent of the LastFetchTimeGauge is, and if not, we can improve it.
>> > > >> Btw, we have derived metrics already. For example, we have Meters for byteIn/Out and recordIn/Out. That's already part of FLIP-33.
>> > > >>
>> > > >>> Would it make sense to have a more generic metadata type <T> associated with the records batch? In some cases, it may be useful to allow the Source implementation to carry some additional information of the batch to the RecordEmitter. For example, the split info of the batch, the sender of the batch, etc. Because the RecordEmitter only takes one record at a time, currently such information needs to be put into each record, which may involve a lot of wrapper object creation.
>> > > >>>
>> > > >> I like the idea of having more general metadata and I follow the example.
>> > > >> I'm wondering if we could avoid a generic type (since that adds a bit of complexity to the mental model and usage) by simply encouraging the use of a more specific MetaData subclass as the return type of the method.
>> > > >>
>> > > >> public interface RecordsWithSplitIds<E> {
>> > > >>     @Nullable
>> > > >>     default RecordMetadata getMetadata() {
>> > > >>         return null;
>> > > >>     }
>> > > >>     ...
>> > > >> }
>> > > >>
>> > > >> public interface RecordMetadata {
>> > > >>     long getLastFetchTime(); // mandatory?
>> > > >> }
>> > > >>
>> > > >> And using it as
>> > > >>
>> > > >> public class KafkaRecordMetadata implements RecordMetadata { ... }
>> > > >>
>> > > >> private static class KafkaPartitionSplitRecords<T> implements RecordsWithSplitIds<T> {
>> > > >>     private final KafkaRecordMetadata metadata;
>> > > >>     ...
>> > > >>     @Override
>> > > >>     public KafkaRecordMetadata getMetadata() {
>> > > >>         return metadata;
>> > > >>     }
>> > > >> }
>> > > >>
>> > > >> Or do we want to have the generic type to explicitly pass it to the RecordEmitter? Would that metadata be a fourth parameter of RecordEmitter#emitRecord?
>> > > >>
>> > > >>> It might be slightly better if we let the method accept a Supplier in this case. However, it seems to introduce a parallel channel or a side path between the user implementation and SourceOutput. I am not sure if this is the right way to go. Would it be more intuitive if we just add a new method to the SourceOutput, to allow the FetchTime to be passed in explicitly? This would work well with the change I suggested above, which adds a generic metadata type <T> to the RecordsWithSplits and passes that to the RecordEmitter.emitRecord() as an argument.
>> > > >>>
>> > > >>
>> > > >> We could do that. That would remove the gauge from the MetricGroup, right? The main downside is that sources that do not use SourceReaderBase cannot set the metric anymore.
>> > > >> So I'd rather keep the current way and extend it with the metadata extension.
>> > > >>
>> > > >> Best,
>> > > >>
>> > > >> Arvid
>> > > >>
>> > > >>
>> > > >> On Wed, Jul 21, 2021 at 1:38 PM Becket Qin <becket....@gmail.com> wrote:
>> > > >>
>> > > >>> Hey Chesnay,
>> > > >>>
>> > > >>> I think I got what that method was designed for now. Basically, the motivation is to let the SourceOutput report the eventTimeFetchLag for users. At this point, the SourceOutput only has the EventTime, so this method provides a way for the users to pass the FetchTime to the SourceOutput. This is essentially a context associated with each record emitted to the SourceOutput.
>> > > >>>
>> > > >>> It might be slightly better if we let the method accept a Supplier in this case. However, it seems to introduce a parallel channel or a side path between the user implementation and SourceOutput. I am not sure if this is the right way to go. Would it be more intuitive if we just add a new method to the SourceOutput, to allow the FetchTime to be passed in explicitly? This would work well with the change I suggested above, which adds a generic metadata type <T> to the RecordsWithSplits and passes that to the RecordEmitter.emitRecord() as an argument.
>> > > >>>
>> > > >>> What do you think?
>> > > >>>
>> > > >>> Thanks,
>> > > >>>
>> > > >>> Jiangjie (Becket) Qin
>> > > >>>
>> > > >>> On Tue, Jul 20, 2021 at 2:50 PM Chesnay Schepler <ches...@apache.org> wrote:
>> > > >>>
>> > > >>> > Would it be easier to understand if the method would accept a Supplier instead?
>> > > >>> >
>> > > >>> > On 20/07/2021 05:36, Becket Qin wrote:
>> > > >>> > > In that case, do we still need the metric here?
>> > > >>> > > It seems we are creating a "global variable" which users may potentially use. I am wondering how much additional convenience it provides because it seems easy for people to simply pass the fetch time by themselves if they have decided not to use SourceReaderBase. Also, it looks like we do not have an API pattern that lets users get the value of a metric and derive another metric. So I think it is easier for people to understand if LastFetchTimeGauge() is just an independent metric by itself, instead of being a part of the eventTimeFetchLag computation.
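
Chesnay's Supplier suggestion, and Arvid's later remark that a Gauge is a callback rather than a stored value, can be illustrated with a small sketch. The reader owns lastFetchTime, and the registry only keeps a LongSupplier that is evaluated when a reporter asks for the value. SketchRegistry and SketchReader are hypothetical stand-ins, not Flink's MetricGroup or SourceReader.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.LongSupplier;

/**
 * Hypothetical sketch of a supplier-backed gauge: the reader owns the state,
 * the registry only holds a callback that is evaluated when a reporter reads it.
 */
public class SupplierGaugeSketch {

    static final class SketchRegistry {
        private final Map<String, LongSupplier> gauges = new HashMap<>();

        void gauge(String name, LongSupplier supplier) {
            gauges.put(name, supplier);
        }

        /** What a reporter would do on each report interval. */
        long report(String name) {
            return gauges.get(name).getAsLong();
        }
    }

    static final class SketchReader {
        // Written by the fetching thread, read by the reporter; volatile for visibility.
        private volatile long lastFetchTime = -1L;

        void onBatchFetched(long fetchTime) {
            lastFetchTime = fetchTime;
        }

        void registerMetrics(SketchRegistry registry) {
            // The registry stores a callback; the value is produced only when invoked.
            registry.gauge("lastFetchTime", () -> lastFetchTime);
        }
    }

    public static void main(String[] args) {
        SketchRegistry registry = new SketchRegistry();
        SketchReader reader = new SketchReader();
        reader.registerMetrics(registry);
        System.out.println(registry.report("lastFetchTime")); // -1 (nothing fetched yet)
        reader.onBatchFetched(1_700L);
        System.out.println(registry.report("lastFetchTime")); // 1700
    }
}
```

The value itself is not stored in the registry, but the callback is, and any code holding the registry could invoke it. That shared context is where Becket's "global variable" concern still applies.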