To avoid confusion, can we either rename "SourceMetricGroup" to "
SplitReaderMetricGroup" or add "Reader" to the setter method names?

Yes, we should  add the "unassigned/pending splits" enumerator metric. I
tried to publish those metrics for IcebergSourceEnumerator and ran into an
issue [1]. I don't want to distract the discussion with the jira ticket.

[1] https://issues.apache.org/jira/browse/FLINK-21000

On Thu, Jul 15, 2021 at 1:01 PM Arvid Heise <ar...@apache.org> wrote:

> Hi Steven,
>
> The semantics are unchanged compared to FLIP-33 [1] but I see your point.
>
> In reality, pending records would be mostly for event storage systems
> (Kafka, Kinesis, ...). Here, we would report the consumer lag effectively.
> If consumer lag is more prominent, we could also rename it.
>
> For pending bytes, this is mostly related to file source or any kind of
> byte streams. At this point, we can only capture the assigned splits on
> reader levels. I don't think it makes sense to add the same metric to the
> enumerator as that might induce too much I/O on the job master. I could
> rather envision another metric that captures how many unassigned splits
> there are. In general, I think it would be a good idea to add another type
> of top-level metric group for SplitEnumerator called
> SplitEnumeratorMetricGroup in SplitEnumeratorContext. There we could add
> unassigned/pending splits metric. WDYT?
>
> [1]
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-33%3A+Standardize+Connector+Metrics
>
> On Wed, Jul 14, 2021 at 9:00 AM Steven Wu <stevenz...@gmail.com> wrote:
>
> > I am trying to understand what those two metrics really capture
> >
> > <G extends Gauge<Long>> G setPendingBytesGauge(G pendingBytesGauge);
> >
> >    -  use file source as an example, it captures the remaining bytes for
> >    the current file split that the reader is processing? How would users
> >    interpret or use this metric? enumerator keeps tracks of the
> >    pending/unassigned splits, which is an indication of the size of the
> >    backlog. that would be very useful
> >
> >
> > <G extends Gauge<Long>> G setPendingRecordsGauge(G pendingRecordsGauge);
> >
> >    - In the Kafka source case, this is intended to capture the consumer
> lag
> >    (log head offset from broker - current record offset)? that could be
> > used
> >    to capture the size of the backlog
> >
> >
> >
> > On Tue, Jul 13, 2021 at 3:01 PM Arvid Heise <ar...@apache.org> wrote:
> >
> > > Hi Becket,
> > >
> > > I believe 1+2 has been answered by Chesnay already. Just to add to 2:
> I'm
> > > not the biggest fan of reusing task metrics but that's what FLIP-33 and
> > > different folks suggested. I'd probably keep task I/O metrics only for
> > > internal things and add a new metric for external calls. Then, we could
> > > even allow users to track I/O in AsyncIO (which would currently be a
> > mess).
> > > However, with the current abstraction, it would be relatively easy to
> add
> > > separate metrics later.
> > >
> > > 3. As outlined in the JavaDoc and in the draft PR [1], it's up to the
> > user
> > > to implement it in a way that fetch time always corresponds to the
> latest
> > > polled record. For SourceReaderBase, I have added a new
> > > RecordsWithSplitIds#lastFetchTime (with default return value null) that
> > > sets the last fetch time automatically whenever the next batch is
> > selected.
> > > Tbh this metric is a bit more challenging to implement for
> > > non-SourceReaderBase sources but I have not found a better, thread-safe
> > > way. Of course, we could shift the complete calculation into user-land
> > but
> > > I'm not sure that this is easier.
> > > For your scenarios:
> > > - in A, you assume SourceReaderBase. In that case, we could eagerly
> > report
> > > the metric as sketched by you. It depends on the definition of "last
> > > processed record" in FLIP-33, whether this eager reporting is more
> > correct
> > > than the lazy reporting that I have proposed. The former case assumes
> > "last
> > > processed record" = last fetched record, while the latter case assumes
> > > "last processed record" = "last polled record". For the proposed
> > solution,
> > > the user would just need to implement
> RecordsWithSplitIds#lastFetchTime,
> > > which typically corresponds to the creation time of the
> > RecordsWithSplitIds
> > > instance.
> > > - B is not assuming SourceReaderBase.
> > > If it's SourceReaderBase, the same proposed solution works out of the
> > box:
> > > SourceOperator intercepts the emitted event time and uses the fetch
> time
> > of
> > > the current batch.
> > > If it's not SourceReaderBase, the user would need to attach the
> timestamp
> > > to the handover protocol if multi-threaded and set the
> lastFetchTimeGauge
> > > when a value in the handover protocol is selected (typically a batch).
> > > If it's a single threaded source, the user could directly set the
> current
> > > timestamp after fetching the records in a sync fashion.
> > > The bad case is if the user is fetching individual records (either sync
> > or
> > > async), then the fetch time would be updated with every record.
> However,
> > > I'm assuming that the required system call is dwarfed by involved I/O.
> > >
> > > [1] https://github.com/apache/flink/pull/15972
> > >
> > > On Tue, Jul 13, 2021 at 12:58 PM Chesnay Schepler <ches...@apache.org>
> > > wrote:
> > >
> > > > Re 1: We don't expose the reuse* methods, because the proposed
> > > > OperatorIOMetricGroup is a separate interface from the existing
> > > > implementations (which will be renamed and implement the new
> > interface).
> > > >
> > > > Re 2: Currently the plan is to re-use the "new" numByesIn/Out
> counters
> > > > for tasks ("new" because all we are doing is exposing already
> existing
> > > > metrics). We may however change this in the future if we want to
> report
> > > > the byte metrics on an operator level, which is primarily interesting
> > > > for async IO or other external connectivity outside of sinks/sources.
> > > >
> > > > On 13/07/2021 12:38, Becket Qin wrote:
> > > > > Hi Arvid,
> > > > >
> > > > > Thanks for the proposal. I like the idea of exposing concrete
> metric
> > > > group
> > > > > class so that users can access the predefined metrics.
> > > > >
> > > > > A few questions are following:
> > > > >
> > > > > 1. When exposing the OperatorIOMetrics to the users, we are also
> > > exposing
> > > > > the reuseInputMetricsForTask to the users. Should we hide these two
> > > > methods
> > > > > because users won't have enough information to decide whether the
> > > records
> > > > > IO metrics should be reused by the task or not.
> > > > >
> > > > > 2. Similar to question 1, in the OperatorIOMetricGroup, we are
> adding
> > > > > numBytesInCounter and numBytesOutCounter. Should these metrics be
> > > reusing
> > > > > the task level metrics by default?
> > > > >
> > > > > 3. Regarding SourceMetricGroup#setLastFetchTimeGauge(), I am not
> sure
> > > how
> > > > > it works with the FetchLag. Typically there are two cases when
> > > reporting
> > > > > the fetch lag.
> > > > >      A. The EventTime is known at the point when the record is
> > fetched
> > > in
> > > > > the SplitFetcher, so the fetch lag can be derived and reported
> > > > immediately.
> > > > >      B. The EventTime is known only after the fetched record was
> > parsed
> > > > in
> > > > > the RecordEmitter. In this case, the RecordEmitter needs to get the
> > > fetch
> > > > > time of that particular record.
> > > > > I am not sure when users set the LastFetchTime in the above two
> > cases.
> > > > Can
> > > > > you help elaborate on how users should use it?
> > > > >
> > > > > Thanks,
> > > > >
> > > > > Jiangjie (Becket) Qin
> > > > >
> > > > >
> > > > >
> > > > > On Thu, Jul 8, 2021 at 10:25 PM Arvid Heise <ar...@apache.org>
> > wrote:
> > > > >
> > > > >> Dear devs,
> > > > >>
> > > > >> As a continuation and generalization of FLIP-33 (Standardize
> > Connector
> > > > >> Metrics) [1], we'd like to discuss how we actually expose the
> > > > standardized
> > > > >> operator metrics to users in terms of changes to the API.
> > > > >>
> > > > >> Please check out the FLIP [2] and provide feedback.
> > > > >>
> > > > >> Best,
> > > > >>
> > > > >> Arvid
> > > > >>
> > > > >> [1]
> > > > >>
> > > > >>
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-33%3A+Standardize+Connector+Metrics
> > > > >> [2]
> > > > >>
> > > > >>
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-179%3A+Expose+Standardized+Operator+Metrics
> > > > >>
> > > >
> > > >
> > >
> >
>

Reply via email to