Re: [DISCUSS] KIP-489 Kafka Consumer Record Latency Metric

2020-01-15 Thread Habib Nahas
Hi Sean,

That's great; I look forward to it.

Thanks,
Habib

On Tue, Jan 14, 2020, at 2:55 PM, Sean Glover wrote:
> Hi Habib,
> 
> Thank you for the reminder. I'll update the KIP this week and address the
> feedback from you and Gokul.
> 
> Regards,
> Sean
> 
> On Tue, Jan 14, 2020 at 9:06 AM Habib Nahas  wrote:
> 
> > Any chance of an update on the KIP? We are interested in seeing this move
> > forward.
> >
> > Thanks,
> > Habib
> > Sr SDE, AWS
> >
> > On Wed, Dec 18, 2019, at 3:27 PM, Habib Nahas wrote:
> > > Thanks Sean. Look forward to the updated KIP.
> > >
> > > Regards,
> > > Habib
> > >
> > > On Fri, Dec 13, 2019, at 6:22 AM, Sean Glover wrote:
> > > > Hi,
> > > >
> > > > After my last reply I had a nagging feeling something wasn't right, and I
> > > > remembered that epoch time is UTC. This makes the discussion about
> > > > timezone irrelevant, since we're always using UTC. This makes the need for
> > > > the LatencyTime interface that I proposed in the design irrelevant as well,
> > > > since I can no longer think about how that might be useful. I'll update
> > > > the KIP. I'll also review KIP-32 to understand message timestamps better
> > > > so I can explain the different types of latency results that could be
> > > > reported with this metric.
> > > >
> > > > Regards,
> > > > Sean
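
A quick illustration of Sean's point that epoch time is timezone-independent: two clocks in different zones read the same instant on the epoch timeline, so a wall-clock reading needs no zone adjustment. This is a standalone sketch for illustration, not code from the KIP.

import java.time.Clock;
import java.time.ZoneId;

public class EpochIsUtc {
    public static void main(String[] args) {
        // Both clocks report the same epoch instant; only the
        // human-readable rendering of that instant differs by zone.
        long utcMillis = Clock.system(ZoneId.of("UTC")).millis();
        long sgtMillis = Clock.system(ZoneId.of("Asia/Singapore")).millis();
        System.out.println(Math.abs(utcMillis - sgtMillis) < 1_000); // true
    }
}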
> > > >
> > > > On Thu, Dec 12, 2019 at 6:25 PM Sean Glover  wrote:
> > > >
> > > > > Hi Habib,
> > > > >
> > > > > Thanks for the question! If the consumer is in a different timezone than
> > > > > the timezone used to produce messages to a partition then you can use an
> > > > > implementation of LatencyTime to return the current time of that timezone.
> > > > > The current design assumes that messages produced to a partition must all
> > > > > be produced from the same timezone. If timezone metadata were encoded into
> > > > > each message then it would be possible to automatically determine the
> > > > > source timezone and calculate latency, however the current design will not
> > > > > pass individual messages into LatencyTime to retrieve message metadata.
> > > > > Instead, the LatencyTime.getWallClockTime method is only called once per
> > > > > fetch request response per partition and then the metric is recorded once
> > > > > the latency calculation is complete. This follows the same design as the
> > > > > current consumer lag metric which calculates offset lag based on the last
> > > > > message of the fetch request response for a partition. Since the metric is
> > > > > just an aggregate (max/mean) over some time window we only need to
> > > > > occasionally calculate latency, which will have negligible impact on the
> > > > > performance of consumer polling.
> > > > >
> > > > > A simple implementation of LatencyTime that returns wall clock time for
> > > > > the Asia/Singapore timezone for all partitions:
> > > > >
> > > > > import java.time.*;
> > > > > import org.apache.kafka.common.TopicPartition;
> > > > >
> > > > > class SingaporeTime implements LatencyTime {
> > > > >     private final ZoneId zoneSingapore = ZoneId.of("Asia/Singapore");
> > > > >     private final Clock clockSingapore = Clock.system(zoneSingapore);
> > > > >
> > > > >     @Override
> > > > >     public long getWallClockTime(TopicPartition tp) {
> > > > >         // instant() is a method call; the epoch value itself is zone-independent.
> > > > >         return clockSingapore.instant().getEpochSecond();
> > > > >     }
> > > > >
> > > > >     ...
> > > > > }
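
For reference, the LatencyTime interface implied by the example above, together with the once-per-fetch latency calculation described earlier in this message, might look roughly like the following sketch. The names LatencyTime and getWallClockTime come from the discussion; the helper class, the seconds-to-millis conversion, and the use of ConsumerRecord.timestamp() are assumptions, not the KIP's actual code.

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;

// Interface shape inferred from the usage above; the real KIP definition may differ.
interface LatencyTime {
    long getWallClockTime(TopicPartition tp); // epoch seconds, per the example above
}

class LatencySketch {
    // Hypothetical helper: invoked once per fetch response per partition,
    // mirroring how the records-lag metric uses the last record of a batch.
    static long latencyMs(LatencyTime time, TopicPartition tp,
                          ConsumerRecord<?, ?> lastRecord) {
        long wallClockMs = time.getWallClockTime(tp) * 1000L;
        // lastRecord.timestamp() is CreateTime or LogAppendTime (see KIP-32).
        return wallClockMs - lastRecord.timestamp();
    }
}

The resulting value would presumably feed a windowed max/mean sensor, analogous to the existing records-lag-max metric.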
> > > > >
> > > > > Regards,
> > > > > Sean
> > > > >
> > > > >
> > > > > On Thu, Dec 12, 2019 at 6:18 AM Habib Nahas  wrote:
> > > > >
> > > > >> Hi Sean,
> > > > >>
> > > > >> Thanks for the KIP.
> > > > >>
> > > > >> As I understand it users are free to set their own timestamp on
> > > > >> ProducerRecord. What is the recommendation for the proposed metric in a
> > > > >> scenario where the user sets this timestamp in timezone A and consumes the
> > > > >> record in timezone B? It's not clear to me if a custom implementation of
> > > > >> LatencyTime will help here.

Re: [DISCUSS] KIP-489 Kafka Consumer Record Latency Metric

2020-01-14 Thread Habib Nahas
Any chance of an update on the KIP? We are interested in seeing this move 
forward.

Thanks,
Habib
Sr SDE, AWS

On Wed, Dec 18, 2019, at 3:27 PM, Habib Nahas wrote:
> Thanks Sean. Look forward to the updated KIP.
> 
> Regards,
> Habib
> 
> On Fri, Dec 13, 2019, at 6:22 AM, Sean Glover wrote:
> > Hi,
> > 
> > After my last reply I had a nagging feeling something wasn't right, and I
> > remembered that epoch time is UTC. This makes the discussion about
> > timezone irrelevant, since we're always using UTC. This makes the need for
> > the LatencyTime interface that I proposed in the design irrelevant as well,
> > since I can no longer think about how that might be useful. I'll update
> > the KIP. I'll also review KIP-32 to understand message timestamps better
> > so I can explain the different types of latency results that could be
> > reported with this metric.
> > 
> > Regards,
> > Sean
> > 
> > On Thu, Dec 12, 2019 at 6:25 PM Sean Glover 
> > wrote:
> > 
> > > Hi Habib,
> > >
> > > Thanks for the question! If the consumer is in a different timezone than the
> > > timezone used to produce messages to a partition then you can use an
> > > implementation of LatencyTime to return the current time of that timezone.
> > > The current design assumes that messages produced to a partition must all
> > > be produced from the same timezone. If timezone metadata were encoded into
> > > each message then it would be possible to automatically determine the
> > > source timezone and calculate latency, however the current design will not
> > > pass individual messages into LatencyTime to retrieve message metadata.
> > > Instead, the LatencyTime.getWallClockTime method is only called once per
> > > fetch request response per partition and then the metric is recorded once
> > > the latency calculation is complete. This follows the same design as the
> > > current consumer lag metric which calculates offset lag based on the last
> > > message of the fetch request response for a partition. Since the metric is
> > > just an aggregate (max/mean) over some time window we only need to
> > > occasionally calculate latency, which will have negligible impact on the
> > > performance of consumer polling.
> > >
> > > A simple implementation of LatencyTime that returns wall clock time for
> > > the Asia/Singapore timezone for all partitions:
> > >
> > > import java.time.*;
> > > import org.apache.kafka.common.TopicPartition;
> > >
> > > class SingaporeTime implements LatencyTime {
> > >     private final ZoneId zoneSingapore = ZoneId.of("Asia/Singapore");
> > >     private final Clock clockSingapore = Clock.system(zoneSingapore);
> > >
> > >     @Override
> > >     public long getWallClockTime(TopicPartition tp) {
> > >         // instant() is a method call; the epoch value itself is zone-independent.
> > >         return clockSingapore.instant().getEpochSecond();
> > >     }
> > >
> > >     ...
> > > }
> > >
> > > Regards,
> > > Sean
> > >
> > >
> > > On Thu, Dec 12, 2019 at 6:18 AM Habib Nahas  wrote:
> > >
> > >> Hi Sean,
> > >>
> > >> Thanks for the KIP.
> > >>
> > >> As I understand it users are free to set their own timestamp on
> > >> ProducerRecord. What is the recommendation for the proposed metric in a
> > >> scenario where the user sets this timestamp in timezone A and consumes 
> > >> the
> > >> record in timezone B? It's not clear to me if a custom implementation of
> > >> LatencyTime will help here.
> > >>
> > >> Thanks,
> > >> Habib
> > >>
> > >> On Wed, Dec 11, 2019, at 4:52 PM, Sean Glover wrote:
> > >> > Hello again,
> > >> >
> > >> > There has been some interest in this KIP recently. I'm bumping the
> > >> thread
> > >> > to encourage feedback on the design.
> > >> >
> > >> > Regards,
> > >> > Sean
> > >> >
> > >> > On Mon, Jul 15, 2019 at 9:01 AM Sean Glover 
> > >> > wrote:
> > >> >
> > >> > > To hopefully spark some discussion I've copied the motivation section
> > >> > > from the KIP:
> > >> > >
> > >> > > Consumer lag is a useful metric to monitor how many records are queued
> > >> > > to be processed. We can look at individual lag per partition or we may
> > >> > > aggregate metrics. For example, we may want to monitor the maximum lag
> > >> > > of any particular partition in our consumer subscription so we can
> > >> > > identify hot partitions. ...

Re: [DISCUSS] KIP-489 Kafka Consumer Record Latency Metric

2019-12-18 Thread Habib Nahas
Thanks Sean. Look forward to the updated KIP.

Regards,
Habib

On Fri, Dec 13, 2019, at 6:22 AM, Sean Glover wrote:
> Hi,
> 
> After my last reply I had a nagging feeling something wasn't right, and I
> remembered that epoch time is UTC. This makes the discussion about
> timezone irrelevant, since we're always using UTC. This makes the need for
> the LatencyTime interface that I proposed in the design irrelevant as well,
> since I can no longer think about how that might be useful. I'll update
> the KIP. I'll also review KIP-32 to understand message timestamps better
> so I can explain the different types of latency results that could be
> reported with this metric.
> 
> Regards,
> Sean
> 
> On Thu, Dec 12, 2019 at 6:25 PM Sean Glover 
> wrote:
> 
> > Hi Habib,
> >
> > Thanks for the question! If the consumer is in a different timezone than the
> > timezone used to produce messages to a partition then you can use an
> > implementation of LatencyTime to return the current time of that timezone.
> > The current design assumes that messages produced to a partition must all
> > be produced from the same timezone. If timezone metadata were encoded into
> > each message then it would be possible to automatically determine the
> > source timezone and calculate latency, however the current design will not
> > pass individual messages into LatencyTime to retrieve message metadata.
> > Instead, the LatencyTime.getWallClockTime method is only called once per
> > fetch request response per partition and then the metric is recorded once
> > the latency calculation is complete. This follows the same design as the
> > current consumer lag metric which calculates offset lag based on the last
> > message of the fetch request response for a partition. Since the metric is
> > just an aggregate (max/mean) over some time window we only need to
> > occasionally calculate latency, which will have negligible impact on the
> > performance of consumer polling.
> >
> > A simple implementation of LatencyTime that returns wall clock time for
> > the Asia/Singapore timezone for all partitions:
> >
> > import java.time.*;
> > import org.apache.kafka.common.TopicPartition;
> >
> > class SingaporeTime implements LatencyTime {
> >     private final ZoneId zoneSingapore = ZoneId.of("Asia/Singapore");
> >     private final Clock clockSingapore = Clock.system(zoneSingapore);
> >
> >     @Override
> >     public long getWallClockTime(TopicPartition tp) {
> >         // instant() is a method call; the epoch value itself is zone-independent.
> >         return clockSingapore.instant().getEpochSecond();
> >     }
> >
> >     ...
> > }
> >
> > Regards,
> > Sean
> >
> >
> > On Thu, Dec 12, 2019 at 6:18 AM Habib Nahas  wrote:
> >
> >> Hi Sean,
> >>
> >> Thanks for the KIP.
> >>
> >> As I understand it users are free to set their own timestamp on
> >> ProducerRecord. What is the recommendation for the proposed metric in a
> >> scenario where the user sets this timestamp in timezone A and consumes the
> >> record in timezone B? It's not clear to me if a custom implementation of
> >> LatencyTime will help here.
> >>
> >> Thanks,
> >> Habib
> >>
> >> On Wed, Dec 11, 2019, at 4:52 PM, Sean Glover wrote:
> >> > Hello again,
> >> >
> >> > There has been some interest in this KIP recently. I'm bumping the
> >> thread
> >> > to encourage feedback on the design.
> >> >
> >> > Regards,
> >> > Sean
> >> >
> >> > On Mon, Jul 15, 2019 at 9:01 AM Sean Glover 
> >> > wrote:
> >> >
> > >> > > To hopefully spark some discussion I've copied the motivation section
> > >> > > from the KIP:
> > >> > >
> > >> > > Consumer lag is a useful metric to monitor how many records are queued
> > >> > > to be processed. We can look at individual lag per partition or we may
> > >> > > aggregate metrics. For example, we may want to monitor the maximum lag
> > >> > > of any particular partition in our consumer subscription so we can
> > >> > > identify hot partitions, caused by an insufficient producer partitioning
> > >> > > strategy. We may want to monitor a sum of lag across all partitions so we
> > >> > > have a sense as to our total backlog of messages to consume. Lag in
> > >> > > offsets is useful when you have a good understanding of your messages and
> > >> > > processing characteristics, but it doesn’t tell us how far behind *in
> > >> > > time* we are. ...

Re: [DISCUSS] KIP-489 Kafka Consumer Record Latency Metric

2019-12-12 Thread Habib Nahas
Hi Sean,

Thanks for the KIP. 

As I understand it users are free to set their own timestamp on ProducerRecord. 
What is the recommendation for the proposed metric in a scenario where the user 
sets this timestamp in timezone A and consumes the record in timezone B? It's 
not clear to me if a custom implementation of LatencyTime will help here.

Thanks,
Habib

On Wed, Dec 11, 2019, at 4:52 PM, Sean Glover wrote:
> Hello again,
> 
> There has been some interest in this KIP recently. I'm bumping the thread
> to encourage feedback on the design.
> 
> Regards,
> Sean
> 
> On Mon, Jul 15, 2019 at 9:01 AM Sean Glover 
> wrote:
> 
> > To hopefully spark some discussion I've copied the motivation section from
> > the KIP:
> >
> > Consumer lag is a useful metric to monitor how many records are queued to
> > be processed. We can look at individual lag per partition or we may
> > aggregate metrics. For example, we may want to monitor the maximum lag of any
> > particular partition in our consumer subscription so we can identify hot
> > partitions, caused by an insufficient producer partitioning strategy.
> > We may want to monitor a sum of lag across all partitions so we have a
> > sense as to our total backlog of messages to consume. Lag in offsets is
> > useful when you have a good understanding of your messages and processing
> > characteristics, but it doesn’t tell us how far behind *in time* we are.
> > This is known as wait time in queueing theory, or more informally it’s
> > referred to as latency.
> >
> > The latency of a message can be defined as the difference between when
> > that message was first produced to when the message is received by a
> > consumer. The latency of records in a partition correlates with lag, but a
> > larger lag doesn’t necessarily mean a larger latency. For example, a topic
> > consumed by two separate application consumer groups A and B may have
> > similar lag, but different latency per partition. Application A is a
> > consumer which performs CPU intensive business logic on each message it
> > receives. It’s distributed across many consumer group members to handle the
> > load quickly enough, but since its processing time is slower, it takes
> > longer to process each message per partition. Meanwhile, Application B is
> > a consumer which performs a simple ETL operation to land streaming data in
> > another system, such as HDFS. It may have similar lag to Application A, but
> > because it has a faster processing time its latency per partition is
> > significantly less.
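
To make the lag-versus-latency distinction concrete, here is a small worked example with invented throughput numbers (they are not from the KIP): two applications with identical lag drain their backlogs at different rates, so their wait times differ by an order of magnitude.

public class LagVsLatency {
    public static void main(String[] args) {
        long lagRecords = 1_000;  // identical backlog for both applications
        double rateA = 100.0;     // records/sec: CPU-intensive consumer A
        double rateB = 1_000.0;   // records/sec: simple ETL consumer B
        // Approximate wait time = backlog / processing rate.
        System.out.printf("A: ~%.0f s, B: ~%.0f s%n",
                lagRecords / rateA, lagRecords / rateB); // A: ~10 s, B: ~1 s
    }
}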
> >
> > If the Kafka Consumer reported a latency metric it would be easier to
> > build Service Level Agreements (SLAs) based on non-functional requirements
> > of the streaming system. For example, the system must never have a latency
> > of greater than 10 minutes. This SLA could be used in monitoring alerts or
> > as input to automatic scaling solutions.
> >
> > On Thu, Jul 11, 2019 at 12:36 PM Sean Glover 
> > wrote:
> >
> >> Hi kafka-dev,
> >>
> >> I've created KIP-489 as a proposal for adding latency metrics to the
> >> Kafka Consumer in a similar way as record-lag metrics are implemented.
> >>
> >>
> >> https://cwiki.apache.org/confluence/display/KAFKA/489%3A+Kafka+Consumer+Record+Latency+Metric
> >>
> >> Regards,
> >> Sean
> >>
> >> --
> >> Principal Engineer, Lightbend, Inc.
> >>
> >> 
> >>
> >> @seg1o , in/seanaglover
> >> 
> >>
> >
> >
> > --
> > Principal Engineer, Lightbend, Inc.
> >
> > 
> >
> > @seg1o , in/seanaglover
> > 
> >
> 


Re: [DISCUSS] KIP-405: Kafka Tiered Storage

2019-07-26 Thread Habib Nahas
Thanks for the reply, Harsha and Satish. I have a follow-up; I'm fairly new to 
this space, so forgive me if there is an obvious answer.

AIUI the current design proposal requires explicit support for integrating new 
remote filesystems, such as GCP Cloud Storage or S3. If Kafka offers a solution 
that allows an archive data dir based on policy, I can mount a FUSE filesystem 
over, say, GCP or S3 and archive my older log segments to that mount. My team 
can certainly take advantage of such an implementation over, say, 
https://github.com/s3fs-fuse/s3fs-fuse.

There are also a large number of other FUSE filesystems already implemented, 
which Kafka can take advantage of without additional work.

Of course this could be a separate KIP as well; please let me know if that is 
the case.

FUSE (with a list of implemented destinations): 
https://en.wikipedia.org/wiki/Filesystem_in_Userspace 

Thanks,
Habib

On Thu, Jul 25, 2019, at 8:40 AM, Satish Duggana wrote:
> >>Under the proposed definition of RemoteTier, would it be possible to
> have an implementation that transfers older log segments to a slower storage
> tier, but one that is still local?
> Examples of slower local (i.e. mounted locally) tiers being HDDs vs SSDs,
> or NFS volumes.
> 
> No, it does not allow moving logs to different mount points in the
> same broker. In the current design, follower replicas may not have the
> log segments locally available but they will fetch the remote log
> indexes for older segments. Follower replicas will not replicate the
> data beyond local log retention. So, the data that is out of local log
> retention period/size will be fetched from the remote tier.
> 
> Thanks,
> Satish.
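
The read path Satish describes could be sketched as below. All names here are hypothetical and chosen for illustration; KIP-405's actual interfaces (e.g. its remote storage plugin API) differ.

// Hypothetical sketch of the tiered read path described above.
class TieredReadPathSketch {
    private long localLogStartOffset = 5_000; // earliest offset still on local disk

    byte[] read(long offset) {
        if (offset >= localLogStartOffset) {
            return readFromLocalLog(offset);  // recent data is served locally
        }
        return readFromRemoteTier(offset);    // older data comes from the remote tier
    }

    // Stubs standing in for the broker's local log and the remote storage plugin.
    private byte[] readFromLocalLog(long offset) { return new byte[0]; }
    private byte[] readFromRemoteTier(long offset) { return new byte[0]; }
}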
> 
> On Thu, Jul 25, 2019 at 6:01 AM Habib Nahas  wrote:
> >
> > Hi,
> >
> > Under the proposed definition of RemoteTier, would it be possible to have 
> > an implementation that transfers older log segments to a slower storage 
> > tier, but one that is still local?
> > Examples of slower local (i.e. mounted locally) tiers being HDDs vs SSDs, or 
> > NFS volumes.
> >
> > Let me know if I"m missing an existing solution for this usecase.
> > Thanks,
> >
> > Habib
> >
> >
> > On 2019/04/09 05:04:17, Harsha  wrote:
> > > Thanks, Ron. Updating the KIP; will add answers here as well.
> > >
> > > 1) If the cold storage technology can be cross-region, is there a
> > > possibility for a disaster recovery Kafka cluster to share the messages in
> > > cold storage? My guess is the answer is no, and messages replicated to the
> > > D/R cluster have to be migrated to cold storage from there independently.
> > > (The same cross-region cold storage medium could be used, but every message
> > > would appear there twice).
> > >
> > > If I understand the question correctly, what you are asking is: if Kafka
> > > cluster A (active) ships logs to remote storage with cross-region
> > > replication, will another Kafka cluster B (passive) be able to use the
> > > remote-storage copied logs directly?
> > >
> > > For the initial version my answer is No. We can handle this in subsequent
> > > changes after this one.
> > >
> > > 2) Can/should external (non-Kafka) tools have direct access to the messages
> > > in cold storage? I think this might have been addressed when someone asked
> > > about ACLs, and I believe the answer is "no" -- if some external tool needs
> > > to operate on that data then that external tool should read that data by
> > > acting as a Kafka consumer. Again, just asking to get the answer clearly
> > > documented in case it is unclear.
> > >
> > > The answer is No. All tools/clients must go through broker APIs to access
> > > any data (local or remote). Only the Kafka broker user will have access to
> > > remote storage logs, and security/ACLs will work the way they do today.
> > > Tools/clients going directly to the remote storage might help in terms of
> > > efficiency, but this requires protocol changes and some way of syncing ACLs
> > > in Kafka to the remote storage.
> > >
> > > Thanks,
> > > Harsha
> > >
> > > On Mon, Apr 8, 2019, at 8:48 AM, Ron Dagostino wrote:
> > > > Hi Harsha. A couple of questions. I think I know the answers, but it
> > > > would be good to see them explicitly documented. ...

Re: [DISCUSS] KIP-405: Kafka Tiered Storage

2019-07-25 Thread Habib Nahas
Hi,

Under the proposed definition of RemoteTier, would it be possible to have an 
implementation that transfers older log segments to a slower storage tier, but 
one that is still local?
Examples of slower local (i.e. mounted locally) tiers being HDDs vs SSDs, or NFS 
volumes. 

Let me know if I"m missing an existing solution for this usecase.
Thanks,

Habib


On 2019/04/09 05:04:17, Harsha  wrote: 
> Thanks, Ron. Updating the KIP; will add answers here as well.
> 
> 1) If the cold storage technology can be cross-region, is there a
> possibility for a disaster recovery Kafka cluster to share the messages in
> cold storage? My guess is the answer is no, and messages replicated to the
> D/R cluster have to be migrated to cold storage from there independently.
> (The same cross-region cold storage medium could be used, but every message
> would appear there twice).
> 
> If I understand the question correctly, what you are asking is: if Kafka
> cluster A (active) ships logs to remote storage with cross-region
> replication, will another Kafka cluster B (passive) be able to use the
> remote-storage copied logs directly?
> 
> For the initial version my answer is No. We can handle this in subsequent
> changes after this one.
> 
> 2) Can/should external (non-Kafka) tools have direct access to the messages
> in cold storage? I think this might have been addressed when someone asked
> about ACLs, and I believe the answer is "no" -- if some external tool needs
> to operate on that data then that external tool should read that data by
> acting as a Kafka consumer. Again, just asking to get the answer clearly
> documented in case it is unclear.
> 
> The answer is No. All tools/clients must go through broker APIs to access any
> data (local or remote). Only the Kafka broker user will have access to remote
> storage logs, and security/ACLs will work the way they do today.
> Tools/clients going directly to the remote storage might help in terms of
> efficiency, but this requires protocol changes and some way of syncing ACLs in
> Kafka to the remote storage.
> 
> 
> Thanks,
> Harsha
> 
> On Mon, Apr 8, 2019, at 8:48 AM, Ron Dagostino wrote:
> > Hi Harsha. A couple of questions. I think I know the answers, but it
> > would be good to see them explicitly documented.
> > 
> > 1) If the cold storage technology can be cross-region, is there a
> > possibility for a disaster recovery Kafka cluster to share the messages in
> > cold storage? My guess is the answer is no, and messages replicated to the
> > D/R cluster have to be migrated to cold storage from there independently.
> > (The same cross-region cold storage medium could be used, but every message
> > would appear there twice).
> > 
> > 2) Can/should external (non-Kafka) tools have direct access to the messages
> > in cold storage? I think this might have been addressed when someone asked
> > about ACLs, and I believe the answer is "no" -- if some external tool needs
> > to operate on that data then that external tool should read that data by
> > acting as a Kafka consumer. Again, just asking to get the answer clearly
> > documented in case it is unclear.
> > 
> > Ron
> > 
> > 
> > On Thu, Apr 4, 2019 at 12:53 AM Harsha  wrote:
> > 
> > > Hi Viktor,
> > >
> > >
> > > "Now, will the consumer be able to consume a remote segment if:
> > > - the remote segment is stored in the remote storage, BUT
> > > - the leader broker failed right after this AND
> > > - the follower which is to become a leader didn't scan yet for a new
> > > segment?"
> > >
> > > If I understand correctly: a local log segment is copied to remote
> > > storage, but the leader fails before writing the index files, and
> > > leadership changes to a follower. In this case we consider the log
> > > segment copy failed, and the newly elected leader will start copying the
> > > data from the last known offset in the remote storage. Consumers looking
> > > for an offset that might be in the failed-copy log segment will continue
> > > to read the data from local disk, since the local log segment is only
> > > deleted after a successful copy of the log segment.
> > >
> > > "As a follow-up question, what are your experiences, does a failover in a
> > > broker cause bigger than usual churn in the consumers? (I'm thinking
> > > about the time required to rebuild remote index files.)"
> > >
> > > Rebuilding remote index files will only happen if the remote storage is
> > > missing all the copied index files. Fail-over will not trigger this
> > > rebuild.
> > >
> > >
> > > Hi Ryan,
> > >
> > > "Harsha, can you comment on this alternative approach: instead of
> > > fetching directly from remote storage via a new API, implement something
> > > like paging, where segments are paged-in and out of cold storage based on