Thanks Andrew. SGTM.

One point you did not address is the idea to add a method to `KafkaStreams` similar to the proposed `clientInstanceId()` that will be added to consumer/producer/admin clients.

Without addressing this, Kafka Streams users won't have a way to get the assigned `instanceId` of the internally created clients, and thus it would be very difficult for them to know which metrics that the broker receives belong to a Kafka Streams app. It seems they would only find the `instanceIds` in the log4j output if they enable client logging?

Of course, because there are multiple clients inside Kafka Streams, the return type cannot be a single "String", but must be some complex data structure -- we could either add a new class, or return a Map<String,String> using a client key that maps to the `instanceId`.

For example we could use the following key:

   [Global]StreamThread[-<threadIndex>][-restore][consumer|producer]

(Of course, only the valid combination.)

Or maybe even better, we might want to return a `Future` because collecting all the `instanceId`s might be a blocking call on each client? I already have a few ideas about how it could be implemented, but I don't think it must be discussed on the KIP, as it's an implementation detail.
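
To make the idea concrete, here is a minimal sketch of how such a map could be built asynchronously. Everything in it is an assumption for illustration: the class `ClientInstanceIdCollector`, both method names, and the `-` separator before `consumer`/`producer` are hypothetical and are not part of the KIP or of Kafka Streams. Each `Supplier` stands in for a potentially blocking per-client lookup.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

// Hypothetical helper; none of these names come from the KIP or from Kafka Streams.
final class ClientInstanceIdCollector {

    // Builds a key shaped like [Global]StreamThread[-<threadIndex>][-restore]-(consumer|producer).
    // The "-" before the client type is an assumption made for readability.
    static String clientKey(boolean global, Integer threadIndex, boolean restore, String clientType) {
        StringBuilder key = new StringBuilder(global ? "GlobalStreamThread" : "StreamThread");
        if (threadIndex != null) {
            key.append('-').append(threadIndex);
        }
        if (restore) {
            key.append("-restore");
        }
        return key.append('-').append(clientType).toString();
    }

    // Runs every (possibly blocking) lookup off the caller's thread and
    // completes the future with a map from client key to instance id.
    static CompletableFuture<Map<String, String>> collect(Map<String, Supplier<String>> lookups) {
        return CompletableFuture.supplyAsync(() -> {
            Map<String, String> ids = new LinkedHashMap<>();
            lookups.forEach((key, lookup) -> ids.put(key, lookup.get()));
            return ids;
        });
    }
}
```

A caller could then block with `collect(lookups).get(timeout, unit)` or chain further work on the future, which is the main argument for returning a `Future` rather than a plain map.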

Thoughts?


-Matthias

On 10/6/23 4:21 AM, Andrew Schofield wrote:
Hi Matthias,
Thanks for your comments. I agree that a follow-up KIP for Kafka Streams makes 
sense. This KIP currently has made a bit
of an effort to embrace KS, but it’s not enough by a long way.

I have removed `application.id`. This should be done 
properly in the follow-up KIP. I don’t believe there’s a downside to
removing it from this KIP.

I have reworded the statement about temporality. In practice, the 
implementation of this KIP that’s going on while the voting
progresses happens to use delta temporality, but that’s an implementation 
detail. Supporting clients must support both
temporalities.

I thought about exposing the client instance ID as a metric, but non-numeric 
metrics are not usual practice and tools
do not universally support them. I don’t think the KIP is improved by adding 
one now.

I have also added constants for the various Config classes for 
ENABLE_METRICS_PUSH_CONFIG, including to
StreamsConfig. It’s best to be explicit about this.

Thanks,
Andrew

On 2 Oct 2023, at 23:47, Matthias J. Sax <mj...@apache.org> wrote:

Hi,

I did not pay attention to this KIP in the past; seems it was on-hold for a 
while.

Overall it sounds very useful, and I think we should extend this with a follow 
up KIP for Kafka Streams. What is unclear to me at this point is the statement:

Kafka Streams applications have an application.id configured and this 
identifier should be included as the application_id metrics label.

The `application.id` is currently only used as the (main) consumer's `group.id` 
(and is part of an auto-generated `client.id` if the user does not set one).

This comment relates to:

The following labels should be added by the client as appropriate before 
metrics are pushed.

Given that Kafka Streams uses the consumer/producer/admin client as "black 
boxes", a client does at this point not know that it's part of a Kafka Streams 
application, and thus, it won't be able to attach any such label to the metrics it sends. 
(Also producer and admin don't even know the value of `application.id` -- only the (main) 
consumer, indirectly via `group.id`, but also restore and global consumer don't know it, 
because they don't have `group.id` set).

While I am totally in favor of the proposal, I am wondering how we intend to implement it 
in a clean way? Or would we be OK with having some internal client APIs that KS can use to 
"register" itself with the client?



While clients must support both temporalities, the broker will initially only 
send GetTelemetrySubscriptionsResponse.DeltaTemporality=True

Not sure if I can follow. How do we make the decision about DELTA or CUMULATIVE metrics? Should 
the broker-side plugin not decide what metrics it wants to receive in which form? So what 
does "initially" mean -- the broker won't ship with a default plugin 
implementation?



The following method is added to the Producer, Consumer, and Admin client 
interfaces:

Should we add anything to Kafka Streams to expose the underlying clients' 
assigned client-instance-ids programmatically? I am also wondering if clients 
should report their assigned client-instance-ids as metrics themselves (for this 
case, Kafka Streams won't need to do anything, because we already expose all 
client metrics).

If we add anything programmatic, we need to make it simple, given that Kafka 
Streams has many clients per `StreamThread` and may have multiple threads.



enable.metrics.push
It might be worth adding this to `StreamsConfig`, too? If set via 
StreamsConfig, we would forward it to all clients automatically.
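
As a rough illustration of what "forward it to all clients" could look like, the sketch below copies a Streams-level `enable.metrics.push` value into a client's configuration. The class `MetricsPushConfigForwarder`, the method `forward`, and the rule that an explicit client-level setting takes precedence are all assumptions made for this example. It does not describe how Kafka Streams actually resolves client configs.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper; not part of Kafka Streams.
final class MetricsPushConfigForwarder {
    static final String ENABLE_METRICS_PUSH = "enable.metrics.push";

    // Returns a copy of the client config with the Streams-level setting applied.
    // Assumption: a value set explicitly on the client wins over the Streams-level one.
    static Map<String, Object> forward(Map<String, Object> streamsConfig,
                                       Map<String, Object> clientConfig) {
        Map<String, Object> merged = new HashMap<>(clientConfig);
        if (streamsConfig.containsKey(ENABLE_METRICS_PUSH)) {
            merged.putIfAbsent(ENABLE_METRICS_PUSH, streamsConfig.get(ENABLE_METRICS_PUSH));
        }
        return merged;
    }
}
```

Applying `forward` once per internal consumer, producer, and admin config would give every client the same setting without the user configuring each one.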




-Matthias


On 9/29/23 5:45 PM, David Jacot wrote:
Hi Andrew,
Thanks for driving this one. I haven't read all the KIP yet but I already
have an initial question. In the Threading section, it is written
"KafkaConsumer: the "background" thread (based on the consumer threading
refactor which is underway)". If I understand this correctly, it means
that KIP-714 won't work if the "old consumer" is used. Am I correct?
Cheers,
David
On Fri, Sep 22, 2023 at 12:18 PM Andrew Schofield <
andrew_schofield_j...@outlook.com> wrote:
Hi Philip,
No, I do not think it should actively search for a broker that supports
the new
RPCs. In general, either all of the brokers or none of the brokers will
support it.
In the window, where the cluster is being upgraded or client telemetry is
being
enabled, there might be a mixed situation. I wouldn’t put too much effort
into
this mixed scenario. As the client finds brokers which support the new
RPCs,
it can begin to follow the KIP-714 mechanism.

Thanks,
Andrew

On 22 Sep 2023, at 20:01, Philip Nee <philip...@gmail.com> wrote:

Hi Andrew -

Question on top of your answers: Do you think the client should actively
search for a broker that supports this RPC? As previously mentioned, the
broker uses the leastLoadedNode to find its first connection (am
I correct?), and what if that broker doesn't support the metric push?

P

On Fri, Sep 22, 2023 at 10:20 AM Andrew Schofield <
andrew_schofield_j...@outlook.com> wrote:

Hi Kirk,
Thanks for your question. You are correct that the presence or absence
of
the new RPCs in the
ApiVersionsResponse tells the client whether to request the telemetry
subscriptions and push
metrics.

This is of course tricky in practice. It would be conceivable, as a
cluster is upgraded to AK 3.7
or as a client metrics receiver plugin is deployed across the cluster,
that a client connects to some
brokers that support the new RPCs and some that do not.

Here’s my suggestion:
* If a client is not connected to any brokers that support the new
RPCs, it cannot push metrics.
* If a client is only connected to brokers that support the new RPCs, it
will use the new RPCs in
accordance with the KIP.
* If a client is connected to some brokers that support the new RPCs and
some that do not, it will
use the new RPCs with the supporting subset of brokers in accordance
with
the KIP.
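
The three cases above reduce to one rule: use the new RPCs only with brokers that advertise them. A minimal sketch, assuming the client keeps the API names from each broker's ApiVersionsResponse as a set of strings. The class `TelemetryBrokerFilter` and this string-based representation are hypothetical simplifications; only the RPC names come from the KIP.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical helper; a real client would work with API keys, not strings.
final class TelemetryBrokerFilter {
    // RPC names taken from the KIP discussion.
    static final Set<String> TELEMETRY_RPCS = Set.of("GetTelemetrySubscriptions", "PushTelemetry");

    // advertisedApis maps a broker id to the API names in its ApiVersionsResponse.
    // Returns the ids of brokers that advertise both telemetry RPCs, in ascending order.
    static List<Integer> brokersSupportingTelemetry(Map<Integer, Set<String>> advertisedApis) {
        return advertisedApis.entrySet().stream()
                .filter(entry -> entry.getValue().containsAll(TELEMETRY_RPCS))
                .map(Map.Entry::getKey)
                .sorted()
                .collect(Collectors.toList());
    }
}
```

An empty result corresponds to the first case (the client cannot push metrics); a partial result corresponds to the mixed case, where the client pushes only to the supporting subset.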

Comments?

Thanks,
Andrew

On 22 Sep 2023, at 16:01, Kirk True <k...@kirktrue.pro> wrote:

Hi Andrew/Jun,

I want to make sure I understand question/comment #119… In the case
where a cluster without a metrics client receiver is later reconfigured
and
restarted to include a metrics client receiver, do we want the client to
thereafter begin pushing metrics to the cluster? From Andrew’s response
to
question #119, it sounds like we’re using the presence/absence of the
relevant RPCs in ApiVersionsResponse as the to-push-or-not-to-push
indicator. Do I have that correct?

Thanks,
Kirk

On Sep 21, 2023, at 7:42 AM, Andrew Schofield <
andrew_schofield_j...@outlook.com> wrote:

Hi Jun,
Thanks for your comments. I’ve updated the KIP to clarify where
necessary.

110. Yes, agree. The motivation section mentions this.

111. The replacement of ‘-‘ with ‘.’ for metric names and the
replacement of
‘-‘ with ‘_’ for attribute keys is following the OTLP guidelines. I
think it’s a bit
of a debatable point. OTLP makes a distinction between a namespace
and a
multi-word component. If it was “client.id” then “client” would be a
namespace with
an attribute key “id”. But “client_id” is just a key. So, it was
intentional, but debatable.
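
For illustration, the two replacements can be written as a pair of one-line conversions. The class `OtlpNames` and its method names are hypothetical; only the replacement rules come from the KIP discussion.

```java
// Hypothetical helper showing the two naming rules side by side.
final class OtlpNames {
    // Metric names: '-' becomes '.', so each word becomes a namespace component.
    static String metricName(String kafkaName) {
        return kafkaName.replace('-', '.');
    }

    // Attribute keys: '-' becomes '_', so a multi-word key stays a single key.
    static String attributeKey(String kafkaKey) {
        return kafkaKey.replace('-', '_');
    }
}
```

With these rules, `record-queue-time-max` becomes the metric name `record.queue.time.max`, while `client-id` becomes the attribute key `client_id`.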

112. Thanks. The link target moved. Fixed.

113. Thanks. Fixed.

114.1. If a standard metric makes sense for a client, it should use
the
exact same
name. If a standard metric doesn’t make sense for a client, then it
can
omit that metric.

For a required metric, the situation is stronger. All clients must
implement these
metrics with these names in order to implement the KIP. But the
required metrics
are essentially the number of connections and the request latency,
which do not
reference the underlying implementation of the client (which
producer.record.queue.time.max
of course does).

I suppose someone might build a producer-only client that didn’t have
consumer metrics.
In this case, the consumer metrics would conceptually have the value 0
and would not
need to be sent to the broker.

114.2. If a client does not implement some metrics, they will not be
available for
analysis and troubleshooting. It just makes the ability to combine
metrics from lots
of different clients less complete.

115. I think it was probably a mistake to be so specific about
threading in this KIP.
When the consumer threading refactor is complete, of course, it would
do the appropriate
equivalent. I’ve added a clarification and massively simplified this
section.

116. I removed “client.terminating”.

117. Yes. Horrid. Fixed.

118. The Terminating flag just indicates that this is the final
PushTelemetryRequest
from this client. Any subsequent request will be rejected. I think
this
flag should remain.

119. Good catch. This was actually contradicting another part of the
KIP. The current behaviour
is indeed preserved. If the broker doesn’t have a client metrics
receiver plugin, the new RPCs
in this KIP are “turned off” and not reported in ApiVersionsResponse.
The client will not
attempt to push metrics.

120. The error handling table lists the error codes for
PushTelemetryResponse. I’ve added one
but it looked good to me. GetTelemetrySubscriptions doesn’t have any
error codes, since the
situation in which the client telemetry is not supported is handled by
the RPCs not being offered
by the broker.

121. Again, I think it’s probably a mistake to be specific about
threading. Removed.

122. Good catch. For DescribeConfigs, the ACL operation should be
“DESCRIBE_CONFIGS”. For AlterConfigs, the ACL operation should be
“ALTER” (not “WRITE” as it said). The checks are made on the CLUSTER
resource.

Thanks for the detailed review.

Thanks,
Andrew


110. Another potential motivation is the multiple clients support.
Some of
the places may not have good monitoring support for non-java clients.

111. OpenTelemetry Naming: We replace '-' with '.' for metric name
and
replace '-' with '_' for attributes. Why the inconsistency?

112. OTLP specification: Page is not found from the link.

113. "Defining standard and required metrics makes the monitoring and
troubleshooting of clients from various client types ": Incomplete
sentence.

114. standard/required metrics
114.1 Do other clients need to implement those metrics with the exact
same
names?
114.2 What happens if some of those metrics are missing from a
client?

115. "KafkaConsumer: both the "heart beat" and application threads":
We
have an ongoing effort to refactor the consumer threading model (


https://cwiki.apache.org/confluence/display/KAFKA/Consumer+threading+refactor+design
).
Once this is done, RPC requests will only be made from the background
thread. Should this KIP follow the new model only?

116. 'The metrics should contain the reason for the client
termination
by
including the client.terminating metric with the label “reason” ...'.
Hmm,
are we introducing a new metric client.terminating? If so, that needs
to be
explicitly listed.

117. "As the metrics plugin may need to add additional metrics on top
of
this the generic metrics receiver in the broker will not add these
labels
but rely on the plugins to do so," The sentence doesn't read well.

118. "it is possible for the client to send at most one accepted
out-of-profile per connection before the rate-limiter kicks in": If
we
do
this, do we still need the Terminating flag in
PushTelemetryRequestV0?

119. "If there is no client metrics receiver plugin configured on the
broker, it will respond to GetTelemetrySubscriptionsRequest with
RequestedMetrics set to Null and a -1 SubscriptionId. The client
should
send a new GetTelemetrySubscriptionsRequest after the PushIntervalMs
has
expired. This allows the metrics receiver to be enabled or disabled
without
having to restart the broker or reset the client connection."
"no client metrics receiver plugin configured" is defined by no
metric
reporter implementing the ClientTelemetry interface, right? In that
case,
it would be useful to avoid the clients sending
GetTelemetrySubscriptionsRequest periodically to preserve the current
behavior.

120. GetTelemetrySubscriptionsResponseV0 and PushTelemetryRequestV0:
Could
we list error codes for each?

121. "ClientTelemetryReceiver.ClientTelemetryReceiver This method may
be
called from the request handling thread": Where else can this method
be
called?

122. DescribeConfigs/AlterConfigs already exist. Are we changing the
ACL?

Thanks,

Jun

On Mon, Jul 31, 2023 at 4:33 AM Andrew Schofield <
andrew_schofield_j...@outlook.com> wrote:

Hi Milind,
Thanks for your question.

On reflection, I agree that INVALID_RECORD is most likely to be
caused by a
problem in the serialization in the client. I have changed the
client
action in this case
to “Log an error and stop pushing metrics”.

I have updated the KIP text accordingly.

Thanks,
Andrew

On 31 Jul 2023, at 12:09, Milind Luthra
<milut...@confluent.io.INVALID>
wrote:

Hi Andrew,
Thanks for the clarifications.

About 2b:
In case a client has a bug while serializing, it might be difficult
for
the
client to recover from that without code changes. In that case, it might
be
good
to just log the INVALID_RECORD as an error, and treat the error as
fatal
for the client (only fatal in terms of sending the metrics, the
client
can
keep functioning otherwise). What do you think?

Thanks
Milind

On Mon, Jul 24, 2023 at 8:18 PM Andrew Schofield <
andrew_schofield_j...@outlook.com> wrote:

Hi Milind,
Thanks for your questions about the KIP.

1) I did some archaeology and looked at historical versions of the
KIP.
I
think this is
just a mistake. 5 minutes is the default metric push interval. 30
minutes
is a mystery
to me. I’ve updated the KIP.

2) I think there are two situations in which INVALID_RECORD might
occur.
a) The client might perhaps be using a content-type that the
broker
does
not support.
The KIP mentions content-type as a future extension, but there’s
only
one
supported
to start with. Until we have multiple content-types, this seems
out
of
scope. I think a
future KIP would add another error code for this.
b) The client might perhaps have a bug which means the metrics
payload
is
malformed.
Logging a warning and attempting the next metrics push on the push
interval seems
appropriate.

UNKNOWN_SUBSCRIPTION_ID would indeed be handled by making an
immediate
GetTelemetrySubscriptionsRequest.

UNSUPPORTED_COMPRESSION_TYPE seems like either a client bug or
perhaps
a situation in which a broker sends a compression type in a
GetTelemetrySubscriptionsResponse
which is subsequently not supported when it's used with a
PushTelemetryRequest.
We do want the client to have the opportunity to get an up-to-date
list
of
supported
compression types. I think an immediate
GetTelemetrySubscriptionsRequest
is appropriate.
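
The handling described in this reply can be summarised as a small lookup from error code to client action. This is only a sketch of the behaviour as stated here: the class `PushErrorPolicy`, the `Action` enum, and the use of plain strings for error names are assumptions made for illustration.

```java
// Hypothetical summary of the client actions described in this reply.
final class PushErrorPolicy {
    enum Action {
        RETRY_ON_NEXT_PUSH_INTERVAL,        // log a warning, then push again on the push interval
        REFRESH_SUBSCRIPTIONS_IMMEDIATELY,  // send GetTelemetrySubscriptionsRequest right away
        UNSPECIFIED                         // error not covered by this reply
    }

    static Action actionFor(String errorName) {
        switch (errorName) {
            case "INVALID_RECORD":
                return Action.RETRY_ON_NEXT_PUSH_INTERVAL;
            case "UNKNOWN_SUBSCRIPTION_ID":
            case "UNSUPPORTED_COMPRESSION_TYPE":
                return Action.REFRESH_SUBSCRIPTIONS_IMMEDIATELY;
            default:
                return Action.UNSPECIFIED;
        }
    }
}
```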

3) If a client attempts a subsequent handshake with a Null
ClientInstanceId, the
receiving broker may not already know the client's existing
ClientInstanceId. If the
receiving broker knows the existing ClientInstanceId, it simply
returns the existing
value to the client. If it does not know the existing
ClientInstanceId, it will create
a new client instance ID and respond with that.

I will update the KIP with these clarifications.

Thanks,
Andrew

On 17 Jul 2023, at 14:21, Milind Luthra
<milut...@confluent.io.INVALID>
wrote:

Hi Andrew, thanks for this KIP.

I had a few questions regarding the "Error handling" section.

1. It mentions that "The 5 and 30 minute retries are to
eventually
trigger
a retry and avoid having to restart clients if the cluster
metrics
configuration is disabled temporarily, e.g., by operator error,
rolling
upgrades, etc."
But this 30 min interval isn't mentioned anywhere else. What is
it
referring to?

2. For the actual errors:
INVALID_RECORD : The action required is to "Log a warning to the
application and schedule the next
GetTelemetrySubscriptionsRequest
to 5
minutes". Why is this 5 minutes, and not something like
PushIntervalMs?
And
also, why are we scheduling a GetTelemetrySubscriptionsRequest in
this
case, if the serialization is broken?
UNKNOWN_SUBSCRIPTION_ID , UNSUPPORTED_COMPRESSION_TYPE : just to
confirm,
the GetTelemetrySubscriptionsRequest needs to be scheduled
immediately
after the PushTelemetry response, correct?

3. For "Subsequent GetTelemetrySubscriptionsRequests must include
the
ClientInstanceId returned in the first response, regardless of
broker":
Will a broker error be returned in case some implementation of
this KIP
violates this accidentally and sends a request with
ClientInstanceId =
Null
even when it's been obtained already? Or will a new
ClientInstanceId be
returned without an error?

Thanks!

On Tue, Jun 13, 2023 at 8:38 PM Andrew Schofield <
andrew_schofield_j...@outlook.com> wrote:

Hi,
I would like to start a new discussion thread on KIP-714: Client
metrics
and observability.






https://cwiki.apache.org/confluence/display/KAFKA/KIP-714%3A+Client+metrics+and+observability

I have edited the proposal significantly to reduce the scope.
The
overall
mechanism for client metric subscriptions is unchanged, but the
KIP is now based on the existing client metrics, rather than
introducing
new metrics. The purpose remains helping cluster operators
investigate performance problems experienced by clients without
requiring
changes to the client application code or configuration.

Thanks,
Andrew

