Hi Steven,

I extended the FLIP and its draft PR to have a SourceReaderMetricGroup and
a SplitEnumeratorMetricGroup. I hope that it makes it clearer.
I'd like to address FLINK-21000 as part of the implementation but I'd keep
it out of the FLIP discussion.
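
To illustrate the kind of metric the enumerator-side group is meant to carry, here is a minimal sketch of an unassigned-splits gauge. Everything in it is a hypothetical stand-in: the Gauge interface, EnumeratorMetricGroupStub, the setter name setUnassignedSplitsGauge and ToyEnumerator are assumptions for illustration, not the interfaces from the FLIP or the draft PR.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

/** Hypothetical stand-ins only; not the Flink interfaces from the FLIP or the draft PR. */
final class UnassignedSplitsSketch {

    /** Stand-in for a Gauge<Long>-style metric. */
    public interface Gauge<T> {
        T getValue();
    }

    /** Stand-in for an enumerator-side metric group; the setter name is an assumption. */
    public static final class EnumeratorMetricGroupStub {
        private volatile Gauge<Long> unassignedSplits;

        public <G extends Gauge<Long>> G setUnassignedSplitsGauge(G gauge) {
            this.unassignedSplits = gauge;
            return gauge;
        }

        /** What a metric reporter would read; 0 if no gauge was registered. */
        public long reportedUnassignedSplits() {
            Gauge<Long> gauge = unassignedSplits;
            return gauge == null ? 0L : gauge.getValue();
        }
    }

    /** Toy enumerator whose backlog lives in memory, so reading the gauge needs no I/O. */
    public static final class ToyEnumerator {
        private final Queue<String> unassigned = new ConcurrentLinkedQueue<>();

        public ToyEnumerator(EnumeratorMetricGroupStub metrics) {
            // The gauge is evaluated lazily, each time the reporter asks for a value.
            Gauge<Long> gauge = () -> (long) unassigned.size();
            metrics.setUnassignedSplitsGauge(gauge);
        }

        public void discover(String split) {
            unassigned.add(split);
        }

        /** Hands the next split to a reader, or returns null if the backlog is empty. */
        public String assignNext() {
            return unassigned.poll();
        }
    }

    public static void main(String[] args) {
        EnumeratorMetricGroupStub metrics = new EnumeratorMetricGroupStub();
        ToyEnumerator enumerator = new ToyEnumerator(metrics);
        enumerator.discover("split-1");
        enumerator.discover("split-2");
        enumerator.assignNext();
        System.out.println("unassigned splits = " + metrics.reportedUnassignedSplits());
    }
}
```

The gauge only counts splits the enumerator already holds in memory, so it should not add I/O on the job master.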

Question: should we rename SinkMetricGroup to SinkWriterMetricGroup? I can
see the same confusion arising on the sink side. I have added a commit to the
draft PR (the FLIP is not updated yet).

Btw I'd like to start the vote soonish. @Becket Qin <becket....@gmail.com>
are you okay with the setLastFetchTimeGauge explanation or do you have
alternative ideas?
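
For reference, a minimal sketch of the single-threaded case from the explanation quoted below: the reader stores the current timestamp right after each synchronous fetch, and the gauge simply exposes that value. The Gauge interface, MetricGroupStub and SingleThreadedReader are hypothetical stand-ins written for this sketch, not the classes from the draft PR; only the setter name setLastFetchTimeGauge is taken from this thread.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.LongSupplier;

/** Hypothetical stand-ins only; not the Flink interfaces from the FLIP or the draft PR. */
final class LastFetchTimeSketch {

    /** Stand-in for a Gauge<Long>-style metric. */
    public interface Gauge<T> {
        T getValue();
    }

    /** Stand-in for the reader-side metric group with a setLastFetchTimeGauge setter. */
    public static final class MetricGroupStub {
        private volatile Gauge<Long> lastFetchTime;

        public <G extends Gauge<Long>> G setLastFetchTimeGauge(G gauge) {
            this.lastFetchTime = gauge;
            return gauge;
        }

        /** What a metric reporter would read; -1 if no gauge was registered. */
        public long reportedLastFetchTime() {
            Gauge<Long> gauge = lastFetchTime;
            return gauge == null ? -1L : gauge.getValue();
        }
    }

    /** Single-threaded reader: one clock read per batch, taken after the fetch returns. */
    public static final class SingleThreadedReader {
        private final LongSupplier clock;
        // volatile because a metric reporter thread may read the value.
        private volatile long lastFetchTime = -1L;

        public SingleThreadedReader(MetricGroupStub metrics, LongSupplier clock) {
            this.clock = clock;
            Gauge<Long> gauge = () -> lastFetchTime;
            metrics.setLastFetchTimeGauge(gauge);
        }

        /** Simulates a synchronous fetch; the batch is passed in to keep the sketch small. */
        public List<String> fetch(List<String> batchFromExternalSystem) {
            lastFetchTime = clock.getAsLong();
            return batchFromExternalSystem;
        }
    }

    public static void main(String[] args) {
        MetricGroupStub metrics = new MetricGroupStub();
        SingleThreadedReader reader =
                new SingleThreadedReader(metrics, System::currentTimeMillis);
        reader.fetch(Arrays.asList("a", "b"));
        System.out.println("lastFetchTime = " + metrics.reportedLastFetchTime());
    }
}
```

In the multi-threaded case, the timestamp would instead travel with the batch through the handover and be stored when the batch is selected.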

Best,

Arvid

On Fri, Jul 16, 2021 at 8:13 PM Steven Wu <stevenz...@gmail.com> wrote:

> To avoid confusion, can we either rename "SourceMetricGroup" to
> "SplitReaderMetricGroup" or add "Reader" to the setter method names?
>
> Yes, we should add the "unassigned/pending splits" enumerator metric. I
> tried to publish those metrics for IcebergSourceEnumerator and ran into an
> issue [1]. I don't want to distract the discussion with the Jira ticket.
>
> [1] https://issues.apache.org/jira/browse/FLINK-21000
>
> On Thu, Jul 15, 2021 at 1:01 PM Arvid Heise <ar...@apache.org> wrote:
>
> > Hi Steven,
> >
> > The semantics are unchanged compared to FLIP-33 [1] but I see your point.
> >
> > In reality, pending records would be mostly for event storage systems
> > (Kafka, Kinesis, ...). Here, we would effectively report the consumer
> > lag. If consumer lag is more prominent, we could also rename it.
> >
> > For pending bytes, this is mostly related to file source or any kind of
> > byte streams. At this point, we can only capture the assigned splits at
> > the reader level. I don't think it makes sense to add the same metric to
> > the enumerator as that might induce too much I/O on the job master. I
> > could rather envision another metric that captures how many unassigned
> > splits there are. In general, I think it would be a good idea to add
> > another type of top-level metric group for SplitEnumerator called
> > SplitEnumeratorMetricGroup in SplitEnumeratorContext. There we could add
> > an unassigned/pending splits metric. WDYT?
> >
> > [1]
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-33%3A+Standardize+Connector+Metrics
> >
> > On Wed, Jul 14, 2021 at 9:00 AM Steven Wu <stevenz...@gmail.com> wrote:
> >
> > > I am trying to understand what those two metrics really capture.
> > >
> > > <G extends Gauge<Long>> G setPendingBytesGauge(G pendingBytesGauge);
> > >
> > >    - Using the file source as an example, does it capture the remaining
> > >      bytes for the current file split that the reader is processing? How
> > >      would users interpret or use this metric? The enumerator keeps track
> > >      of the pending/unassigned splits, which is an indication of the size
> > >      of the backlog. That would be very useful.
> > >
> > > <G extends Gauge<Long>> G setPendingRecordsGauge(G pendingRecordsGauge);
> > >
> > >    - In the Kafka source case, is this intended to capture the consumer
> > >      lag (log head offset from broker - current record offset)? That
> > >      could be used to capture the size of the backlog.
> > >
> > > On Tue, Jul 13, 2021 at 3:01 PM Arvid Heise <ar...@apache.org> wrote:
> > >
> > > > Hi Becket,
> > > >
> > > > I believe 1+2 has been answered by Chesnay already. Just to add to 2:
> > > > I'm not the biggest fan of reusing task metrics but that's what FLIP-33
> > > > and different folks suggested. I'd probably keep task I/O metrics only
> > > > for internal things and add a new metric for external calls. Then, we
> > > > could even allow users to track I/O in AsyncIO (which would currently
> > > > be a mess). However, with the current abstraction, it would be
> > > > relatively easy to add separate metrics later.
> > > >
> > > > 3. As outlined in the JavaDoc and in the draft PR [1], it's up to the
> > > > user to implement it in a way that fetch time always corresponds to the
> > > > latest polled record. For SourceReaderBase, I have added a new
> > > > RecordsWithSplitIds#lastFetchTime (with default return value null) that
> > > > sets the last fetch time automatically whenever the next batch is
> > > > selected.
> > > > Tbh this metric is a bit more challenging to implement for
> > > > non-SourceReaderBase sources but I have not found a better, thread-safe
> > > > way. Of course, we could shift the complete calculation into user-land
> > > > but I'm not sure that this is easier.
> > > > For your scenarios:
> > > > - in A, you assume SourceReaderBase. In that case, we could eagerly
> > > > report the metric as sketched by you. It depends on the definition of
> > > > "last processed record" in FLIP-33, whether this eager reporting is
> > > > more correct than the lazy reporting that I have proposed. The former
> > > > case assumes "last processed record" = last fetched record, while the
> > > > latter case assumes "last processed record" = "last polled record". For
> > > > the proposed solution, the user would just need to implement
> > > > RecordsWithSplitIds#lastFetchTime, which typically corresponds to the
> > > > creation time of the RecordsWithSplitIds instance.
> > > > - B is not assuming SourceReaderBase.
> > > > If it's SourceReaderBase, the same proposed solution works out of the
> > > > box: SourceOperator intercepts the emitted event time and uses the
> > > > fetch time of the current batch.
> > > > If it's not SourceReaderBase, the user would need to attach the
> > > > timestamp to the handover protocol if multi-threaded and set the
> > > > lastFetchTimeGauge when a value in the handover protocol is selected
> > > > (typically a batch).
> > > > If it's a single threaded source, the user could directly set the
> > > > current timestamp after fetching the records in a sync fashion.
> > > > The bad case is if the user is fetching individual records (either sync
> > > > or async), then the fetch time would be updated with every record.
> > > > However, I'm assuming that the required system call is dwarfed by
> > > > involved I/O.
> > > > [1] https://github.com/apache/flink/pull/15972
> > > >
> > > > On Tue, Jul 13, 2021 at 12:58 PM Chesnay Schepler <ches...@apache.org>
> > > > wrote:
> > > >
> > > > > Re 1: We don't expose the reuse* methods, because the proposed
> > > > > OperatorIOMetricGroup is a separate interface from the existing
> > > > > implementations (which will be renamed and implement the new
> > > > > interface).
> > > > >
> > > > > Re 2: Currently the plan is to re-use the "new" numBytesIn/Out
> > > > > counters for tasks ("new" because all we are doing is exposing already
> > > > > existing metrics). We may however change this in the future if we want
> > > > > to report the byte metrics on an operator level, which is primarily
> > > > > interesting for async IO or other external connectivity outside of
> > > > > sinks/sources.
> > > > >
> > > > > On 13/07/2021 12:38, Becket Qin wrote:
> > > > > > Hi Arvid,
> > > > > >
> > > > > > Thanks for the proposal. I like the idea of exposing concrete metric
> > > > > > group class so that users can access the predefined metrics.
> > > > > >
> > > > > > A few questions follow:
> > > > > >
> > > > > > 1. When exposing the OperatorIOMetrics to the users, we are also
> > > > > > exposing the reuseInputMetricsForTask to the users. Should we hide
> > > > > > these two methods because users won't have enough information to
> > > > > > decide whether the records IO metrics should be reused by the task
> > > > > > or not?
> > > > > >
> > > > > > 2. Similar to question 1, in the OperatorIOMetricGroup, we are
> > > > > > adding numBytesInCounter and numBytesOutCounter. Should these
> > > > > > metrics be reusing the task level metrics by default?
> > > > > >
> > > > > > 3. Regarding SourceMetricGroup#setLastFetchTimeGauge(), I am not
> > > > > > sure how it works with the FetchLag. Typically there are two cases
> > > > > > when reporting the fetch lag.
> > > > > >      A. The EventTime is known at the point when the record is
> > > > > > fetched in the SplitFetcher, so the fetch lag can be derived and
> > > > > > reported immediately.
> > > > > >      B. The EventTime is known only after the fetched record was
> > > > > > parsed in the RecordEmitter. In this case, the RecordEmitter needs
> > > > > > to get the fetch time of that particular record.
> > > > > > I am not sure when users set the LastFetchTime in the above two
> > > > > > cases. Can you help elaborate on how users should use it?
> > > > > >
> > > > > > Thanks,
> > > > > >
> > > > > > Jiangjie (Becket) Qin
> > > > > >
> > > > > >
> > > > > >
> > > > > > On Thu, Jul 8, 2021 at 10:25 PM Arvid Heise <ar...@apache.org> wrote:
> > > > > >
> > > > > >> Dear devs,
> > > > > >>
> > > > > > >> As a continuation and generalization of FLIP-33 (Standardize
> > > > > > >> Connector Metrics) [1], we'd like to discuss how we actually expose
> > > > > > >> the standardized operator metrics to users in terms of changes to
> > > > > > >> the API.
> > > > > >>
> > > > > >> Please check out the FLIP [2] and provide feedback.
> > > > > >>
> > > > > >> Best,
> > > > > >>
> > > > > >> Arvid
> > > > > >>
> > > > > >> [1]
> > > > > >>
> > > > > >>
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-33%3A+Standardize+Connector+Metrics
> > > > > >> [2]
> > > > > >>
> > > > > >>
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-179%3A+Expose+Standardized+Operator+Metrics
> > > > > >>
> > > > >
> > > > >
> > > >
> > >
> >
>
