[VOTE] KIP-293: Add new metrics for consumer/replication fetch requests

2018-06-25 Thread Adam Kotwasinski
Hello,

In the absence of additional feedback on this KIP I'd like to start a vote.

To summarize, the KIP simply proposes adding a metric that tracks the
number of fetch requests made by (real) client consumers, as opposed to
other replicating brokers.

KIP link: 
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=80452537
PR: https://github.com/apache/kafka/pull/4936
Discussion thread:
https://www.mail-archive.com/dev@kafka.apache.org/msg87465.html

Yours faithfully,
Adam Kotwasinski


Re: [DISCUSS] KIP-293 Add new metrics for consumer/replication fetch requests

2018-05-18 Thread Adam Kotwasinski
Hey Dong,

Re a) - not really, it's only the corner case where you actually have
consumers but no one is producing (I had that in my environments, where
producer services were shut down weeks before the consumers). However,
one could certainly argue that a simple gauge answering the question
`when did the last consumer fetch happen?` would suffice to answer
'is anyone trying to consume right now?' - the rest is just extra
(though the growth of that histogram would give some hint about the
number of potential consumers).
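A minimal sketch of the simpler alternative mentioned above: a per-topic gauge recording when the last consumer fetch happened. The class and method names (`LastConsumerFetchTracker`, `recordConsumerFetch`, `isBeingConsumed`) are hypothetical, and the caller is assumed to invoke `recordConsumerFetch` only for fetches from client consumers, never for fetches from replicating brokers.

```java
import java.util.Map;
import java.util.OptionalLong;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical per-topic "when did the last consumer fetch happen?" gauge. */
class LastConsumerFetchTracker {
    // topic -> timestamp (ms) of the most recent consumer fetch
    private final Map<String, Long> lastFetchMs = new ConcurrentHashMap<>();

    /** Called for each fetch issued by a client consumer (not a follower broker). */
    void recordConsumerFetch(String topic, long nowMs) {
        // keep the latest timestamp even if calls arrive out of order
        lastFetchMs.merge(topic, nowMs, Math::max);
    }

    /** Gauge value: time of the last consumer fetch, if one was ever seen. */
    OptionalLong lastConsumerFetchMs(String topic) {
        Long ts = lastFetchMs.get(topic);
        return ts == null ? OptionalLong.empty() : OptionalLong.of(ts);
    }

    /** "Is anyone trying to consume right now?" within the given time window. */
    boolean isBeingConsumed(String topic, long nowMs, long windowMs) {
        OptionalLong ts = lastConsumerFetchMs(topic);
        return ts.isPresent() && nowMs - ts.getAsLong() <= windowMs;
    }

    public static void main(String[] args) {
        LastConsumerFetchTracker tracker = new LastConsumerFetchTracker();
        tracker.recordConsumerFetch("orders", 1_000L);
        System.out.println(tracker.isBeingConsumed("orders", 20_000L, 30_000L)); // true
        System.out.println(tracker.isBeingConsumed("orders", 60_000L, 30_000L)); // false
    }
}
```

A single timestamp per topic is enough to answer the yes/no question; a rate or histogram would additionally hint at how many consumers there are.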

Re b) - I actually meant: delete a topic only when there are no
produce requests happening (for example: every partition's start offset
== end offset and both have stayed constant for time T) and there are no
fetch requests (the new metric's value stays constant for time T as
well). The exact conditions for deleting a topic would be left to the
implementer (I had picked the above ones).

To give you some background, we were deploying a cluster used as a
messaging bus, with dynamically created unique topics (unique in the
sense of being relevant only during the lifespan of a business-level
service). Over time we reached a situation where we had roughly 150k
partitions in a 10-node cluster, a large part of them simply no
longer needed but still present in ZK (and, for example, prolonging
leader election after a full cluster restart).
So basically, I developed a trivial monitor that keeps in memory each
log's start offset, end offset and the time of the last read attempt
(it uses TotalFetchRequests). Unfortunately, it works only for
replication factor = 1, because with more replicas the followers'
fetches are counted in TotalFetchRequests as well.

Following the idea of TTL, I think this could be configurable:
a) delete only when there are no new messages for time T (which is
more or less equivalent to log retention, just with the additional
removal of ZK data) - however, it could be surprising for a consumer to
fetch its offset and get N, and some time later get 0, because the topic
was removed and automatically re-created in the meantime.
b) or the above AND no consumer requests for the same time T - which
would actually be pretty similar to how RabbitMQ queue TTL behaves
(https://www.rabbitmq.com/ttl.html#queue-ttl).
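The monitor and the two TTL variants above can be sketched roughly as follows. This is a hypothetical, simplified reconstruction, not the actual monitor: it assumes something periodically polls each topic and supplies its log end offset (summed over partitions) and a cumulative count of consumer fetch requests. If that count is taken from TotalFetchRequests, it is only trustworthy with replication factor = 1, for the reason given above.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical idle-topic monitor supporting TTL variants a) and b). */
class TopicIdleMonitor {
    enum Policy {
        NO_NEW_MESSAGES,               // a) log end offset unchanged for T
        NO_NEW_MESSAGES_AND_NO_FETCHES // b) a) AND consumer fetch count unchanged for T
    }

    /** One observation: log end offset summed over partitions, cumulative consumer fetches. */
    static final class Snapshot {
        final long logEnd;
        final long consumerFetches;

        Snapshot(long logEnd, long consumerFetches) {
            this.logEnd = logEnd;
            this.consumerFetches = consumerFetches;
        }
    }

    private static final class State {
        Snapshot last;
        long endChangedMs;   // last time the log end offset moved
        long fetchChangedMs; // last time the consumer fetch count moved
    }

    private final Map<String, State> states = new HashMap<>();
    private final Policy policy;
    private final long ttlMs;

    TopicIdleMonitor(Policy policy, long ttlMs) {
        this.policy = policy;
        this.ttlMs = ttlMs;
    }

    /** Records a new observation and returns true if the topic is a deletion candidate. */
    boolean observe(String topic, Snapshot s, long nowMs) {
        State st = states.get(topic);
        if (st == null) {
            // first observation: start both idle timers now
            st = new State();
            st.endChangedMs = nowMs;
            st.fetchChangedMs = nowMs;
            states.put(topic, st);
        } else {
            if (s.logEnd != st.last.logEnd) st.endChangedMs = nowMs;
            if (s.consumerFetches != st.last.consumerFetches) st.fetchChangedMs = nowMs;
        }
        st.last = s;
        boolean noNewMessages = nowMs - st.endChangedMs >= ttlMs;
        boolean noFetches = nowMs - st.fetchChangedMs >= ttlMs;
        return policy == Policy.NO_NEW_MESSAGES ? noNewMessages : noNewMessages && noFetches;
    }

    public static void main(String[] args) {
        TopicIdleMonitor m = new TopicIdleMonitor(Policy.NO_NEW_MESSAGES_AND_NO_FETCHES, 60_000L);
        m.observe("orders", new Snapshot(500, 10), 0L);
        // no new messages, but a consumer is still polling: not deletable under b)
        System.out.println(m.observe("orders", new Snapshot(500, 42), 90_000L)); // false
        // nothing produced or fetched for another 60 s: deletable
        System.out.println(m.observe("orders", new Snapshot(500, 42), 150_000L)); // true
    }
}
```

Under variant a) the same sequence would already report the topic as deletable at 90 s, which is exactly the case where a still-running consumer could later see its offset reset to 0.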

[ I think on some level I am straying into discussing how to implement
topic TTL ]

Best regards,
Adam

2018-05-18 20:46 GMT+01:00 Dong Lin <lindon...@gmail.com>:
> Hey Adam,
>
> Regarding a), if the goal is to monitor the load in the system, then the
> existing FetchRequest rate (including those sent by the broker) would
> probably be sufficient. Anyway, this new metric for monitoring system load
> seems useful with little overhead in code or operation. Just trying to see
> whether there is a strong reason for it.
>
> Regarding b), it is an interesting use-case. Maybe it would be more useful
> to understand why it is safe to "delete" the topic if there is no active
> consumer? Usually if we add something for an otherwise infeasible use-case,
> we want to make sure it does address the use-case. Intuitively, it seems
> that whether or not the topic can be deleted depends on 1) whether
> there will be new data produced to this topic; and 2) whether the existing
> data in the topic will be needed in the future. These two factors are not
> related to whether there is a consumer consuming from the topic.
>
>
> Thanks,
> Dong

Re: [DISCUSS] KIP-293 Add new metrics for consumer/replication fetch requests

2018-05-18 Thread Adam Kotwasinski
Hey Dong,

I have observed two use cases:
a) monitoring (if the infrastructure has no control over the
clients) - if I only provide Kafka brokers but am not aware of the
clients, I cannot say whether anyone is even consuming the data - this
is somewhat comparable to listing the existing consumers/connections of
RabbitMQ queues (theoretically, one could use some kind of network
interface monitoring for an estimate)
b) cleanup of unused topics - when Kafka is used as a messaging hub
with dynamically created topics, we can reach a very large number of
them. Removing the ones that still have active consumers causes odd
side effects (e.g. the consumers start reading from offset 0 again) and
bugs ( https://issues.apache.org/jira/browse/KAFKA-6438 ).

[fwiw, b) could also be solved by having a topic TTL (in a fashion similar
to e.g. RabbitMQ) - I will be submitting a relevant KIP soon]

Yours sincerely,
Adam Kotwasinski

2018-05-18 19:49 GMT+01:00 Dong Lin <lindon...@gmail.com>:
> Hey Adam,
>
> Got it. Thanks for the explanation. Could you also explain the use-case for
> knowing whether a topic with no new messages has active consumers? Just
> want to make sure that the use-case is reasonable and cannot be
> accomplished with an existing approach.
>
> Thanks,
> Dong


Re: [DISCUSS] KIP-293 Add new metrics for consumer/replication fetch requests

2018-05-18 Thread Adam Kotwasinski
Hi Dong,

Unfortunately this does not work when there are no new messages but
consumers are still running.
BytesOutPerSec increases only when messages are actually being delivered
to the consumer.
But you could have a situation where a consumer is already at the end of
the log and no one is going to produce any more messages.
In such a case the broker would still be receiving fetch requests, but
nothing would be sent out.

Trivial to replicate with `kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic idontexist`
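A toy model of the argument above; it is not Kafka's fetch path, and all names are hypothetical. Every fetch request is counted, but bytes are only counted when records are actually returned, so once the consumer is caught up the fetch counter keeps growing while the bytes-out counter stays flat.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy single-partition log counting fetch requests and bytes sent to consumers. */
class ToyPartition {
    private final List<byte[]> records = new ArrayList<>();
    long fetchRequests; // analogous to a fetch-request-rate metric
    long bytesOut;      // analogous to BytesOutPerSec

    void append(byte[] record) {
        records.add(record);
    }

    /** Serves a fetch starting at 'offset' and returns the next offset to fetch. */
    long fetch(long offset) {
        fetchRequests++; // every request is counted...
        for (long o = offset; o < records.size(); o++) {
            bytesOut += records.get((int) o).length; // ...but bytes only when data is returned
        }
        return records.size();
    }

    public static void main(String[] args) {
        ToyPartition p = new ToyPartition();
        p.append(new byte[100]);
        long next = p.fetch(0); // consumer reads the only record
        for (int i = 0; i < 10; i++) {
            next = p.fetch(next); // consumer sits at the log end; nothing new is produced
        }
        System.out.println(p.fetchRequests + " fetches, " + p.bytesOut + " bytes"); // 11 fetches, 100 bytes
    }
}
```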

Best regards,
Adam

2018-05-18 18:49 GMT+01:00 Dong Lin <lindon...@gmail.com>:
> Hey Adam,
>
> Thanks for the KIP. We currently already have the per-topic byte-out-rate
> (not including replication traffic) with MBean
> path kafka.server:name=BytesOutPerSec,type=BrokerTopicMetrics,topic=*.
> Though this is not the FetchRequest rate, it seems to address the
> motivation of the KIP by telling the Kafka cluster operator whether consumers
> are actively reading from a topic. Does that address the problem?
>
> Thanks,
> Dong
>


Re: [DISCUSS] KIP-293 Add new metrics for consumer/replication fetch requests

2018-05-18 Thread Adam Kotwasinski
Hello,

There haven't been many comments regarding this KIP (not surprising as
the code change is ~10 lines long). Would it be okay to start the
vote?

https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=80452537

Best regards,
Adam Kotwasinski

On Wed, May 2, 2018 at 6:49 AM, Adam Kotwasinski <adam.kotwasin...@gmail.com
> wrote:

> Hello,
>
> I have created a KIP to add some additional metrics re number of fetch
> requests made to brokers -
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=80452537
>
> Feedback and suggestions are welcome.
>
> Best regards,
> Adam Kotwasinski
>


Re: [DISCUSS] KIP-293 Add new metrics for consumer/replication fetch requests

2018-05-02 Thread Adam Kotwasinski
It looks like my understanding of decaying metrics was wrong; I could not
come up with a viable example.
When marking one metric 10k times and then marking the other one 10k times
a minute later, the differences in values were on the order of 10^-12,
which is negligible.

So I have updated the pull request accordingly (removed the Replication*
metric).

I am wondering whether it would be preferable to keep the removed
replication metric visible (however, that would need to be a view metric
that performs the subtraction, which would most probably force subclassing
MetricRegistry at (com.yammer.metrics.Metrics.DEFAULT_REGISTRY), since its
generic getOrAdd method is protected).
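A minimal sketch of why the experiment above only showed differences around 10^-12. It re-implements, in simplified form, an exponentially weighted moving average in the style of a Yammer meter's one-minute rate (5-second ticks); this re-implementation is an assumption for illustration, not the library code. Each tick is a linear update, so the EWMA of the total equals the sum of the consumer and replication EWMAs up to floating-point rounding, and a view metric computing total minus consumer recovers the replication rate.

```java
/** Simplified one-minute EWMA in the style of a Yammer meter (assumption, not library code). */
class SimpleEwma {
    private static final double TICK_SECONDS = 5.0;
    private static final double ALPHA = 1 - Math.exp(-TICK_SECONDS / 60.0); // one-minute window

    private long uncounted;
    private double ratePerSecond;
    private boolean initialized;

    void mark(long n) {
        uncounted += n;
    }

    /** Called every TICK_SECONDS: folds the pending count into the moving average. */
    void tick() {
        double instantRate = uncounted / TICK_SECONDS;
        uncounted = 0;
        if (initialized) {
            ratePerSecond += ALPHA * (instantRate - ratePerSecond); // linear update
        } else {
            ratePerSecond = instantRate;
            initialized = true;
        }
    }

    double rate() {
        return ratePerSecond;
    }

    public static void main(String[] args) {
        SimpleEwma consumer = new SimpleEwma();
        SimpleEwma replication = new SimpleEwma();
        SimpleEwma total = new SimpleEwma();

        // 10k consumer fetches, then one minute (12 ticks) passes
        consumer.mark(10_000);
        total.mark(10_000);
        for (int i = 0; i < 12; i++) { consumer.tick(); replication.tick(); total.tick(); }

        // 10k replication fetches one minute later, then another minute passes
        replication.mark(10_000);
        total.mark(10_000);
        for (int i = 0; i < 12; i++) { consumer.tick(); replication.tick(); total.tick(); }

        // "View" metric: replication rate derived by subtraction
        double derived = total.rate() - consumer.rate();
        System.out.println("rounding-level difference: " + Math.abs(derived - replication.rate()));
    }
}
```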

AK~

2018-05-02 16:11 GMT+01:00 Ted Yu <yuzhih...@gmail.com>:

> Can you explain Rejected Alternatives #2 in more detail?
>
> Maybe give some concrete example to show the summation of the two new
> metrics is not equal to the existing metric.
>
> On Wed, May 2, 2018 at 6:49 AM, Adam Kotwasinski <
> adam.kotwasin...@gmail.com
> > wrote:
>
> > Hello,
> >
> > I have created a KIP to add some additional metrics re number of fetch
> > requests made to brokers -
> > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=80452537
> >
> > Feedback and suggestions are welcome.
> >
> > Best regards,
> > Adam Kotwasinski
> >
>


[DISCUSS] KIP-293 Add new metrics for consumer/replication fetch requests

2018-05-02 Thread Adam Kotwasinski
Hello,

I have created a KIP to add some additional metrics regarding the number of
fetch requests made to brokers -
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=80452537

Feedback and suggestions are welcome.

Best regards,
Adam Kotwasinski


Permission request to create a KIP

2018-05-01 Thread Adam Kotwasinski
Hello,

I'd like to create a KIP - Add new metrics for consumer/replication fetch
requests (re https://issues.apache.org/jira/browse/KAFKA-6830)

Could you please grant me permission to create pages at
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals


My id is: adam.kotwasinski

Yours sincerely,
Adam Kotwasinski


Permission request

2018-04-27 Thread Adam Kotwasinski
Hello,

Could you please add me to the contributor list, so that I can assign
https://issues.apache.org/jira/browse/KAFKA-6830 to myself?

Thank you.

Best regards,
Adam Kotwasinski


[jira] [Created] (KAFKA-6830) Add new metrics for consumer/replication fetch requests

2018-04-26 Thread Adam Kotwasinski (JIRA)
Adam Kotwasinski created KAFKA-6830:
---

 Summary: Add new metrics for consumer/replication fetch requests
 Key: KAFKA-6830
 URL: https://issues.apache.org/jira/browse/KAFKA-6830
 Project: Kafka
  Issue Type: Improvement
  Components: core
Reporter: Adam Kotwasinski


Currently, we have only one fetch request-related metric per topic.
As fetch requests are sent by both client consumers and replicating brokers,
it is impossible to tell whether a particular partition (with replication
factor > 1) is actively being read by client consumers.

Rationale for this improvement: as the owner of a Kafka installation, but not
the owner of its clients, I want to know which topics still have active (real)
consumers.
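A minimal sketch of how a broker could split the per-topic fetch count. It relies on the Kafka fetch protocol's `replica_id` field: a follower broker sends its own (non-negative) broker id, while an ordinary consumer sends -1. The class and counter names are hypothetical, and this is not the code from the PR.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;

/** Hypothetical per-topic fetch counters split by requester type. */
class FetchRequestCounters {
    private final Map<String, LongAdder> total = new ConcurrentHashMap<>();
    private final Map<String, LongAdder> consumer = new ConcurrentHashMap<>();

    /** replicaId: broker id (>= 0) for follower fetches, negative (e.g. -1) for client consumers. */
    void onFetch(String topic, int replicaId) {
        total.computeIfAbsent(topic, t -> new LongAdder()).increment();
        if (replicaId < 0) {
            consumer.computeIfAbsent(topic, t -> new LongAdder()).increment();
        }
    }

    long totalFetches(String topic) {
        return count(total, topic);
    }

    long consumerFetches(String topic) {
        return count(consumer, topic);
    }

    /** Replication fetches derived as total minus consumer fetches. */
    long replicationFetches(String topic) {
        return totalFetches(topic) - consumerFetches(topic);
    }

    private static long count(Map<String, LongAdder> counters, String topic) {
        LongAdder adder = counters.get(topic);
        return adder == null ? 0L : adder.sum();
    }

    public static void main(String[] args) {
        FetchRequestCounters c = new FetchRequestCounters();
        c.onFetch("orders", 2);  // follower on broker 2
        c.onFetch("orders", 3);  // follower on broker 3
        c.onFetch("orders", -1); // client consumer
        System.out.println(c.consumerFetches("orders") + " consumer / "
                + c.replicationFetches("orders") + " replication"); // 1 consumer / 2 replication
    }
}
```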



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-6438) NSEE while concurrently creating and deleting a topic

2018-01-10 Thread Adam Kotwasinski (JIRA)
Adam Kotwasinski created KAFKA-6438:
---

 Summary: NSEE while concurrently creating and deleting a topic
 Key: KAFKA-6438
 URL: https://issues.apache.org/jira/browse/KAFKA-6438
 Project: Kafka
  Issue Type: Bug
  Components: controller
Affects Versions: 1.0.0
 Environment: kafka_2.11-1.0.0.jar
OpenJDK Runtime Environment (build 1.8.0_102-b14), OpenJDK 64-Bit Server VM 
(build 25.102-b14, mixed mode)
CentOS Linux release 7.3.1611 (Core)
Reporter: Adam Kotwasinski


It appears that deleting a topic and creating it at the same time can cause
a NoSuchElementException (NSEE), which later results in a forced controller
shutdown.

Most probably the topics are being re-created because consumers/producers are
still active (yes, this means the deletion is happening blindly).

The main problem here (for me) is the controller switch; the data loss and the
subsequent unclean election are acceptable (as we admit to deleting blindly).

Environment description:
20 Kafka brokers
80k partitions (20k topics, 4 partitions each)
3-node ZK

Incident:
{code:java}
[2018-01-09 11:19:05,912] INFO [Topic Deletion Manager 6], Partition deletion 
callback for mytopic-2,mytopic-0,mytopic-1,mytopic-3 
(kafka.controller.TopicDeletionManager)
[2018-01-09 11:19:06,237] INFO [Controller id=6] New leader and ISR for 
partition mytopic-0 is {"leader":-1,"leader_epoch":1,"isr":[]} 
(kafka.controller.KafkaController)
[2018-01-09 11:19:06,412] INFO [Topic Deletion Manager 6], Deletion for 
replicas 12,9,10,11 for partition mytopic-3,mytopic-0,mytopic-1,mytopic-2 of 
topic mytopic in progress (kafka.controller.TopicDeletionManager)
[2018-01-09 11:19:07,218] INFO [Topic Deletion Manager 6], Deletion for 
replicas 12,9,10,11 for partition mytopic-3,mytopic-0,mytopic-1,mytopic-2 of 
topic mytopic in progress (kafka.controller.TopicDeletionManager)
[2018-01-09 11:19:07,304] INFO [Topic Deletion Manager 6], Deletion for 
replicas 12,9,10,11 for partition mytopic-3,mytopic-0,mytopic-1,mytopic-2 of 
topic mytopic in progress (kafka.controller.TopicDeletionManager)
[2018-01-09 11:19:07,383] INFO [Topic Deletion Manager 6], Deletion for 
replicas 12,9,10,11 for partition mytopic-3,mytopic-0,mytopic-1,mytopic-2 of 
topic mytopic in progress (kafka.controller.TopicDeletionManager)
[2018-01-09 11:19:07,510] INFO [Topic Deletion Manager 6], Deletion for 
replicas 12,9,10,11 for partition mytopic-3,mytopic-0,mytopic-1,mytopic-2 of 
topic mytopic in progress (kafka.controller.TopicDeletionManager)
[2018-01-09 11:19:07,661] INFO [Topic Deletion Manager 6], Deletion for 
replicas 12,9,10,11 for partition mytopic-3,mytopic-0,mytopic-1,mytopic-2 of 
topic mytopic in progress (kafka.controller.TopicDeletionManager)
[2018-01-09 11:19:07,728] INFO [Topic Deletion Manager 6], Deletion for 
replicas 9,10,11 for partition mytopic-0,mytopic-1,mytopic-2 of topic mytopic 
in progress (kafka.controller.TopicDeletionManager)
[2018-01-09 11:19:07,924] INFO [PartitionStateMachine controllerId=6] Invoking 
state change to OfflinePartition for partitions 
mytopic-2,mytopic-0,mytopic-1,mytopic-3 (kafka.controller.PartitionStateMachine)
[2018-01-09 11:19:07,924] INFO [PartitionStateMachine controllerId=6] Invoking 
state change to NonExistentPartition for partitions 
mytopic-2,mytopic-0,mytopic-1,mytopic-3 (kafka.controller.PartitionStateMachine)
[2018-01-09 11:19:08,592] INFO [Controller id=6] New topics: [Set(mytopic, 
other, querytickle_WD2-SALES1_espgms0202v29)], deleted topics: [Set()], new 
partition replica assignment [Map(other-0 -> Vector(8), mytopic-2 -> Vector(6), 
mytopic-0 -> Vector(4), other-2 -> Vector(10), mytopic-1 -> Vector(5), 
mytopic-3 -> Vector(7), other-1 -> Vector(9), other-3 -> Vector(11))] 
(kafka.controller.KafkaController)
[2018-01-09 11:19:08,593] INFO [Controller id=6] New topic creation callback 
for other-0,mytopic-2,mytopic-0,other-2,mytopic-1,mytopic-3,other-1,other-3 
(kafka.controller.KafkaController)
[2018-01-09 11:19:08,596] INFO [Controller id=6] New partition creation 
callback for 
other-0,mytopic-2,mytopic-0,other-2,mytopic-1,mytopic-3,other-1,other-3 
(kafka.controller.KafkaController)
[2018-01-09 11:19:08,596] INFO [PartitionStateMachine controllerId=6] Invoking 
state change to NewPartition for partitions 
other-0,mytopic-2,mytopic-0,other-2,mytopic-1,mytopic-3,other-1,other-3 
(kafka.controller.PartitionStateMachine)
[2018-01-09 11:19:08,642] INFO [PartitionStateMachine controllerId=6] Invoking 
state change to OnlinePartition for partitions 
other-0,mytopic-2,mytopic-0,other-2,mytopic-1,mytopic-3,other-1,other-3 
(kafka.controller.PartitionStateMachine)
[2018-01-09 11:19:08,828] INFO [Topic Deletion Manager 6], Partition deletion 
callback for mytopic-2,mytopic-0,mytopic-1,mytopic-3 
(kafka.controller.TopicDeletionManager)
[2018-01-09 11:19:09,127] INFO [Controller id=6] New leader and ISR