Hello Bruno,

Thanks for the feedback; I've replied inline.

On Mon, Jul 1, 2019 at 7:08 AM Bruno Cadonna <br...@confluent.io> wrote:

> Hi Guozhang,
>
> Thank you for the KIP.
>
> 1) As far as I understand, the StreamsMetrics interface is there for
> user-defined processors. Would it make sense to also add a method to
> the interface to specify a sensor that records skipped records?
>
Not sure I follow.. if users want to add a specific skipped-records
sensor, they can still do that as a "throughput" sensor via
"addThroughputSensor" and then call "record", right?

As an afterthought, maybe it's better to rename `throughput` to `rate` in
the public APIs, since the latter is really the intended semantics. I did
not change it just to make fewer API changes / deprecate fewer functions.
But if we feel it is important we can change it as well.
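For illustration, here is a toy Python sketch of what such a user-defined
rate-style sensor amounts to (this is not the actual Java `StreamsMetrics`
API; the `RateSensor` class and its methods are made up for this example):

```python
import time

class RateSensor:
    """Toy stand-in for a Streams 'throughput' (really: rate) sensor."""
    def __init__(self, name):
        self.name = name
        self.count = 0
        self.start = time.monotonic()

    def record(self, value=1):
        # Each call contributes to the sensor's running count.
        self.count += value

    def rate_per_sec(self):
        # Rate = total recorded count over elapsed wall-clock time.
        elapsed = max(time.monotonic() - self.start, 1e-9)
        return self.count / elapsed

# A user-defined processor could register its own skipped-records sensor
# and call record() whenever it decides to skip a record:
skipped = RateSensor("skipped-records")
skipped.record()
skipped.record()
print(skipped.count)  # 2
```

The point being: the existing "throughput" sensor machinery already covers
this use case, which is also why renaming it to "rate" would only be a
naming cleanup, not a semantic change.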


> 2) What are the semantics of active-task-process and standby-task-process
>
Ah, good catch. I think I put it in the wrong column. Just some
explanation here: within a thread's looped iterations, it will first try
to process some records from the active tasks, and then see if there are
any standby tasks that can be processed as well (i.e., just reading from the
restore consumer and applying the records to the local stores). The ratio
metrics indicate 1) which tasks (active or standby) this thread currently
owns, and 2) what percentage of its time it spends on each of them.

But this metric should really be a task-level one that includes both the
thread-id and task-id, and upon task migration such metrics will be
dynamically deleted / (re)created. Each task-id may be owned by multiple
threads, one as active and the others as standby, and hence the separation
of active / standby still seems necessary.
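To make the ratio semantics concrete, here is a hypothetical Python sketch
of the thread loop described above (the metric names match the KIP, but the
timing logic and helper functions are made up for illustration):

```python
import time

def process_active():
    time.sleep(0.003)   # stand-in for processing records from active tasks

def process_standby():
    time.sleep(0.001)   # stand-in for applying restore-consumer records
                        # to the standby tasks' local stores

totals = {"active-task-process": 0.0, "standby-task-process": 0.0}
loop_start = time.monotonic()

for _ in range(10):     # the stream thread's processing loop
    t0 = time.monotonic()
    process_active()    # first: active tasks
    t1 = time.monotonic()
    process_standby()   # then: standby tasks, if any
    t2 = time.monotonic()
    totals["active-task-process"] += t1 - t0
    totals["standby-task-process"] += t2 - t1

elapsed = time.monotonic() - loop_start
# The reported ratios approximate the fraction of loop time the thread
# spends on active vs standby work.
ratios = {name: spent / elapsed for name, spent in totals.items()}
```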



> 3) How do dropped-late-records and expired-window-record-drop relate
> to each other? I guess the former is for records that fall outside the
> grace period and the latter is for records that are processed after
> the retention period of the window. Is this correct?
>
Yes, that's correct. The names are indeed a bit confusing since they were
added in different releases historically..

More precisely, the `grace period` is a notion of the operator (hence the
metric is node-level, though it would only be used for DSL operators) while
the `retention` is a notion of the store (hence the metric is store-level).
Usually the grace period will be smaller than the store retention.

The processor node is aware of the `grace period`, and when it receives a
record that is older than the grace deadline, the record is dropped
immediately; otherwise it is still processed, and maybe a new update is
"put" into the store. The store is aware of its `retention period`, and
upon a "put" call, if it realizes the record is older than the retention
deadline, that put call is ignored and the metric is recorded.

We have to separate them here since the window store can be used in both
the DSL and the PAPI. In the former case the record would likely already be
dropped at the processor-node level due to the grace period, which is
usually smaller than the retention; but in the PAPI there is no grace
period, and hence the processor would likely still process the record and
call "put" on the store.
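A toy Python sketch of this two-level check (the metric names match the
KIP, but all thresholds, function names, and the simplified store are made
up for illustration; the real logic lives in the DSL operators and the
window store):

```python
GRACE_MS = 1_000       # operator-level notion -> node metric
RETENTION_MS = 5_000   # store-level notion    -> store metric

metrics = {"dropped-late-records": 0, "expired-window-record-drop": 0}
store = {}

def store_put(key, value, record_ts, stream_time):
    # The store only knows its retention deadline.
    if record_ts < stream_time - RETENTION_MS:
        metrics["expired-window-record-drop"] += 1
        return
    store[key] = value

def process(key, value, record_ts, stream_time, dsl=True):
    # Only DSL operators enforce a grace period; a PAPI processor
    # goes straight to the store's put call.
    if dsl and record_ts < stream_time - GRACE_MS:
        metrics["dropped-late-records"] += 1
        return
    store_put(key, value, record_ts, stream_time)

stream_time = 10_000
process("a", 1, record_ts=9_500, stream_time=stream_time)             # on time
process("b", 2, record_ts=8_000, stream_time=stream_time)             # late: node-level drop
process("c", 3, record_ts=3_000, stream_time=stream_time, dsl=False)  # PAPI: store-level drop
print(metrics)  # {'dropped-late-records': 1, 'expired-window-record-drop': 1}
```

Note how the third record bypasses the grace check entirely (PAPI) and is
only caught by the store's retention check, which is exactly why the two
metrics cannot be merged.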


> 4) Is there an actual difference between skipped and dropped records?
> If not, shall we unify the terminology?
>
>
There is. Dropped records are only due to lateness, whereas skipped
records can be due to serde errors (when the user's error handler
indicates "skip and continue"), timestamp errors, etc.

I've considered that maybe a better (more extensible) way would be to
define a single metric name, say skipped-records, but use different tags to
indicate the skipping reason (errors, windowing semantics, etc.). But
there's still a tricky difference: records skipped because of serde errors,
for example, are skipped at the very beginning, before any effects have
taken place at all. Some others, e.g. a null key / value at the reduce
operator, are only skipped in the middle of processing, i.e. some effects
may have already taken place in upstream sub-topologies. That's why I've
defined skipped-records on both the task level and the node level, and the
aggregate of the latter may still be smaller than the former, whereas
dropped-records is node-level only.

So how about an even more significant change then: we broaden
`dropped-late-records` into `dropped-records`, which is node-level only but
covers reasons from lateness to semantics (like a null key) as well; and
then we have a task-level-only `skipped-records` which only records those
records dropped at the very beginning that did not make it into the
processing topology at all. I feel this is a clearer distinction, but also
a bigger change for users.
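To illustrate the proposed distinction, a hypothetical Python sketch (the
two metric names follow the proposal above; the serde and reduce logic is
made up for this example):

```python
# Task-level: records skipped before entering the topology at all.
task_metrics = {"skipped-records": 0}
# Node-level: records dropped mid-topology (lateness, null key, ...).
node_metrics = {"dropped-records": 0}

def deserialize(raw):
    """Returns None on a serde error (handler says 'skip and continue')."""
    try:
        return int(raw)
    except (TypeError, ValueError):
        return None

def reduce_node(key, value, state):
    if key is None:                 # semantic drop, e.g. null key at reduce
        node_metrics["dropped-records"] += 1
        return
    state[key] = state.get(key, 0) + value

def task_process(raw_key, raw_value, state):
    value = deserialize(raw_value)
    if value is None:               # skipped at the very beginning: no
        task_metrics["skipped-records"] += 1   # effects taken anywhere
        return
    reduce_node(raw_key, value, state)

state = {}
task_process(1, "10", state)       # processed normally
task_process(2, "oops", state)     # serde error -> skipped-records
task_process(None, "5", state)     # null key at reduce -> dropped-records
print(task_metrics, node_metrics)  # {'skipped-records': 1} {'dropped-records': 1}
```

The null-key record here still "entered" the topology (and could have had
upstream effects in a larger topology) before being dropped, which is the
behavioral difference the two metric names would capture.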


> 5) What happens with removed metrics when the user sets the version of
> "built.in.metrics.version" to 2.2-
>
I think for those redundant ones like "forward-rate" and "destroy-rate" we
can still remove them with 2.2- as well; for others that are removed /
replaced, like the thread-level skipped-records, we should still maintain
them.


> Best,
> Bruno
>
> On Thu, Jun 27, 2019 at 6:11 PM Guozhang Wang <wangg...@gmail.com> wrote:
> >
> > Hello folks,
> >
> > As 2.3 is released now, I'd like to bump up this KIP discussion again for
> > your reviews.
> >
> >
> > Guozhang
> >
> >
> > On Thu, May 23, 2019 at 4:44 PM Guozhang Wang <wangg...@gmail.com>
> wrote:
> >
> > > Hello Patrik,
> > >
> > > Since we are rolling out 2.3 and everyone is busy with the release now
> > > this KIP does not have much discussion involved yet and will slip into
> the
> > > next release cadence.
> > >
> > > This KIP contains several parts: 1. refactoring the existing
> > > metrics hierarchy to clean up some redundancy and also get more
> > > clarity; 2. adding instance-level metrics like rebalance and state
> > > metrics, as well as other static metrics.
> > >
> > >
> > > Guozhang
> > >
> > >
> > >
> > > On Thu, May 23, 2019 at 5:34 AM Patrik Kleindl <pklei...@gmail.com>
> wrote:
> > >
> > >> Hi Guozhang
> > >> Thanks for the KIP, this looks very helpful.
> > >> Could you please provide more detail on the metrics planned for the
> state?
> > >> We were just considering how to implement this ourselves because we
> > >> need to track the history of state changes.
> > >> The idea was to have an accumulated "seconds in state x" metric for
> every
> > >> state.
> > >> The new rebalance metric might solve part of our use case, but it is
> > >> interesting what you have planned for the state metric.
> > >> best regards
> > >> Patrik
> > >>
> > >> On Fri, 29 Mar 2019 at 18:56, Guozhang Wang <wangg...@gmail.com>
> wrote:
> > >>
> > >> > Hello folks,
> > >> >
> > >> > I'd like to propose the following KIP to improve the Kafka Streams
> > >> metrics
> > >> > mechanism to users. This includes 1) a minor change in the public
> > >> > StreamsMetrics API, and 2) a major cleanup on the Streams' own
> built-in
> > >> > metrics hierarchy.
> > >> >
> > >> > Details can be found here:
> > >> >
> > >> >
> > >> >
> > >>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-444%3A+Augment+metrics+for+Kafka+Streams
> > >> >
> > >> > I'd love to hear your thoughts and feedbacks. Thanks!
> > >> >
> > >> > --
> > >> > -- Guozhang
> > >> >
> > >>
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
> >
> > --
> > -- Guozhang
>


-- 
-- Guozhang
