Hi Steven,

I extended the FLIP and its draft PR to have a SourceReaderMetricGroup and a SplitEnumeratorMetricGroup. I hope that makes it clearer. I'd like to address FLINK-21000 as part of the implementation, but I'd keep it out of the FLIP discussion.
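The enumerator side of this split can be illustrated with a minimal, self-contained Java sketch. It assumes a hypothetical `EnumeratorMetrics` interface with a `setUnassignedSplitsGauge` setter, modeled on the `<G extends Gauge<Long>> G set...Gauge(G)` style quoted later in this thread, plus a local `Gauge` stand-in. None of these names are taken from the FLIP or the draft PR. The point is only that the enumerator already holds the backlog of unassigned splits, so it can report that backlog's size cheaply.

```java
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentLinkedDeque;

// Hypothetical stand-in for a Flink-style Gauge: a value read on demand by a reporter.
interface Gauge<T> {
    T getValue();
}

// Hypothetical enumerator-side metric group; the setter mimics the
// "<G extends Gauge<Long>> G set...Gauge(G)" style quoted in this thread.
interface EnumeratorMetrics {
    <G extends Gauge<Long>> G setUnassignedSplitsGauge(G unassignedSplitsGauge);
}

// Toy metric group that simply keeps the registered gauges by name.
class InMemoryEnumeratorMetrics implements EnumeratorMetrics {
    final Map<String, Gauge<Long>> gauges = new HashMap<>();

    @Override
    public <G extends Gauge<Long>> G setUnassignedSplitsGauge(G gauge) {
        gauges.put("unassignedSplits", gauge);
        return gauge;
    }
}

// Toy enumerator: it already holds the discovered but unassigned splits,
// so exposing the backlog size requires no extra I/O.
class ToyEnumerator {
    // Concurrent deque because a reporter may read the gauge from another
    // thread (an assumption about the threading model).
    private final Deque<String> unassigned = new ConcurrentLinkedDeque<>();

    ToyEnumerator(EnumeratorMetrics metrics) {
        // Evaluated lazily each time a reporter reads the gauge.
        Gauge<Long> backlog = () -> (long) unassigned.size();
        metrics.setUnassignedSplitsGauge(backlog);
    }

    void addDiscoveredSplit(String splitId) {
        unassigned.addLast(splitId);
    }

    // Hands the oldest unassigned split to a reader; returns null if none is left.
    String assignNext() {
        return unassigned.pollFirst();
    }
}
```

Because the gauge is a lambda over the deque, the reported value reflects the backlog at reporting time. No explicit update call is needed when splits are discovered or assigned.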
Question: should we rename SinkMetricGroup to SinkWriterMetricGroup? I can see the same confusion arising on the sink side. I have added a commit to the draft PR (the FLIP is not updated yet).

Btw, I'd like to start the vote soonish. @Becket Qin <[email protected]> are you okay with the setLastFetchTimeGauge explanation, or do you have alternative ideas?

Best,

Arvid

On Fri, Jul 16, 2021 at 8:13 PM Steven Wu <[email protected]> wrote:

> To avoid confusion, can we either rename "SourceMetricGroup" to
> "SplitReaderMetricGroup" or add "Reader" to the setter method names?
>
> Yes, we should add the "unassigned/pending splits" enumerator metric. I
> tried to publish those metrics for IcebergSourceEnumerator and ran into an
> issue [1]. I don't want to distract the discussion with the Jira ticket.
>
> [1] https://issues.apache.org/jira/browse/FLINK-21000
>
> On Thu, Jul 15, 2021 at 1:01 PM Arvid Heise <[email protected]> wrote:
>
> > Hi Steven,
> >
> > The semantics are unchanged compared to FLIP-33 [1], but I see your point.
> >
> > In reality, pending records would mostly apply to event storage systems
> > (Kafka, Kinesis, ...). Here, we would effectively report the consumer lag.
> > If "consumer lag" is the more familiar term, we could also rename it.
> >
> > Pending bytes is mostly relevant for the file source or any kind of byte
> > stream. At this point, we can only capture the assigned splits on the
> > reader level. I don't think it makes sense to add the same metric to the
> > enumerator, as that might induce too much I/O on the job master. Instead,
> > I could envision another metric that captures how many unassigned splits
> > there are. In general, I think it would be a good idea to add another type
> > of top-level metric group for the SplitEnumerator, called
> > SplitEnumeratorMetricGroup, in SplitEnumeratorContext. There we could add
> > an unassigned/pending splits metric. WDYT?
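Arvid's distinction between the two reader-level metrics can be made concrete with a minimal Java sketch. It assumes a hypothetical `ReaderMetrics` interface carrying the two setters quoted later in this thread, a local `Gauge` stand-in, and made-up reader classes. It is not the FLIP's or Flink's actual code. For the log-based reader, pending records is computed as a Kafka-style consumer lag per assigned partition: log-end offset minus the next offset to read. That off-by-one convention is an assumption. For the file-like reader, pending bytes covers only the splits already assigned to the reader. Unassigned splits are visible only to the enumerator.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a Flink-style Gauge: a value read on demand by a reporter.
interface Gauge<T> {
    T getValue();
}

// Hypothetical reader-side metric group with the two setters quoted in this thread.
interface ReaderMetrics {
    <G extends Gauge<Long>> G setPendingRecordsGauge(G pendingRecordsGauge);

    <G extends Gauge<Long>> G setPendingBytesGauge(G pendingBytesGauge);
}

// Toy metric group that simply keeps the registered gauges by name.
class InMemoryReaderMetrics implements ReaderMetrics {
    final Map<String, Gauge<Long>> gauges = new HashMap<>();

    @Override
    public <G extends Gauge<Long>> G setPendingRecordsGauge(G gauge) {
        gauges.put("pendingRecords", gauge);
        return gauge;
    }

    @Override
    public <G extends Gauge<Long>> G setPendingBytesGauge(G gauge) {
        gauges.put("pendingBytes", gauge);
        return gauge;
    }
}

// Log-based source (Kafka/Kinesis-like): pending records = consumer lag,
// summed over the partitions assigned to this reader.
class ToyLogReader {
    private final Map<String, Long> nextOffset = new HashMap<>();   // next offset to read
    private final Map<String, Long> logEndOffset = new HashMap<>(); // last known end of log

    ToyLogReader(ReaderMetrics metrics) {
        Gauge<Long> lag = this::consumerLag;
        metrics.setPendingRecordsGauge(lag);
    }

    void assign(String partition, long startOffset) {
        nextOffset.put(partition, startOffset);
        logEndOffset.put(partition, startOffset);
    }

    // Called whenever fresh end-of-log information arrives from the broker.
    void onLogEndOffset(String partition, long endOffset) {
        logEndOffset.put(partition, endOffset);
    }

    void onRecordEmitted(String partition, long offset) {
        nextOffset.put(partition, offset + 1);
    }

    long consumerLag() {
        long total = 0;
        for (Map.Entry<String, Long> e : nextOffset.entrySet()) {
            total += Math.max(0L, logEndOffset.get(e.getKey()) - e.getValue());
        }
        return total;
    }
}

// Byte-stream source (file-like): pending bytes = unread bytes of the splits
// currently assigned to this reader.
class ToyFileReader {
    private final Map<String, Long> remaining = new HashMap<>();

    ToyFileReader(ReaderMetrics metrics) {
        Gauge<Long> pending = () -> remaining.values().stream().mapToLong(Long::longValue).sum();
        metrics.setPendingBytesGauge(pending);
    }

    void addSplit(String splitId, long lengthBytes) {
        remaining.put(splitId, lengthBytes);
    }

    void onBytesRead(String splitId, long bytes) {
        remaining.merge(splitId, -bytes, Long::sum);
    }

    void onSplitFinished(String splitId) {
        remaining.remove(splitId);
    }
}
```

Both gauges are computed on read from state the reader keeps anyway. This fits Arvid's concern that such metrics should not require additional I/O.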
> >
> > [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-33%3A+Standardize+Connector+Metrics
> >
> > On Wed, Jul 14, 2021 at 9:00 AM Steven Wu <[email protected]> wrote:
> >
> > > I am trying to understand what those two metrics really capture.
> > >
> > > <G extends Gauge<Long>> G setPendingBytesGauge(G pendingBytesGauge);
> > >
> > >    - Using the file source as an example, does it capture the remaining
> > >    bytes of the current file split that the reader is processing? How
> > >    would users interpret or use this metric? The enumerator keeps track
> > >    of the pending/unassigned splits, which is an indication of the size
> > >    of the backlog. That would be very useful.
> > >
> > > <G extends Gauge<Long>> G setPendingRecordsGauge(G pendingRecordsGauge);
> > >
> > >    - In the Kafka source case, is this intended to capture the consumer
> > >    lag (log head offset from the broker - current record offset)? That
> > >    could be used to capture the size of the backlog.
> > >
> > > On Tue, Jul 13, 2021 at 3:01 PM Arvid Heise <[email protected]> wrote:
> > >
> > > > Hi Becket,
> > > >
> > > > I believe 1+2 have been answered by Chesnay already. Just to add to 2:
> > > > I'm not the biggest fan of reusing task metrics, but that's what
> > > > FLIP-33 and different folks suggested. I'd probably keep task I/O
> > > > metrics only for internal things and add a new metric for external
> > > > calls. Then, we could even allow users to track I/O in AsyncIO (which
> > > > would currently be a mess). However, with the current abstraction, it
> > > > would be relatively easy to add separate metrics later.
> > > >
> > > > 3. As outlined in the JavaDoc and in the draft PR [1], it's up to the
> > > > user to implement it in a way that fetch time always corresponds to
> > > > the latest polled record.
> > > > For SourceReaderBase, I have added a new
> > > > RecordsWithSplitIds#lastFetchTime (with default return value null),
> > > > which is used to set the last fetch time automatically whenever the
> > > > next batch is selected. Tbh, this metric is a bit more challenging to
> > > > implement for non-SourceReaderBase sources, but I have not found a
> > > > better, thread-safe way. Of course, we could shift the complete
> > > > calculation into user-land, but I'm not sure that this is easier.
> > > > For your scenarios:
> > > > - In A, you assume SourceReaderBase. In that case, we could eagerly
> > > > report the metric as sketched by you. Whether this eager reporting is
> > > > more correct than the lazy reporting that I have proposed depends on
> > > > the definition of "last processed record" in FLIP-33. The former
> > > > assumes "last processed record" = "last fetched record", while the
> > > > latter assumes "last processed record" = "last polled record". For the
> > > > proposed solution, the user would just need to implement
> > > > RecordsWithSplitIds#lastFetchTime, which typically corresponds to the
> > > > creation time of the RecordsWithSplitIds instance.
> > > > - B is not assuming SourceReaderBase.
> > > > If it's SourceReaderBase, the same proposed solution works out of the
> > > > box: SourceOperator intercepts the emitted event time and uses the
> > > > fetch time of the current batch.
> > > > If it's not SourceReaderBase, the user would need to attach the
> > > > timestamp to the handover protocol if multi-threaded and set the
> > > > lastFetchTimeGauge when a value in the handover protocol is selected
> > > > (typically a batch).
> > > > If it's a single-threaded source, the user could directly set the
> > > > current timestamp after fetching the records synchronously.
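The lazy reporting described above can be sketched in plain Java. In this scheme, the gauge moves to a batch's fetch time only once a record from that batch is polled, and multi-threaded readers carry the timestamp through the handover. `FetchedBatch`, `ToyReader`, and the local `Gauge` interface are hypothetical stand-ins, not Flink's `RecordsWithSplitIds` or `SourceReaderBase`. Fetch lag is taken as fetch time minus event time, in line with FLIP-33's fetch lag metric.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical stand-in for a Flink-style Gauge: a value read on demand by a reporter.
interface Gauge<T> {
    T getValue();
}

// Hypothetical unit of the handover between a fetcher thread and the reader
// thread. It carries the time at which the batch was fetched, similar in
// spirit to the proposed RecordsWithSplitIds#lastFetchTime.
final class FetchedBatch {
    final List<Long> eventTimes; // event time (ms) of each record; payload omitted
    final long fetchTime;        // wall-clock time (ms) when the batch was fetched

    FetchedBatch(List<Long> eventTimes, long fetchTime) {
        this.eventTimes = eventTimes;
        this.fetchTime = fetchTime;
    }
}

// Reader thread: reports the fetch time of the batch that the most recently
// polled record came from (the "lazy" variant).
final class ToyReader {
    private final BlockingQueue<FetchedBatch> handover;
    private FetchedBatch current;
    private int index;
    // Written by the reader thread, read by a metrics reporter thread.
    private volatile long lastFetchTime = -1L;

    final Gauge<Long> lastFetchTimeGauge = () -> lastFetchTime;

    ToyReader(BlockingQueue<FetchedBatch> handover) {
        this.handover = handover;
    }

    // Returns the event time of the next record, or null if no data is available.
    Long pollNext() {
        // Skip exhausted (or empty) batches and select the next one from the handover.
        while (current == null || index >= current.eventTimes.size()) {
            FetchedBatch next = handover.poll();
            if (next == null) {
                return null;
            }
            current = next;
            index = 0;
        }
        Long eventTime = current.eventTimes.get(index++);
        // Only now does the reported fetch time move to this record's batch.
        lastFetchTime = current.fetchTime;
        return eventTime;
    }

    // Fetch lag (fetch time minus event time) for the record just polled.
    // In case A a fetcher could compute this eagerly; here (case B) it is
    // derived once the event time becomes known at emit time.
    long fetchLagOf(long eventTime) {
        return lastFetchTime - eventTime;
    }
}
```

A fetcher thread would feed the reader with something like `handover.put(new FetchedBatch(eventTimes, System.currentTimeMillis()))`. A single-threaded source could skip the queue and stamp the time right after its synchronous fetch. The clock is read once per batch rather than once per record, which is why fetching individual records is the costly case.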
> > > >
> > > > The bad case is if the user is fetching individual records (either
> > > > sync or async): then the fetch time would be updated with every
> > > > record. However, I'm assuming that the required system call is dwarfed
> > > > by the I/O involved.
> > > >
> > > > [1] https://github.com/apache/flink/pull/15972
> > > >
> > > > On Tue, Jul 13, 2021 at 12:58 PM Chesnay Schepler <[email protected]> wrote:
> > > >
> > > > > Re 1: We don't expose the reuse* methods, because the proposed
> > > > > OperatorIOMetricGroup is a separate interface from the existing
> > > > > implementations (which will be renamed and implement the new
> > > > > interface).
> > > > >
> > > > > Re 2: Currently the plan is to reuse the "new" numBytesIn/Out
> > > > > counters for tasks ("new" because all we are doing is exposing
> > > > > already existing metrics). We may, however, change this in the
> > > > > future if we want to report the byte metrics on an operator level,
> > > > > which is primarily interesting for async IO or other external
> > > > > connectivity outside of sinks/sources.
> > > > >
> > > > > On 13/07/2021 12:38, Becket Qin wrote:
> > > > > > Hi Arvid,
> > > > > >
> > > > > > Thanks for the proposal. I like the idea of exposing a concrete
> > > > > > metric group class so that users can access the predefined
> > > > > > metrics.
> > > > > >
> > > > > > A few questions follow:
> > > > > >
> > > > > > 1. When exposing the OperatorIOMetrics to the users, we are also
> > > > > > exposing the reuseInputMetricsForTask and reuseOutputMetricsForTask
> > > > > > methods to them. Should we hide these two methods? Users won't have
> > > > > > enough information to decide whether the record IO metrics should
> > > > > > be reused by the task or not.
> > > > > >
> > > > > > 2. Similar to question 1, in the OperatorIOMetricGroup, we are
> > > > > > adding numBytesInCounter and numBytesOutCounter.
> > > > > > Should these metrics reuse the task-level metrics by default?
> > > > > >
> > > > > > 3. Regarding SourceMetricGroup#setLastFetchTimeGauge(), I am not
> > > > > > sure how it works with the FetchLag. Typically, there are two cases
> > > > > > when reporting the fetch lag:
> > > > > >    A. The EventTime is known at the point when the record is
> > > > > >    fetched in the SplitFetcher, so the fetch lag can be derived and
> > > > > >    reported immediately.
> > > > > >    B. The EventTime is known only after the fetched record has been
> > > > > >    parsed in the RecordEmitter. In this case, the RecordEmitter
> > > > > >    needs to get the fetch time of that particular record.
> > > > > > I am not sure when users set the LastFetchTime in these two cases.
> > > > > > Can you help elaborate on how users should use it?
> > > > > >
> > > > > > Thanks,
> > > > > >
> > > > > > Jiangjie (Becket) Qin
> > > > > >
> > > > > > On Thu, Jul 8, 2021 at 10:25 PM Arvid Heise <[email protected]> wrote:
> > > > > >
> > > > > >> Dear devs,
> > > > > >>
> > > > > >> As a continuation and generalization of FLIP-33 (Standardize
> > > > > >> Connector Metrics) [1], we'd like to discuss how we actually
> > > > > >> expose the standardized operator metrics to users in terms of
> > > > > >> changes to the API.
> > > > > >>
> > > > > >> Please check out the FLIP [2] and provide feedback.
> > > > > >>
> > > > > >> Best,
> > > > > >>
> > > > > >> Arvid
> > > > > >>
> > > > > >> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-33%3A+Standardize+Connector+Metrics
> > > > > >> [2] https://cwiki.apache.org/confluence/display/FLINK/FLIP-179%3A+Expose+Standardized+Operator+Metrics
