Re: [DISCUSS] KIP-489 Kafka Consumer Record Latency Metric

2020-01-17 Thread Sean Glover
Hi Habib,

With regards to your earlier question about timezones, I've updated the KIP
to remove the LatencyTime abstraction since it is no longer relevant.  I
added a note about epoch time as well.
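
In concrete terms, since record timestamps are epoch-based (UTC), the latency calculation reduces to a simple difference of epoch times. A minimal sketch, not from the KIP, with a hypothetical latencyMillis helper standing in for the real ConsumerRecord#timestamp() plumbing:

```java
import java.time.Instant;

public class RecordLatency {
    // Latency in ms given a record's epoch-millis timestamp
    // (the real client would read ConsumerRecord#timestamp()).
    static long latencyMillis(long recordTimestampMs, long nowMs) {
        // Clamp at zero in case of clock skew between producer and consumer hosts.
        return Math.max(0L, nowMs - recordTimestampMs);
    }

    public static void main(String[] args) {
        long now = Instant.now().toEpochMilli();
        System.out.println(latencyMillis(now - 250, now)); // prints 250
    }
}
```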

Thanks,
Sean

On Wed, Jan 15, 2020 at 8:28 AM Habib Nahas  wrote:

> Hi Sean,
>
> That's great, I look forward to it.
>
> Thanks,
> Habib
>
> On Tue, Jan 14, 2020, at 2:55 PM, Sean Glover wrote:
> > Hi Habib,
> >
> > Thank you for the reminder. I'll update the KIP this week and address the
> > feedback from you and Gokul.
> >
> > Regards,
> > Sean
> >
> > On Tue, Jan 14, 2020 at 9:06 AM Habib Nahas  wrote:
> >
> > > Any chance of an update on the KIP? We are interested in seeing this move
> > > forward.
> > >
> > > Thanks,
> > > Habib
> > > Sr SDE, AWS
> > >
> > > On Wed, Dec 18, 2019, at 3:27 PM, Habib Nahas wrote:
> > > > Thanks Sean. Look forward to the updated KIP.
> > > >
> > > > Regards,
> > > > Habib
> > > >
> > > > On Fri, Dec 13, 2019, at 6:22 AM, Sean Glover wrote:
> > > > > Hi,
> > > > >
> > > > After my last reply I had a nagging feeling something wasn't right, and I
> > > > remembered that epoch time is UTC. This makes the discussion about
> > > > timezone irrelevant, since we're always using UTC. This makes the need for
> > > > the LatencyTime interface that I proposed in the design irrelevant as well,
> > > > since I can no longer think about how that might be useful. I'll update
> > > > the KIP. I'll also review KIP-32 to understand message timestamps better
> > > > so I can explain the different types of latency results that could be
> > > > reported with this metric.
> > > > >
> > > > > Regards,
> > > > > Sean
> > > > >
> > > > On Thu, Dec 12, 2019 at 6:25 PM Sean Glover  wrote:
> > > > >
> > > > > > Hi Habib,
> > > > > >
> > > > > > Thanks for the question! If the consumer is in a different timezone than the
> > > > > > timezone used to produce messages to a partition then you can use an
> > > > > > implementation of LatencyTime to return the current time of that timezone.
> > > > > > The current design assumes that messages produced to a partition must all
> > > > > > be produced from the same timezone. If timezone metadata were encoded into
> > > > > > each message then it would be possible to automatically determine the
> > > > > > source timezone and calculate latency, however the current design will not
> > > > > > pass individual messages into LatencyTime to retrieve message metadata.
> > > > > > Instead, the LatencyTime.getWallClockTime method is only called once per
> > > > > > fetch request response per partition and then the metric is recorded once
> > > > > > the latency calculation is complete. This follows the same design as the
> > > > > > current consumer lag metric which calculates offset lag based on the last
> > > > > > message of the fetch request response for a partition. Since the metric is
> > > > > > just an aggregate (max/mean) over some time window we only need to
> > > > > > occasionally calculate latency, which will have negligible impact on the
> > > > > > performance of consumer polling.
> > > > > >
> > > > > > A simple implementation of LatencyTime that returns wall clock time for
> > > > > > the Asia/Singapore timezone for all partitions:
> > > > > >
> > > > > > import java.time.*;
> > > > > >
> > > > > > class SingaporeTime implements LatencyTime {
> > > > > >   ZoneId zoneSingapore = ZoneId.of("Asia/Singapore");
> > > > > >   Clock clockSingapore = Clock.system(zoneSingapore);
> > > > > >
> > > > > >   @Override
> > > > > >   public long getWallClockTime(TopicPartition tp) {
> > > > > >     return clockSingapore.instant().getEpochSecond();
> > > > > >   }
> > > > > >
> > > > > >   ...
> > > > > > }
> > > > > >
> > > > > > Regards,
> > > > > > Sean
> > > > > >
> > > > > >
> > > > > > On Thu, Dec 12, 2019 at 6:18 AM Habib Nahas  wrote:
> > > > > >
> > > > > >> Hi Sean,
> > > > > >>
> > > > > >> Thanks for the KIP.
> > > > > >>
> > > > > >> As I understand it, users are free to set their own timestamp on
> > > > > >> ProducerRecord. What is the recommendation for the proposed metric in a
> > > > > >> scenario where the user sets this timestamp in timezone A and consumes the
> > > > > >> record in timezone B? It's not clear to me if a custom implementation of
> > > > > >> LatencyTime will help here.
> > > > > >>
> > > > > >> Thanks,
> > > > > >> Habib
> > > > > >>
> > > > > >> On Wed, Dec 11, 2019, at 4:52 PM, Sean Glover wrote:
> > > > > >> > Hello again,
> > > > > >> >
> > > > > >> > There has been some interest in this KIP recently. I'm bumping the
> > > > > >> > thread to encourage feedback on the design.
> > > > > >> >
> > > > > >> > Regards,
> > > > > >> > Sean
> > > > > >> >
> > > > > >> > On Mon, Jul 15, 2019 at 9:01 AM Sean Glover <
> > > 

Re: [DISCUSS] KIP-489 Kafka Consumer Record Latency Metric

2020-01-17 Thread Sean Glover
Hi Gokul,

Thank you for your detailed review.  I've summarized the updates I've made
to the KIP inline below.  Please review the updated KIP when you have time.

On Fri, Dec 20, 2019 at 6:56 AM Gokul Ramanan Subramanian <
gokul24...@gmail.com> wrote:

> Hi Sean.
>
> Thanks for writing this KIP. Sounds like a great addition. Few comments.
>
> 1. Currently, I see that you have proposed partition-level records-latency
> metrics and a global records-latency-max metric across all partitions for a
> given consumer group. Some Kafka users may organize their topics such that
> some topics are more important than others. *Why not have the latency
> metric at the topic level as well?* Although one could imagine having
> metrics aggregation outside of JMX to generate the topic-level metrics, I
> suppose having topic level metrics will allow Kafka users to setup alarms
> at the topic level with greater ease. IMHO, this KIP should address this
> use case. Even if you believe we should not expose topic level metrics, it
> would be nice to see the KIP explain why.
>

I added a topic-level metric.  Something to consider is how to represent
latency at an aggregate (non-partition) level.  For both the client and
topic-level metrics I specify in the KIP that the maximum of all
record-latency-max metrics for partitions assigned to that client should be
used.  Other aggregates could also be used, such as the median or a set of
percentiles, but summing latency across partitions does not make sense,
because partitions are typically consumed in parallel.
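As a sketch of that aggregation rule (the per-partition values below are
hypothetical, and the metric names follow this KIP's proposal, which is not
final):

```java
import java.util.Map;

public class LatencyAggregation {
    public static void main(String[] args) {
        // Hypothetical per-partition records-latency-max values in ms,
        // keyed "topic-partition" as the partition-level metric would be tagged.
        Map<String, Long> partitionMax = Map.of(
                "orders-0", 120L,
                "orders-1", 450L,
                "audit-0", 90L);

        // Topic-level metric for "orders": max over that topic's partitions.
        long ordersMax = partitionMax.entrySet().stream()
                .filter(e -> e.getKey().startsWith("orders-"))
                .mapToLong(Map.Entry::getValue)
                .max().orElse(0L);

        // Client-level metric: max over all assigned partitions -- never a sum,
        // since partitions are consumed in parallel.
        long clientMax = partitionMax.values().stream()
                .mapToLong(Long::longValue)
                .max().orElse(0L);

        System.out.println(ordersMax + " " + clientMax); // prints "450 450"
    }
}
```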


>
> 2. Some existing solutions already expose the consumer group lag in time.
> See
>
> https://www.lightbend.com/blog/monitor-kafka-consumer-group-latency-with-kafka-lag-exporter
> for
> an example. *The KIP should reference existing solutions and suggest the
> benefits of using the native solution that you propose*.
>

I added Kafka Lag Exporter to the rejected alternatives section.  To
summarize, this project will *estimate* the latency of a partition, and can
only do so when the group partition offsets are committed back to Kafka.

For full disclosure, I am the author of Kafka Lag Exporter (and the linked
blog post), and although I recommend it as a convenient option for measuring
latency, I think a native KafkaConsumer metric would be preferable.
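That estimation approach can be sketched as offset-to-time interpolation over
observed log-end offsets (illustrative numbers only, not Kafka Lag Exporter's
actual code):

```java
public class LagTimeEstimate {
    public static void main(String[] args) {
        // Two observations of a partition's log-end offset over time (assumed data).
        long t0 = 1_000L, end0 = 100L;   // at t0 ms, log-end offset was 100
        long t1 = 11_000L, end1 = 200L;  // at t1 ms, log-end offset was 200

        long committed = 150L;           // group's last committed offset
        long now = t1;

        // Interpolate when offset 150 was produced, assuming a steady produce rate.
        double offsetsPerMs = (double) (end1 - end0) / (t1 - t0);   // 0.01 offsets/ms
        double producedAt = t0 + (committed - end0) / offsetsPerMs; // 6000 ms
        long lagMillis = (long) (now - producedAt);

        System.out.println(lagMillis); // prints 5000
    }
}
```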


>
> 3. If a message was produced a long time ago, and a new consumer group has
> been created, then the latency metrics are going to be very high in value
> until the consumer group catches up. This is especially true in the context
> of KIP-405 (
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-405%3A+Kafka+Tiered+Storage
> )
> which allows reading very old messages. Therefore, a consumer application
> that relies on reading all messages from the past will report a high
> records-latency for a while. *I think that the KIP should note down the
> caveat that setting SLAs on records-latency makes sense only in steady
> state, and not for bootstrapping new consumer groups.*
>
>
Great point.  I've added your note almost verbatim to the KIP.


> 4. Since the community has used the term consumer-lag so often, *why not
> call the metric consumer-lag-millis which makes the units clear as well*.
> records-latency is a bit confusing at least for me.
>

I agree that "lag" and "latency" are overloaded terms.  Since "latency"
doesn't appear often in Kafka literature and, in my experience, is usually
used to refer to time-based lag, I thought it would be the best name for
the metric.  According to queueing theory it could also be called "wait
time", but this would depend on the timestamp type as well (user-defined,
CreateTime, or LogAppendTime).  I'm curious what others think.


>
> Cheers.
>
> On Wed, Dec 18, 2019 at 3:28 PM Habib Nahas  wrote:
>
> > Thanks Sean. Look forward to the updated KIP.
> >
> > Regards,
> > Habib
> >
> > On Fri, Dec 13, 2019, at 6:22 AM, Sean Glover wrote:
> > > Hi,
> > >
> > > After my last reply I had a nagging feeling something wasn't right,
> and I
> > > remembered that epoch time is UTC. This makes the discussion about
> > > timezone irrelevant, since we're always using UTC. This makes the need
> > for
> > > the LatencyTime interface that I proposed in the design irrelevant as
> > well,
> > > since I can no longer think about how that might be useful. I'll update
> > > the KIP. I'll also review KIP-32 to understand message timestamps
> better
> > > so I can explain the different types of latency results that could be
> > > reported with this metric.
> > >
> > > Regards,
> > > Sean
> > >
> > > On Thu, Dec 12, 2019 at 6:25 PM Sean Glover  >
> > > wrote:
> > >
> > > > Hi Habib,
> > > >
> > > > Thanks for question! If the consumer is in a different timezone than
> > the
> > > > timezone used to produce messages to a partition then you can use an
> > > > implementation of LatencyTime to return the current time of that
> > timezone.
> > > > The current design 

Re: [DISCUSS] KIP-489 Kafka Consumer Record Latency Metric

2020-01-15 Thread Habib Nahas
Hi Sean,

That's great, I look forward to it.

Thanks,
Habib

On Tue, Jan 14, 2020, at 2:55 PM, Sean Glover wrote:
> Hi Habib,
> 
> Thank you for the reminder. I'll update the KIP this week and address the
> feedback from you and Gokul.
> 
> Regards,
> Sean
> 
> On Tue, Jan 14, 2020 at 9:06 AM Habib Nahas  wrote:
> 
> > Any chance of an update on the KIP? We are interested in seeing this move
> > forward.
> >
> > Thanks,
> > Habib
> > Sr SDE, AWS
> >
> > On Wed, Dec 18, 2019, at 3:27 PM, Habib Nahas wrote:
> > > Thanks Sean. Look forward to the updated KIP.
> > >
> > > Regards,
> > > Habib
> > >
> > > On Fri, Dec 13, 2019, at 6:22 AM, Sean Glover wrote:
> > > > Hi,
> > > >
> > > > After my last reply I had a nagging feeling something wasn't right,
> > and I
> > > > remembered that epoch time is UTC. This makes the discussion about
> > > > timezone irrelevant, since we're always using UTC. This makes the need
> > for
> > > > the LatencyTime interface that I proposed in the design irrelevant as
> > well,
> > > > since I can no longer think about how that might be useful. I'll update
> > > > the KIP. I'll also review KIP-32 to understand message timestamps
> > better
> > > > so I can explain the different types of latency results that could be
> > > > reported with this metric.
> > > >
> > > > Regards,
> > > > Sean
> > > >
> > > > On Thu, Dec 12, 2019 at 6:25 PM Sean Glover  > >
> > > > wrote:
> > > >
> > > > > Hi Habib,
> > > > >
> > > > > Thanks for question! If the consumer is in a different timezone than
> > the
> > > > > timezone used to produce messages to a partition then you can use an
> > > > > implementation of LatencyTime to return the current time of that
> > timezone.
> > > > > The current design assumes that messages produced to a partition
> > must all
> > > > > be produced from the same timezone. If timezone metadata were
> > encoded into
> > > > > each message then it would be possible to automatically determine the
> > > > > source timezone and calculate latency, however the current design
> > will not
> > > > > pass individual messages into LatencyTime to retrieve message
> > metadata.
> > > > > Instead, the LatencyTime.getWallClockTime method is only called once
> > per
> > > > > fetch request response per partition and then the metric is recorded
> > once
> > > > > the latency calculation is complete. This follows the same design as
> > the
> > > > > current consumer lag metric which calculates offset lag based on the
> > last
> > > > > message of the fetch request response for a partition. Since the
> > metric is
> > > > > just an aggregate (max/mean) over some time window we only need to
> > > > > occasionally calculate latency, which will have negligible impact on
> > the
> > > > > performance of consumer polling.
> > > > >
> > > > > A simple implementation of LatencyTime that returns wall clock time
> > for
> > > > > the Asia/Singapore timezone for all partitions:
> > > > >
> > > > > import java.time.*;
> > > > >
> > > > > class SingaporeTime implements LatencyTime {
> > > > >   ZoneId zoneSingapore = ZoneId.of("Asia/Singapore");
> > > > >   Clock clockSingapore = Clock.system(zoneSingapore);
> > > > >
> > > > >   @Override
> > > > >   public long getWallClockTime(TopicPartition tp) {
> > > > >     return clockSingapore.instant().getEpochSecond();
> > > > >   }
> > > > >
> > > > >   ...
> > > > > }
> > > > >
> > > > > Regards,
> > > > > Sean
> > > > >
> > > > >
> > > > > On Thu, Dec 12, 2019 at 6:18 AM Habib Nahas  wrote:
> > > > >
> > > > >> Hi Sean,
> > > > >>
> > > > >> Thanks for the KIP.
> > > > >>
> > > > >> As I understand it users are free to set their own timestamp on
> > > > >> ProducerRecord. What is the recommendation for the proposed metric
> > in a
> > > > >> scenario where the user sets this timestamp in timezone A and
> > consumes the
> > > > >> record in timezone B. Its not clear to me if a custom
> > implementation of
> > > > >> LatencyTime will help here.
> > > > >>
> > > > >> Thanks,
> > > > >> Habib
> > > > >>
> > > > >> On Wed, Dec 11, 2019, at 4:52 PM, Sean Glover wrote:
> > > > >> > Hello again,
> > > > >> >
> > > > >> > There has been some interest in this KIP recently. I'm bumping the
> > > > >> thread
> > > > >> > to encourage feedback on the design.
> > > > >> >
> > > > >> > Regards,
> > > > >> > Sean
> > > > >> >
> > > > >> > On Mon, Jul 15, 2019 at 9:01 AM Sean Glover <
> > sean.glo...@lightbend.com>
> > > > >> > wrote:
> > > > >> >
> > > > >> > > To hopefully spark some discussion I've copied the motivation
> > section
> > > > >> from
> > > > >> > > the KIP:
> > > > >> > >
> > > > >> > > Consumer lag is a useful metric to monitor how many records are
> > > > >> queued to
> > > > >> > > be processed. We can look at individual lag per partition or we
> > may
> > > > >> > > aggregate metrics. For example, we may want to monitor what the
> > > > >> maximum lag
> > > > >> > > of any particular partition in our consumer subscription so we
> > can
> > > > >> identify
> > > > >> > 

Re: [DISCUSS] KIP-489 Kafka Consumer Record Latency Metric

2020-01-14 Thread Sean Glover
Hi Habib,

Thank you for the reminder.  I'll update the KIP this week and address the
feedback from you and Gokul.

Regards,
Sean

On Tue, Jan 14, 2020 at 9:06 AM Habib Nahas  wrote:

> Any chance of an update on the KIP? We are interested in seeing this move
> forward.
>
> Thanks,
> Habib
> Sr SDE, AWS
>
> On Wed, Dec 18, 2019, at 3:27 PM, Habib Nahas wrote:
> > Thanks Sean. Look forward to the updated KIP.
> >
> > Regards,
> > Habib
> >
> > On Fri, Dec 13, 2019, at 6:22 AM, Sean Glover wrote:
> > > Hi,
> > >
> > > After my last reply I had a nagging feeling something wasn't right,
> and I
> > > remembered that epoch time is UTC. This makes the discussion about
> > > timezone irrelevant, since we're always using UTC. This makes the need
> for
> > > the LatencyTime interface that I proposed in the design irrelevant as
> well,
> > > since I can no longer think about how that might be useful. I'll update
> > > the KIP. I'll also review KIP-32 to understand message timestamps
> better
> > > so I can explain the different types of latency results that could be
> > > reported with this metric.
> > >
> > > Regards,
> > > Sean
> > >
> > > On Thu, Dec 12, 2019 at 6:25 PM Sean Glover  >
> > > wrote:
> > >
> > > > Hi Habib,
> > > >
> > > > Thanks for question! If the consumer is in a different timezone than
> the
> > > > timezone used to produce messages to a partition then you can use an
> > > > implementation of LatencyTime to return the current time of that
> timezone.
> > > > The current design assumes that messages produced to a partition
> must all
> > > > be produced from the same timezone. If timezone metadata were
> encoded into
> > > > each message then it would be possible to automatically determine the
> > > > source timezone and calculate latency, however the current design
> will not
> > > > pass individual messages into LatencyTime to retrieve message
> metadata.
> > > > Instead, the LatencyTime.getWallClockTime method is only called once
> per
> > > > fetch request response per partition and then the metric is recorded
> once
> > > > the latency calculation is complete. This follows the same design as
> the
> > > > current consumer lag metric which calculates offset lag based on the
> last
> > > > message of the fetch request response for a partition. Since the
> metric is
> > > > just an aggregate (max/mean) over some time window we only need to
> > > > occasionally calculate latency, which will have negligible impact on
> the
> > > > performance of consumer polling.
> > > >
> > > > A simple implementation of LatencyTime that returns wall clock time
> for
> > > > the Asia/Singapore timezone for all partitions:
> > > >
> > > > import java.time.*;
> > > >
> > > > class SingaporeTime implements LatencyTime {
> > > >   ZoneId zoneSingapore = ZoneId.of("Asia/Singapore");
> > > >   Clock clockSingapore = Clock.system(zoneSingapore);
> > > >
> > > >   @Override
> > > >   public long getWallClockTime(TopicPartition tp) {
> > > >     return clockSingapore.instant().getEpochSecond();
> > > >   }
> > > >
> > > >   ...
> > > > }
> > > >
> > > > Regards,
> > > > Sean
> > > >
> > > >
> > > > On Thu, Dec 12, 2019 at 6:18 AM Habib Nahas  wrote:
> > > >
> > > >> Hi Sean,
> > > >>
> > > >> Thanks for the KIP.
> > > >>
> > > >> As I understand it users are free to set their own timestamp on
> > > >> ProducerRecord. What is the recommendation for the proposed metric
> in a
> > > >> scenario where the user sets this timestamp in timezone A and
> consumes the
> > > >> record in timezone B. Its not clear to me if a custom
> implementation of
> > > >> LatencyTime will help here.
> > > >>
> > > >> Thanks,
> > > >> Habib
> > > >>
> > > >> On Wed, Dec 11, 2019, at 4:52 PM, Sean Glover wrote:
> > > >> > Hello again,
> > > >> >
> > > >> > There has been some interest in this KIP recently. I'm bumping the
> > > >> thread
> > > >> > to encourage feedback on the design.
> > > >> >
> > > >> > Regards,
> > > >> > Sean
> > > >> >
> > > >> > On Mon, Jul 15, 2019 at 9:01 AM Sean Glover <
> sean.glo...@lightbend.com>
> > > >> > wrote:
> > > >> >
> > > >> > > To hopefully spark some discussion I've copied the motivation
> section
> > > >> from
> > > >> > > the KIP:
> > > >> > >
> > > >> > > Consumer lag is a useful metric to monitor how many records are
> > > >> queued to
> > > >> > > be processed. We can look at individual lag per partition or we
> may
> > > >> > > aggregate metrics. For example, we may want to monitor what the
> > > >> maximum lag
> > > >> > > of any particular partition in our consumer subscription so we
> can
> > > >> identify
> > > >> > > hot partitions, caused by an insufficient producing partitioning
> > > >> strategy.
> > > >> > > We may want to monitor a sum of lag across all partitions so we
> have a
> > > >> > > sense as to our total backlog of messages to consume. Lag in
> offsets
> > > >> is
> > > >> > > useful when you have a good understanding of your messages and
> > > >> processing
> > > >> > > characteristics, but it 

Re: [DISCUSS] KIP-489 Kafka Consumer Record Latency Metric

2020-01-14 Thread Habib Nahas
Any chance of an update on the KIP? We are interested in seeing this move 
forward.

Thanks,
Habib
Sr SDE, AWS

On Wed, Dec 18, 2019, at 3:27 PM, Habib Nahas wrote:
> Thanks Sean. Look forward to the updated KIP.
> 
> Regards,
> Habib
> 
> On Fri, Dec 13, 2019, at 6:22 AM, Sean Glover wrote:
> > Hi,
> > 
> > After my last reply I had a nagging feeling something wasn't right, and I
> > remembered that epoch time is UTC. This makes the discussion about
> > timezone irrelevant, since we're always using UTC. This makes the need for
> > the LatencyTime interface that I proposed in the design irrelevant as well,
> > since I can no longer think about how that might be useful. I'll update
> > the KIP. I'll also review KIP-32 to understand message timestamps better
> > so I can explain the different types of latency results that could be
> > reported with this metric.
> > 
> > Regards,
> > Sean
> > 
> > On Thu, Dec 12, 2019 at 6:25 PM Sean Glover 
> > wrote:
> > 
> > > Hi Habib,
> > >
> > > Thanks for question! If the consumer is in a different timezone than the
> > > timezone used to produce messages to a partition then you can use an
> > > implementation of LatencyTime to return the current time of that timezone.
> > > The current design assumes that messages produced to a partition must all
> > > be produced from the same timezone. If timezone metadata were encoded into
> > > each message then it would be possible to automatically determine the
> > > source timezone and calculate latency, however the current design will not
> > > pass individual messages into LatencyTime to retrieve message metadata.
> > > Instead, the LatencyTime.getWallClockTime method is only called once per
> > > fetch request response per partition and then the metric is recorded once
> > > the latency calculation is complete. This follows the same design as the
> > > current consumer lag metric which calculates offset lag based on the last
> > > message of the fetch request response for a partition. Since the metric is
> > > just an aggregate (max/mean) over some time window we only need to
> > > occasionally calculate latency, which will have negligible impact on the
> > > performance of consumer polling.
> > >
> > > A simple implementation of LatencyTime that returns wall clock time for
> > > the Asia/Singapore timezone for all partitions:
> > >
> > > import java.time.*;
> > >
> > > class SingaporeTime implements LatencyTime {
> > >   ZoneId zoneSingapore = ZoneId.of("Asia/Singapore");
> > >   Clock clockSingapore = Clock.system(zoneSingapore);
> > >
> > >   @Override
> > >   public long getWallClockTime(TopicPartition tp) {
> > >     return clockSingapore.instant().getEpochSecond();
> > >   }
> > >
> > >   ...
> > > }
> > >
> > > Regards,
> > > Sean
> > >
> > >
> > > On Thu, Dec 12, 2019 at 6:18 AM Habib Nahas  wrote:
> > >
> > >> Hi Sean,
> > >>
> > >> Thanks for the KIP.
> > >>
> > >> As I understand it users are free to set their own timestamp on
> > >> ProducerRecord. What is the recommendation for the proposed metric in a
> > >> scenario where the user sets this timestamp in timezone A and consumes 
> > >> the
> > >> record in timezone B. Its not clear to me if a custom implementation of
> > >> LatencyTime will help here.
> > >>
> > >> Thanks,
> > >> Habib
> > >>
> > >> On Wed, Dec 11, 2019, at 4:52 PM, Sean Glover wrote:
> > >> > Hello again,
> > >> >
> > >> > There has been some interest in this KIP recently. I'm bumping the
> > >> thread
> > >> > to encourage feedback on the design.
> > >> >
> > >> > Regards,
> > >> > Sean
> > >> >
> > >> > On Mon, Jul 15, 2019 at 9:01 AM Sean Glover 
> > >> > wrote:
> > >> >
> > >> > > To hopefully spark some discussion I've copied the motivation section
> > >> from
> > >> > > the KIP:
> > >> > >
> > >> > > Consumer lag is a useful metric to monitor how many records are
> > >> queued to
> > >> > > be processed. We can look at individual lag per partition or we may
> > >> > > aggregate metrics. For example, we may want to monitor what the
> > >> maximum lag
> > >> > > of any particular partition in our consumer subscription so we can
> > >> identify
> > >> > > hot partitions, caused by an insufficient producing partitioning
> > >> strategy.
> > >> > > We may want to monitor a sum of lag across all partitions so we have 
> > >> > > a
> > >> > > sense as to our total backlog of messages to consume. Lag in offsets
> > >> is
> > >> > > useful when you have a good understanding of your messages and
> > >> processing
> > >> > > characteristics, but it doesn’t tell us how far behind *in time* we
> > >> are.
> > >> > > This is known as wait time in queueing theory, or more informally 
> > >> > > it’s
> > >> > > referred to as latency.
> > >> > >
> > >> > > The latency of a message can be defined as the difference between 
> > >> > > when
> > >> > > that message was first produced to when the message is received by a
> > >> > > consumer. The latency of records in a partition correlates with lag,
> > >> but a
> > >> > > 

Re: [DISCUSS] KIP-489 Kafka Consumer Record Latency Metric

2019-12-20 Thread Gokul Ramanan Subramanian
Hi Sean.

Thanks for writing this KIP. Sounds like a great addition. A few comments.

1. Currently, I see that you have proposed partition-level records-latency
metrics and a global records-latency-max metric across all partitions for a
given consumer group. Some Kafka users may organize their topics such that
some topics are more important than others. *Why not have the latency
metric at the topic level as well?* Although one could imagine having
metrics aggregation outside of JMX to generate the topic-level metrics, I
suppose having topic-level metrics will allow Kafka users to set up alarms
at the topic level with greater ease. IMHO, this KIP should address this
use case. Even if you believe we should not expose topic-level metrics, it
would be nice to see the KIP explain why.

2. Some existing solutions already expose the consumer group lag in time.
See
https://www.lightbend.com/blog/monitor-kafka-consumer-group-latency-with-kafka-lag-exporter
for
an example. *The KIP should reference existing solutions and suggest the
benefits of using the native solution that you propose*.

3. If a message was produced a long time ago, and a new consumer group has
been created, then the latency metrics are going to be very high in value
until the consumer group catches up. This is especially true in the context
of KIP-405 (
https://cwiki.apache.org/confluence/display/KAFKA/KIP-405%3A+Kafka+Tiered+Storage)
which allows reading very old messages. Therefore, a consumer application
that relies on reading all messages from the past will report a high
records-latency for a while. *I think that the KIP should note down the
caveat that setting SLAs on records-latency makes sense only in steady
state, and not for bootstrapping new consumer groups.*

4. Since the community has used the term consumer-lag so often, *why not
call the metric consumer-lag-millis which makes the units clear as well*.
records-latency is a bit confusing, at least for me.

Cheers.

On Wed, Dec 18, 2019 at 3:28 PM Habib Nahas  wrote:

> Thanks Sean. Look forward to the updated KIP.
>
> Regards,
> Habib
>
> On Fri, Dec 13, 2019, at 6:22 AM, Sean Glover wrote:
> > Hi,
> >
> > After my last reply I had a nagging feeling something wasn't right, and I
> > remembered that epoch time is UTC. This makes the discussion about
> > timezone irrelevant, since we're always using UTC. This makes the need
> for
> > the LatencyTime interface that I proposed in the design irrelevant as
> well,
> > since I can no longer think about how that might be useful. I'll update
> > the KIP. I'll also review KIP-32 to understand message timestamps better
> > so I can explain the different types of latency results that could be
> > reported with this metric.
> >
> > Regards,
> > Sean
> >
> > On Thu, Dec 12, 2019 at 6:25 PM Sean Glover 
> > wrote:
> >
> > > Hi Habib,
> > >
> > > Thanks for question! If the consumer is in a different timezone than
> the
> > > timezone used to produce messages to a partition then you can use an
> > > implementation of LatencyTime to return the current time of that
> timezone.
> > > The current design assumes that messages produced to a partition must
> all
> > > be produced from the same timezone. If timezone metadata were encoded
> into
> > > each message then it would be possible to automatically determine the
> > > source timezone and calculate latency, however the current design will
> not
> > > pass individual messages into LatencyTime to retrieve message metadata.
> > > Instead, the LatencyTime.getWallClockTime method is only called once
> per
> > > fetch request response per partition and then the metric is recorded
> once
> > > the latency calculation is complete. This follows the same design as
> the
> > > current consumer lag metric which calculates offset lag based on the
> last
> > > message of the fetch request response for a partition. Since the
> metric is
> > > just an aggregate (max/mean) over some time window we only need to
> > > occasionally calculate latency, which will have negligible impact on
> the
> > > performance of consumer polling.
> > >
> > > A simple implementation of LatencyTime that returns wall clock time for
> > > the Asia/Singapore timezone for all partitions:
> > >
> > > import java.time.*;
> > >
> > > class SingaporeTime implements LatencyTime {
> > >   ZoneId zoneSingapore = ZoneId.of("Asia/Singapore");
> > >   Clock clockSingapore = Clock.system(zoneSingapore);
> > >
> > >   @Override
> > >   public long getWallClockTime(TopicPartition tp) {
> > >     return clockSingapore.instant().getEpochSecond();
> > >   }
> > >
> > >   ...
> > > }
> > >
> > > Regards,
> > > Sean
> > >
> > >
> > > On Thu, Dec 12, 2019 at 6:18 AM Habib Nahas  wrote:
> > >
> > >> Hi Sean,
> > >>
> > >> Thanks for the KIP.
> > >>
> > >> As I understand it users are free to set their own timestamp on
> > >> ProducerRecord. What is the recommendation for the proposed metric in
> a
> > >> scenario where the user sets this timestamp in timezone A and
> consumes the
> > 

Re: [DISCUSS] KIP-489 Kafka Consumer Record Latency Metric

2019-12-18 Thread Habib Nahas
Thanks Sean. Look forward to the updated KIP.

Regards,
Habib


Re: [DISCUSS] KIP-489 Kafka Consumer Record Latency Metric

2019-12-12 Thread Sean Glover
Hi,

After my last reply I had a nagging feeling something wasn't right, and I
remembered that epoch time is UTC.  That makes the discussion about
timezones irrelevant, since we're always using UTC, and it also makes the
LatencyTime interface I proposed in the design unnecessary, since I can no
longer see how it would be useful.  I'll update the KIP.  I'll also review
KIP-32 to understand message timestamps better so I can explain the
different types of latency results that could be reported with this metric.
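To make that concrete, here is a small self-contained sketch (plain
java.time, not KIP code): two Clocks that share the same instant but use
different zones report identical epoch values, which is why neither the
producer's nor the consumer's timezone affects the latency arithmetic.

```java
import java.time.Clock;
import java.time.Instant;
import java.time.ZoneId;

public class EpochIsUtc {
    public static void main(String[] args) {
        // Fix an arbitrary instant so the comparison is deterministic.
        Instant now = Instant.ofEpochMilli(1_576_000_000_000L);
        Clock utc = Clock.fixed(now, ZoneId.of("UTC"));
        // The same instant viewed from another zone...
        Clock singapore = utc.withZone(ZoneId.of("Asia/Singapore"));
        // ...yields the same epoch millis: epoch time is timezone-independent.
        System.out.println(utc.millis() == singapore.millis()); // prints true
    }
}
```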

Regards,
Sean


Re: [DISCUSS] KIP-489 Kafka Consumer Record Latency Metric

2019-12-12 Thread Sean Glover
Hi Habib,

Thanks for the question! If the consumer is in a different timezone than the
timezone used to produce messages to a partition, then you can use an
implementation of LatencyTime to return the current time of that timezone.
The current design assumes that messages produced to a partition must all
be produced from the same timezone.  If timezone metadata were encoded into
each message then it would be possible to automatically determine the
source timezone and calculate latency; however, the current design will not
pass individual messages into LatencyTime to retrieve message metadata.
Instead, the LatencyTime.getWallClockTime method is only called once per
fetch response per partition, and the metric is recorded once the latency
calculation is complete.  This follows the same design as the current
consumer lag metric, which calculates offset lag based on the last message
of the fetch response for a partition.  Since the metric is just an
aggregate (max/mean) over some time window, we only need to occasionally
calculate latency, which will have negligible impact on the performance of
consumer polling.

A simple implementation of LatencyTime that returns wall clock time for the
Asia/Singapore timezone for all partitions:

import java.time.*;

import org.apache.kafka.common.TopicPartition;

class SingaporeTime implements LatencyTime {
  ZoneId zoneSingapore = ZoneId.of("Asia/Singapore");
  Clock clockSingapore = Clock.system(zoneSingapore);

  @Override
  public long getWallClockTime(TopicPartition tp) {
    // Clock.instant() is a method call; note this returns epoch *seconds*.
    return clockSingapore.instant().getEpochSecond();
  }

  ...
}
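The latency calculation itself then reduces to a subtraction per partition
per fetch response.  A minimal self-contained sketch (the helper name and
record timestamp below are illustrative, not the final API):

```java
import java.time.Clock;
import java.time.Instant;
import java.time.ZoneId;

public class LatencySketch {
    // Hypothetical helper: latency of the last record in a fetch response
    // for one partition, mirroring how consumer lag uses the last offset.
    static long latencyMs(long lastRecordTimestampMs, Clock wallClock) {
        return wallClock.millis() - lastRecordTimestampMs;
    }

    public static void main(String[] args) {
        // Fixed clock at epoch 10,000 ms for a deterministic example.
        Clock clock = Clock.fixed(Instant.ofEpochMilli(10_000), ZoneId.of("UTC"));
        // A record produced at epoch 7,000 ms observed now -> 3,000 ms latency.
        System.out.println(latencyMs(7_000, clock)); // prints 3000
    }
}
```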

Regards,
Sean



Re: [DISCUSS] KIP-489 Kafka Consumer Record Latency Metric

2019-12-12 Thread Habib Nahas
Hi Sean,

Thanks for the KIP. 

As I understand it, users are free to set their own timestamp on a ProducerRecord.
What is the recommendation for the proposed metric in a scenario where the user
sets this timestamp in timezone A and consumes the record in timezone B? It's
not clear to me whether a custom implementation of LatencyTime will help here.

Thanks,
Habib



Re: [DISCUSS] KIP-489 Kafka Consumer Record Latency Metric

2019-12-11 Thread Sean Glover
Hello again,

There has been some interest in this KIP recently.  I'm bumping the thread
to encourage feedback on the design.

Regards,
Sean


Re: [DISCUSS] KIP-489 Kafka Consumer Record Latency Metric

2019-07-15 Thread Sean Glover
To hopefully spark some discussion I've copied the motivation section from
the KIP:

Consumer lag is a useful metric for monitoring how many records are queued to
be processed.  We can look at individual lag per partition or we may
aggregate metrics. For example, we may want to monitor the maximum lag of
any partition in our consumer subscription so we can identify hot
partitions, caused by an ineffective producer partitioning strategy.
We may want to monitor the sum of lag across all partitions so we have a
sense of our total backlog of messages to consume. Lag in offsets is
useful when you have a good understanding of your messages and processing
characteristics, but it doesn’t tell us how far behind *in time* we are.
This is known as wait time in queueing theory, or more informally it’s
referred to as latency.

The latency of a message can be defined as the difference between when that
message was first produced and when it is received by a consumer.
The latency of records in a partition correlates with lag, but a larger lag
doesn’t necessarily mean a larger latency. For example, a topic consumed by
two separate application consumer groups A and B may have similar lag, but
different latency per partition.  Application A is a consumer which
performs CPU-intensive business logic on each message it receives. It’s
distributed across many consumer group members to handle the load quickly
enough, but since its processing time is slower, it takes longer to process
each message per partition.  Meanwhile, Application B is a consumer which
performs a simple ETL operation to land streaming data in another system,
such as HDFS. It may have similar lag to Application A, but because it has
a faster processing time its latency per partition is significantly less.

If the Kafka Consumer reported a latency metric it would be easier to build
Service Level Agreements (SLAs) based on non-functional requirements of the
streaming system.  For example, the system must never have a latency of
greater than 10 minutes. This SLA could be used in monitoring alerts or as
input to automatic scaling solutions.
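As a back-of-the-envelope illustration of why equal lag can mean very
different latency (the rates below are made-up numbers, not measurements):

```java
public class LagVsLatency {
    public static void main(String[] args) {
        // Both applications are 6,000 records behind on a partition.
        long lagRecords = 6_000;
        double rateA = 10.0;    // records/sec: CPU-intensive Application A
        double rateB = 1_000.0; // records/sec: simple ETL Application B
        // Approximate wait time (latency) = lag / processing rate.
        System.out.println("A: ~" + (long) (lagRecords / rateA) + " s behind");
        System.out.println("B: ~" + (long) (lagRecords / rateB) + " s behind");
    }
}
```

Same lag, but Application A is roughly a hundred times further behind in time.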

On Thu, Jul 11, 2019 at 12:36 PM Sean Glover 
wrote:

> Hi kafka-dev,
>
> I've created KIP-489 as a proposal for adding latency metrics to the Kafka
> Consumer in a similar way as record-lag metrics are implemented.
>
>
> https://cwiki.apache.org/confluence/display/KAFKA/489%3A+Kafka+Consumer+Record+Latency+Metric
>
> Regards,
> Sean
>
> --
> Principal Engineer, Lightbend, Inc.
> @seg1o, in/seanaglover
>


-- 
Principal Engineer, Lightbend, Inc.
@seg1o, in/seanaglover