Hi Gokul,

Thank you for your detailed review.  I've summarized the updates I've made
to the KIP inline below.  Please review the updated KIP when you have time.

On Fri, Dec 20, 2019 at 6:56 AM Gokul Ramanan Subramanian <
gokul24...@gmail.com> wrote:

> Hi Sean.
>
> Thanks for writing this KIP. Sounds like a great addition. A few comments.
>
> 1. Currently, I see that you have proposed partition-level records-latency
> metrics and a global records-latency-max metric across all partitions for a
> given consumer group. Some Kafka users may organize their topics such that
> some topics are more important than others. *Why not have the latency
> metric at the topic level as well?* Although one could imagine having
> metrics aggregation outside of JMX to generate the topic-level metrics, I
> suppose having topic level metrics will allow Kafka users to set up alarms
> at the topic level with greater ease. IMHO, this KIP should address this
> use case. Even if you believe we should not expose topic level metrics, it
> would be nice to see the KIP explain why.
>

I added a topic-level metric.  Something to consider is how to represent
latency at an aggregate (non-partition) level.  For both the client-level
and topic-level metrics, I specify in the KIP that the maximum of all
records-latency-max metrics for partitions assigned to that client should
be used.  Other aggregates could also be used, such as the median or a set
of percentiles, but one aggregate that does not make sense is the sum of
latency across many partitions, because partitions are typically consumed
in parallel.
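
To make the aggregation concrete, here is a minimal sketch.  It is not
code from the KIP: the class name, method names, and the plain maps of
per-partition values are hypothetical stand-ins for whatever the consumer
metrics would actually hold.  It only shows that the topic-level and
client-level values are a maximum over per-partition maxima, never a sum.

```java
import java.util.Map;
import java.util.OptionalLong;

// Hypothetical helper, for illustration only; not part of the KIP or of Kafka.
final class LatencyAggregates {

    // Topic-level value: the largest per-partition max latency (ms) among
    // the partitions of one topic. Empty when no partitions are assigned.
    static OptionalLong topicMax(Map<Integer, Long> latencyMsByPartition) {
        return latencyMsByPartition.values().stream()
                .mapToLong(Long::longValue)
                .max();
    }

    // Client-level value: the largest per-partition max latency (ms) across
    // every assigned partition of every topic.
    static OptionalLong clientMax(Map<String, Map<Integer, Long>> latencyMsByTopic) {
        return latencyMsByTopic.values().stream()
                .flatMap(byPartition -> byPartition.values().stream())
                .mapToLong(Long::longValue)
                .max();
    }
}
```

With two partitions reporting 120 ms and 4500 ms, the topic-level value
would be 4500 ms.  A sum (4620 ms) would describe no record that was
actually consumed.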


>
> 2. Some existing solutions already expose the consumer group lag in time.
> See
> https://www.lightbend.com/blog/monitor-kafka-consumer-group-latency-with-kafka-lag-exporter
> for an example. *The KIP should reference existing solutions and suggest the
> benefits of using the native solution that you propose*.
>

I added Kafka Lag Exporter to the rejected alternatives section.  To
summarize, this project will *estimate* the latency of a partition, and can
only do so when the group partition offsets are committed back to Kafka.

For full disclosure, I am the author of Kafka Lag Exporter (and the linked
blog post).  Although I recommend it as a convenient way to measure
latency, I think that a KafkaConsumer metric would be a better option.
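
To illustrate why an external tool can only *estimate*, here is a rough
sketch of one way such an estimate could be made.  This is an assumption
about the general approach, not code taken from Kafka Lag Exporter: it
interpolates between two observations of a partition's latest offset to
guess when the group's committed offset was produced.  All names are
hypothetical.

```java
// Hypothetical illustration; not the Kafka Lag Exporter implementation.
final class TimeLagEstimate {

    // Linearly interpolates (or extrapolates) the wall-clock time (ms) at
    // which `offset` was the latest offset, given two observations
    // (off0 at t0Ms) and (off1 at t1Ms) of the partition's latest offset.
    static long estimateProducedAtMs(long offset, long off0, long t0Ms, long off1, long t1Ms) {
        if (off1 == off0) {
            return t1Ms; // no growth between the observations; use the newer one
        }
        double msPerOffset = (double) (t1Ms - t0Ms) / (off1 - off0);
        return Math.round(t0Ms + msPerOffset * (offset - off0));
    }

    // Estimated time lag (ms) of a group whose last committed offset is
    // `committedOffset`, clamped at zero.
    static long estimateLagMs(long committedOffset, long off0, long t0Ms,
                              long off1, long t1Ms, long nowMs) {
        long producedAtMs = estimateProducedAtMs(committedOffset, off0, t0Ms, off1, t1Ms);
        return Math.max(0L, nowMs - producedAtMs);
    }
}
```

The estimate depends on a committed offset being available and on the
production rate being roughly steady between observations, which is why
a metric computed inside the consumer from actual record timestamps
should be more accurate.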


>
> 3. If a message was produced a long time ago, and a new consumer group has
> been created, then the latency metrics are going to be very high in value
> until the consumer group catches up. This is especially true in the context
> of KIP-405
> (https://cwiki.apache.org/confluence/display/KAFKA/KIP-405%3A+Kafka+Tiered+Storage),
> which allows reading very old messages. Therefore, a consumer application
> that relies on reading all messages from the past will report a high
> records-latency for a while. *I think that the KIP should note down the
> caveat that setting SLAs on records-latency makes sense only in steady
> state, and not for bootstrapping new consumer groups.*
>
>
Great point.  I've added your note almost verbatim to the KIP.


> 4. Since the community has used the term consumer-lag so often, *why not
> call the metric consumer-lag-millis which makes the units clear as well*.
> records-latency is a bit confusing at least for me.
>

I agree that "lag" and "latency" are overloaded terms.  Since "latency"
doesn't appear often in Kafka literature and, in my experience, is usually
used to refer to time-based lag, I thought it would be the best name for
the metric.  According to queueing theory it could also be called "wait
time", but this would depend on the timestamp type as well (user-defined,
CreateTime, or LogAppendTime).  I'm curious what others think.
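
Whatever the name, the value itself is an elapsed time: the consumer's
current time minus the record timestamp, both in milliseconds since the
epoch.  A minimal sketch, assuming a hypothetical helper (not the KIP's
API) that takes the record timestamp as a plain long:

```java
import java.time.Clock;

// Hypothetical helper, for illustration only; not part of the KIP or of Kafka.
final class RecordLatency {

    // Elapsed milliseconds between the record timestamp and the given clock.
    // Both values are epoch-based, so the clock's time zone has no effect.
    static long latencyMs(long recordTimestampMs, Clock clock) {
        return clock.millis() - recordTimestampMs;
    }
}
```

Which interval this measures depends on the timestamp type.  With
CreateTime or a user-defined timestamp it starts when the producer
stamped the record, and with LogAppendTime it starts when the broker
appended it.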


>
> Cheers.
>
> On Wed, Dec 18, 2019 at 3:28 PM Habib Nahas <ha...@hbnet.io> wrote:
>
> > Thanks Sean. Look forward to the updated KIP.
> >
> > Regards,
> > Habib
> >
> > On Fri, Dec 13, 2019, at 6:22 AM, Sean Glover wrote:
> > > Hi,
> > >
> > > After my last reply I had a nagging feeling something wasn't right, and I
> > > remembered that epoch time is UTC. This makes the discussion about
> > > timezones irrelevant, since we're always using UTC. It also makes the
> > > LatencyTime interface that I proposed in the design unnecessary, since I
> > > can no longer think of how it might be useful. I'll update the KIP. I'll
> > > also review KIP-32 to understand message timestamps better so I can
> > > explain the different types of latency results that could be reported
> > > with this metric.
> > >
> > > Regards,
> > > Sean
> > >
> > > On Thu, Dec 12, 2019 at 6:25 PM Sean Glover <sean.glo...@lightbend.com>
> > > wrote:
> > >
> > > > Hi Habib,
> > > >
> > > > Thanks for the question! If the consumer is in a different timezone than
> > > > the timezone used to produce messages to a partition, then you can use
> > > > an implementation of LatencyTime to return the current time of that
> > > > timezone. The current design assumes that messages produced to a
> > > > partition must all be produced from the same timezone. If timezone
> > > > metadata were encoded into each message then it would be possible to
> > > > automatically determine the source timezone and calculate latency;
> > > > however, the current design will not pass individual messages into
> > > > LatencyTime to retrieve message metadata. Instead, the
> > > > LatencyTime.getWallClockTime method is only called once per fetch
> > > > request response per partition, and the metric is recorded once the
> > > > latency calculation is complete. This follows the same design as the
> > > > current consumer lag metric, which calculates offset lag based on the
> > > > last message of the fetch request response for a partition. Since the
> > > > metric is just an aggregate (max/mean) over some time window, we only
> > > > need to calculate latency occasionally, which will have negligible
> > > > impact on the performance of consumer polling.
> > > >
> > > > A simple implementation of LatencyTime that returns wall clock time for
> > > > the Asia/Singapore timezone for all partitions:
> > > >
> > > > import java.time.*;
> > > > import org.apache.kafka.common.TopicPartition;
> > > >
> > > > class SingaporeTime implements LatencyTime {
> > > >     ZoneId zoneSingapore = ZoneId.of("Asia/Singapore");
> > > >     Clock clockSingapore = Clock.system(zoneSingapore);
> > > >
> > > >     @Override
> > > >     public long getWallClockTime(TopicPartition tp) {
> > > >         // Seconds since the epoch, read from the Singapore-zoned clock.
> > > >         return clockSingapore.instant().getEpochSecond();
> > > >     }
> > > >
> > > >     ...
> > > > }
> > > >
> > > > Regards,
> > > > Sean
> > > >
> > > >
> > > > On Thu, Dec 12, 2019 at 6:18 AM Habib Nahas <ha...@hbnet.io> wrote:
> > > >
> > > >> Hi Sean,
> > > >>
> > > >> Thanks for the KIP.
> > > >>
> > > > >> As I understand it, users are free to set their own timestamp on
> > > > >> ProducerRecord. What is the recommendation for the proposed metric in
> > > > >> a scenario where the user sets this timestamp in timezone A and
> > > > >> consumes the record in timezone B? It's not clear to me if a custom
> > > > >> implementation of LatencyTime will help here.
> > > >>
> > > >> Thanks,
> > > >> Habib
> > > >>
> > > >> On Wed, Dec 11, 2019, at 4:52 PM, Sean Glover wrote:
> > > >> > Hello again,
> > > >> >
> > > > >> > There has been some interest in this KIP recently. I'm bumping the
> > > > >> > thread to encourage feedback on the design.
> > > >> >
> > > >> > Regards,
> > > >> > Sean
> > > >> >
> > > > >> > On Mon, Jul 15, 2019 at 9:01 AM Sean Glover
> > > > >> > <sean.glo...@lightbend.com> wrote:
> > > >> >
> > > > >> > > To hopefully spark some discussion I've copied the motivation
> > > > >> > > section from the KIP:
> > > >> > >
> > > > >> > > Consumer lag is a useful metric to monitor how many records are
> > > > >> > > queued to be processed. We can look at individual lag per
> > > > >> > > partition or we may aggregate metrics. For example, we may want to
> > > > >> > > monitor the maximum lag of any partition in our consumer
> > > > >> > > subscription so we can identify hot partitions caused by an
> > > > >> > > inadequate producer partitioning strategy. We may want to monitor
> > > > >> > > the sum of lag across all partitions so we have a sense of our
> > > > >> > > total backlog of messages to consume. Lag in offsets is useful
> > > > >> > > when you have a good understanding of your messages and processing
> > > > >> > > characteristics, but it doesn’t tell us how far behind *in time*
> > > > >> > > we are. This is known as wait time in queueing theory, or more
> > > > >> > > informally it’s referred to as latency.
> > > >> > >
> > > > >> > > The latency of a message can be defined as the difference between
> > > > >> > > when that message was first produced and when it is received by a
> > > > >> > > consumer. The latency of records in a partition correlates with
> > > > >> > > lag, but a larger lag doesn’t necessarily mean a larger latency.
> > > > >> > > For example, a topic consumed by two separate application consumer
> > > > >> > > groups A and B may have similar lag, but different latency per
> > > > >> > > partition. Application A is a consumer which performs CPU
> > > > >> > > intensive business logic on each message it receives. It’s
> > > > >> > > distributed across many consumer group members to handle the load
> > > > >> > > quickly enough, but since its processing time is slower, it takes
> > > > >> > > longer to process each message per partition. Meanwhile,
> > > > >> > > Application B is a consumer which performs a simple ETL operation
> > > > >> > > to land streaming data in another system, such as HDFS. It may
> > > > >> > > have similar lag to Application A, but because it has a faster
> > > > >> > > processing time its latency per partition is significantly less.
> > > >> > >
> > > > >> > > If the Kafka Consumer reported a latency metric it would be easier
> > > > >> > > to build Service Level Agreements (SLAs) based on non-functional
> > > > >> > > requirements of the streaming system. For example, the system must
> > > > >> > > never have a latency of greater than 10 minutes. This SLA could be
> > > > >> > > used in monitoring alerts or as input to automatic scaling
> > > > >> > > solutions.
> > > >> > >
> > > > >> > > On Thu, Jul 11, 2019 at 12:36 PM Sean Glover
> > > > >> > > <sean.glo...@lightbend.com> wrote:
> > > >> > >
> > > >> > >> Hi kafka-dev,
> > > >> > >>
> > > > >> > >> I've created KIP-489 as a proposal for adding latency metrics to
> > > > >> > >> the Kafka Consumer in a similar way as record-lag metrics are
> > > > >> > >> implemented.
> > > >> > >>
> > > >> > >>
> > > >> > >>
> > > >>
> >
> https://cwiki.apache.org/confluence/display/KAFKA/489%3A+Kafka+Consumer+Record+Latency+Metric
> > > >> > >>
> > > >> > >> Regards,
> > > >> > >> Sean
> > > >> > >>
> > > >> > >> --
> > > >> > >> Principal Engineer, Lightbend, Inc.
> > > >> > >>
> > > >> > >> <http://lightbend.com>
> > > >> > >>
> > > >> > >> @seg1o <https://twitter.com/seg1o>, in/seanaglover
> > > >> > >> <https://www.linkedin.com/in/seanaglover/>
> > > >> > >>
> > > >> > >
> > > >> > >
> > > >> > > --
> > > >> > > Principal Engineer, Lightbend, Inc.
> > > >> > >
> > > >> > > <http://lightbend.com>
> > > >> > >
> > > >> > > @seg1o <https://twitter.com/seg1o>, in/seanaglover
> > > >> > > <https://www.linkedin.com/in/seanaglover/>
> > > >> > >
> > > >> >
> > > >>
> > > >
> > > >
> > >
> > > --
> > > Sean Glover
> > > Principal Engineer, Alpakka, Lightbend, Inc. <https://lightbend.com>
> > > @seg1o <https://twitter.com/seg1o>, in/seanaglover
> > > <https://www.linkedin.com/in/seanaglover/>
> > >
> >
>
>
> --
> Gokul Ramanan Subramanian
> Senior SDE, Amazon AWS
>
