Hi Habib,

Thanks for the question! If the consumer is in a different timezone than
the one used to produce messages to a partition, you can use an
implementation of LatencyTime that returns the current time in the
producer's timezone. The current design assumes that all messages produced
to a partition are produced from the same timezone. If timezone metadata
were encoded into each message, it would be possible to determine the
source timezone automatically and calculate latency; however, the current
design does not pass individual messages to LatencyTime to retrieve message
metadata. Instead, LatencyTime.getWallClockTime is called only once per
partition per fetch response, and the metric is recorded once the latency
calculation is complete. This follows the same design as the current
consumer lag metric, which calculates offset lag based on the last message
of the fetch response for a partition. Since the metric is just an
aggregate (max/mean) over some time window, we only need to calculate
latency occasionally, which will have negligible impact on the performance
of consumer polling.
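
To make the once-per-fetch calculation concrete, here is a rough sketch. It
is not the KIP's implementation: Rec, the String partition key, and
batchLatencyMs are hypothetical stand-ins so the snippet compiles without
Kafka on the classpath, and it assumes the record timestamp and the wall
clock value use the same unit (milliseconds here).

```java
import java.util.List;

// Hypothetical stand-ins for illustration only; not the real Kafka or KIP types.
public class LatencySketch {

  /** Minimal record: only the producer-assigned timestamp matters here. */
  public static final class Rec {
    final long offset;
    final long timestampMs;

    public Rec(long offset, long timestampMs) {
      this.offset = offset;
      this.timestampMs = timestampMs;
    }
  }

  /** Assumed shape of LatencyTime, keyed by a partition name instead of TopicPartition. */
  public interface LatencyTime {
    long getWallClockTime(String partition);
  }

  /**
   * Computes latency once for a fetched batch, using only its last record.
   * Returns -1 for an empty batch so the caller can skip recording the metric.
   */
  public static long batchLatencyMs(String partition, List<Rec> batch, LatencyTime time) {
    if (batch.isEmpty()) {
      return -1L;
    }
    Rec last = batch.get(batch.size() - 1);
    // A single wall clock lookup per partition per fetch response.
    return time.getWallClockTime(partition) - last.timestampMs;
  }
}
```

Because only the last record of each batch is inspected, the cost per fetch
response stays constant no matter how many records the batch holds.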

A simple implementation of LatencyTime that returns wall clock time for the
Asia/Singapore timezone for all partitions:

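import java.time.Clock;
import java.time.ZoneId;
import org.apache.kafka.common.TopicPartition;

class SingaporeTime implements LatencyTime {
  private final ZoneId zoneSingapore = ZoneId.of("Asia/Singapore");
  private final Clock clockSingapore = Clock.system(zoneSingapore);

  // Uses the same clock for every partition, so tp is ignored.
  @Override
  public long getWallClockTime(TopicPartition tp) {
    return clockSingapore.instant().getEpochSecond();
  }

  ...
}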

Regards,
Sean


On Thu, Dec 12, 2019 at 6:18 AM Habib Nahas <ha...@hbnet.io> wrote:

> Hi Sean,
>
> Thanks for the KIP.
>
> As I understand it users are free to set their own timestamp on
> ProducerRecord. What is the recommendation for the proposed metric in a
> scenario where the user sets this timestamp in timezone A and consumes the
> record in timezone B? It's not clear to me if a custom implementation of
> LatencyTime will help here.
>
> Thanks,
> Habib
>
> On Wed, Dec 11, 2019, at 4:52 PM, Sean Glover wrote:
> > Hello again,
> >
> > There has been some interest in this KIP recently. I'm bumping the thread
> > to encourage feedback on the design.
> >
> > Regards,
> > Sean
> >
> > On Mon, Jul 15, 2019 at 9:01 AM Sean Glover <sean.glo...@lightbend.com>
> > wrote:
> >
> > > To hopefully spark some discussion I've copied the motivation section
> > > from the KIP:
> > >
> > > Consumer lag is a useful metric to monitor how many records are queued
> > > to be processed. We can look at individual lag per partition or we may
> > > aggregate metrics. For example, we may want to monitor the maximum lag
> > > of any particular partition in our consumer subscription so we can
> > > identify hot partitions, caused by an insufficient producing
> > > partitioning strategy. We may want to monitor a sum of lag across all
> > > partitions so we have a sense as to our total backlog of messages to
> > > consume. Lag in offsets is useful when you have a good understanding of
> > > your messages and processing characteristics, but it doesn’t tell us
> > > how far behind *in time* we are. This is known as wait time in queueing
> > > theory, or more informally it’s referred to as latency.
> > >
> > > The latency of a message can be defined as the difference between when
> > > that message was first produced to when the message is received by a
> > > consumer. The latency of records in a partition correlates with lag,
> > > but a larger lag doesn’t necessarily mean a larger latency. For
> > > example, a topic consumed by two separate application consumer groups A
> > > and B may have similar lag, but different latency per partition.
> > > Application A is a consumer which performs CPU intensive business logic
> > > on each message it receives. It’s distributed across many consumer
> > > group members to handle the load quickly enough, but since its
> > > processing time is slower, it takes longer to process each message per
> > > partition. Meanwhile, Application B is a consumer which performs a
> > > simple ETL operation to land streaming data in another system, such as
> > > HDFS. It may have similar lag to Application A, but because it has a
> > > faster processing time its latency per partition is significantly
> > > less.
> > >
> > > If the Kafka Consumer reported a latency metric it would be easier to
> > > build Service Level Agreements (SLAs) based on non-functional
> > > requirements of the streaming system. For example, the system must
> > > never have a latency of greater than 10 minutes. This SLA could be used
> > > in monitoring alerts or as input to automatic scaling solutions.
> > >
> > > On Thu, Jul 11, 2019 at 12:36 PM Sean Glover
> > > <sean.glo...@lightbend.com> wrote:
> > >
> > >> Hi kafka-dev,
> > >>
> > >> I've created KIP-489 as a proposal for adding latency metrics to the
> > >> Kafka Consumer in a similar way as record-lag metrics are implemented.
> > >>
> > >>
> > >> https://cwiki.apache.org/confluence/display/KAFKA/489%3A+Kafka+Consumer+Record+Latency+Metric
> > >>
> > >> Regards,
> > >> Sean
> > >>
> > >> --
> > >> Principal Engineer, Lightbend, Inc.
> > >>
> > >> <http://lightbend.com>
> > >>
> > >> @seg1o <https://twitter.com/seg1o>, in/seanaglover
> > >> <https://www.linkedin.com/in/seanaglover/>
> > >>
> > >
> >
>
