I am trying to understand what those two metrics really capture

<G extends Gauge<Long>> G setPendingBytesGauge(G pendingBytesGauge);

   -  use file source as an example, it captures the remaining bytes for
   the current file split that the reader is processing? How would users
   interpret or use this metric? enumerator keeps tracks of the
   pending/unassigned splits, which is an indication of the size of the
   backlog. that would be very useful


<G extends Gauge<Long>> G setPendingRecordsGauge(G pendingRecordsGauge);

   - In the Kafka source case, this is intended to capture the consumer lag
   (log head offset from broker - current record offset)? that could be used
   to capture the size of the backlog



On Tue, Jul 13, 2021 at 3:01 PM Arvid Heise <ar...@apache.org> wrote:

> Hi Becket,
>
> I believe 1+2 has been answered by Chesnay already. Just to add to 2: I'm
> not the biggest fan of reusing task metrics but that's what FLIP-33 and
> different folks suggested. I'd probably keep task I/O metrics only for
> internal things and add a new metric for external calls. Then, we could
> even allow users to track I/O in AsyncIO (which would currently be a mess).
> However, with the current abstraction, it would be relatively easy to add
> separate metrics later.
>
> 3. As outlined in the JavaDoc and in the draft PR [1], it's up to the user
> to implement it in a way that fetch time always corresponds to the latest
> polled record. For SourceReaderBase, I have added a new
> RecordsWithSplitIds#lastFetchTime (with default return value null) that
> sets the last fetch time automatically whenever the next batch is selected.
> Tbh this metric is a bit more challenging to implement for
> non-SourceReaderBase sources but I have not found a better, thread-safe
> way. Of course, we could shift the complete calculation into user-land but
> I'm not sure that this is easier.
> For your scenarios:
> - in A, you assume SourceReaderBase. In that case, we could eagerly report
> the metric as sketched by you. It depends on the definition of "last
> processed record" in FLIP-33, whether this eager reporting is more correct
> than the lazy reporting that I have proposed. The former case assumes "last
> processed record" = last fetched record, while the latter case assumes
> "last processed record" = "last polled record". For the proposed solution,
> the user would just need to implement RecordsWithSplitIds#lastFetchTime,
> which typically corresponds to the creation time of the RecordsWithSplitIds
> instance.
> - B is not assuming SourceReaderBase.
> If it's SourceReaderBase, the same proposed solution works out of the box:
> SourceOperator intercepts the emitted event time and uses the fetch time of
> the current batch.
> If it's not SourceReaderBase, the user would need to attach the timestamp
> to the handover protocol if multi-threaded and set the lastFetchTimeGauge
> when a value in the handover protocol is selected (typically a batch).
> If it's a single threaded source, the user could directly set the current
> timestamp after fetching the records in a sync fashion.
> The bad case is if the user is fetching individual records (either sync or
> async), then the fetch time would be updated with every record. However,
> I'm assuming that the required system call is dwarfed by involved I/O.
>
> [1] https://github.com/apache/flink/pull/15972
>
> On Tue, Jul 13, 2021 at 12:58 PM Chesnay Schepler <ches...@apache.org>
> wrote:
>
> > Re 1: We don't expose the reuse* methods, because the proposed
> > OperatorIOMetricGroup is a separate interface from the existing
> > implementations (which will be renamed and implement the new interface).
> >
> > Re 2: Currently the plan is to re-use the "new" numByesIn/Out counters
> > for tasks ("new" because all we are doing is exposing already existing
> > metrics). We may however change this in the future if we want to report
> > the byte metrics on an operator level, which is primarily interesting
> > for async IO or other external connectivity outside of sinks/sources.
> >
> > On 13/07/2021 12:38, Becket Qin wrote:
> > > Hi Arvid,
> > >
> > > Thanks for the proposal. I like the idea of exposing concrete metric
> > group
> > > class so that users can access the predefined metrics.
> > >
> > > A few questions are following:
> > >
> > > 1. When exposing the OperatorIOMetrics to the users, we are also
> exposing
> > > the reuseInputMetricsForTask to the users. Should we hide these two
> > methods
> > > because users won't have enough information to decide whether the
> records
> > > IO metrics should be reused by the task or not.
> > >
> > > 2. Similar to question 1, in the OperatorIOMetricGroup, we are adding
> > > numBytesInCounter and numBytesOutCounter. Should these metrics be
> reusing
> > > the task level metrics by default?
> > >
> > > 3. Regarding SourceMetricGroup#setLastFetchTimeGauge(), I am not sure
> how
> > > it works with the FetchLag. Typically there are two cases when
> reporting
> > > the fetch lag.
> > >      A. The EventTime is known at the point when the record is fetched
> in
> > > the SplitFetcher, so the fetch lag can be derived and reported
> > immediately.
> > >      B. The EventTime is known only after the fetched record was parsed
> > in
> > > the RecordEmitter. In this case, the RecordEmitter needs to get the
> fetch
> > > time of that particular record.
> > > I am not sure when users set the LastFetchTime in the above two cases.
> > Can
> > > you help elaborate on how users should use it?
> > >
> > > Thanks,
> > >
> > > Jiangjie (Becket) Qin
> > >
> > >
> > >
> > > On Thu, Jul 8, 2021 at 10:25 PM Arvid Heise <ar...@apache.org> wrote:
> > >
> > >> Dear devs,
> > >>
> > >> As a continuation and generalization of FLIP-33 (Standardize Connector
> > >> Metrics) [1], we'd like to discuss how we actually expose the
> > standardized
> > >> operator metrics to users in terms of changes to the API.
> > >>
> > >> Please check out the FLIP [2] and provide feedback.
> > >>
> > >> Best,
> > >>
> > >> Arvid
> > >>
> > >> [1]
> > >>
> > >>
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-33%3A+Standardize+Connector+Metrics
> > >> [2]
> > >>
> > >>
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-179%3A+Expose+Standardized+Operator+Metrics
> > >>
> >
> >
>

Reply via email to