lianetm commented on code in PR #17199:
URL: https://github.com/apache/kafka/pull/17199#discussion_r1847318144
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEventHandler.java:
##########
@@ -42,6 +51,8 @@ public BackgroundEventHandler(final Queue<BackgroundEvent> backgroundEventQueue)
*/
public void add(BackgroundEvent event) {
Objects.requireNonNull(event, "BackgroundEvent provided to add must be non-null");
+ event.setEnqueuedMs(time.milliseconds());
backgroundEventQueue.add(event);
+ kafkaConsumerMetrics.ifPresent(metrics -> metrics.recordBackgroundEventQueueSize(backgroundEventQueue.size()));
Review Comment:
This change makes sense to me, but it makes me wonder if we should push it further. It would be helpful to keep all the updates for this queue-size metric in the component that holds the queue, so we can easily maintain and track how "add" and "remove/drain" update that metric.
We could then use that drain from `processBackgroundEvents`, instead of manually draining the queue and recording the metric there. What do you think?
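A minimal sketch of that idea, assuming a generic handler that owns the queue. `EventQueueHandler`, `drainEvents`, and the `IntConsumer` size recorder are hypothetical stand-ins for `BackgroundEventHandler` and the metrics class, not the real signatures:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.IntConsumer;

// Hypothetical handler: the only component that touches the queue, so every
// queue-size update (on add and on drain) lives in one place.
class EventQueueHandler<E> {
    private final BlockingQueue<E> queue = new LinkedBlockingQueue<>();
    private final IntConsumer queueSizeRecorder; // stand-in for the queue-size sensor

    EventQueueHandler(IntConsumer queueSizeRecorder) {
        this.queueSizeRecorder = Objects.requireNonNull(queueSizeRecorder);
    }

    void add(E event) {
        Objects.requireNonNull(event, "event must be non-null");
        queue.add(event);
        queueSizeRecorder.accept(queue.size());
    }

    // Drains everything currently queued and records the size afterwards,
    // so callers never update the metric themselves.
    List<E> drainEvents() {
        List<E> drained = new ArrayList<>();
        queue.drainTo(drained);
        queueSizeRecorder.accept(queue.size());
        return drained;
    }
}
```

With this shape, `processBackgroundEvents` would just call `drainEvents()` and iterate over the result.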
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java:
##########
@@ -162,15 +171,20 @@ void runOnce() {
private void processApplicationEvents() {
LinkedList<ApplicationEvent> events = new LinkedList<>();
applicationEventQueue.drainTo(events);
+ kafkaAsyncConsumerMetrics.ifPresent(metrics -> metrics.recordApplicationEventQueueSize(0));
for (ApplicationEvent event : events) {
+ long startMs = time.milliseconds();
+ kafkaAsyncConsumerMetrics.ifPresent(metrics -> metrics.recordApplicationEventQueueTime(startMs - event.enqueuedMs()));
try {
if (event instanceof CompletableEvent)
applicationEventReaper.add((CompletableEvent<?>) event);
applicationEventProcessor.process(event);
} catch (Throwable t) {
log.warn("Error processing event {}", t.getMessage(), t);
+ } finally {
+ kafkaAsyncConsumerMetrics.ifPresent(metrics -> metrics.recordApplicationEventQueueProcessingTime(time.milliseconds() - startMs));
Review Comment:
Is this the best place to record this? From the description of the metric, I understand that we want to measure the time "that the consumer network takes to process **all available application events**". So wouldn't it be simpler to record the metric once per `runOnce`, instead of recording it N times on each run? (Take the start time right before the loop over events, and end/record right after the loop.)
I went to the KIP discussion thread to double-check this interpretation, and this was the intention behind what was proposed (by me, actually, as I discovered, hehe).
> LM3. Thinking about the actual usage of "time-between-network-thread-poll-xxx" metric, I imagine it would be helpful to know more about what could be impacting it. As I see it, the network thread cadence could be mainly impacted by: 1- app event processing (generate requests), 2- network client poll (actual send/receive). For 2, the new consumer reuses the same component as the legacy one, but 1 is specific to the new consumer, so what about a metric for application-event-processing-time-ms (we could consider avg I would say). It would be the time that the network thread takes to process all available events on each run.
What do you think?
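A minimal sketch of the once-per-run measurement, assuming a `LongSupplier` clock and a `LongConsumer` recorder as hypothetical stand-ins for `Time` and `recordApplicationEventQueueProcessingTime`; `BatchProcessor` is also a hypothetical name:

```java
import java.util.List;
import java.util.function.Consumer;
import java.util.function.LongConsumer;
import java.util.function.LongSupplier;

// Hypothetical processor: times the whole batch of drained events and records
// a single processing-time sample per run, instead of one sample per event.
class BatchProcessor<E> {
    private final LongSupplier clockMs;
    private final LongConsumer processingTimeRecorder;

    BatchProcessor(LongSupplier clockMs, LongConsumer processingTimeRecorder) {
        this.clockMs = clockMs;
        this.processingTimeRecorder = processingTimeRecorder;
    }

    void processAll(List<E> events, Consumer<E> processor) {
        long startMs = clockMs.getAsLong(); // taken once, right before the loop
        for (E event : events) {
            try {
                processor.accept(event);
            } catch (RuntimeException e) {
                // swallowed here for brevity; the real loop logs and moves on
            }
        }
        processingTimeRecorder.accept(clockMs.getAsLong() - startMs); // one sample per run
    }
}
```

The per-event queue time can still be recorded inside the loop; only the processing time moves outside it.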
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java:
##########
@@ -162,15 +171,20 @@ void runOnce() {
private void processApplicationEvents() {
LinkedList<ApplicationEvent> events = new LinkedList<>();
applicationEventQueue.drainTo(events);
+ kafkaAsyncConsumerMetrics.ifPresent(metrics -> metrics.recordApplicationEventQueueSize(0));
Review Comment:
Keeping the symmetry with the background event, would it make sense to encapsulate these actions in the `ApplicationEventHandler`, so that this component remains responsible for the `add` and `drain` operations on the queue (including the metric actions related to those ops)?
It would mean that this `ConsumerNetworkThread` would keep a reference to the `ApplicationEventHandler` that holds the queue (instead of holding the queue directly, as it does now), but that is already available, so I guess we just need to pass it in the constructor instead of the queue. What do you think?
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/KafkaAsyncConsumerMetrics.java:
##########
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals.metrics;
+
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.stats.Avg;
+import org.apache.kafka.common.metrics.stats.Max;
+import org.apache.kafka.common.metrics.stats.Value;
+
+import java.util.Arrays;
+
+import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_METRICS_SUFFIX;
+
+public class KafkaAsyncConsumerMetrics extends KafkaConsumerMetrics implements AutoCloseable {
+ private final Metrics metrics;
+
+ public static final String TIME_BETWEEN_NETWORK_THREAD_POLL_SENSOR_NAME = "time-between-network-thread-poll";
+ public static final String APPLICATION_EVENT_QUEUE_SIZE_SENSOR_NAME = "application-event-queue-size";
+ public static final String APPLICATION_EVENT_QUEUE_TIME_SENSOR_NAME = "application-event-queue-time";
+ public static final String APPLICATION_EVENT_QUEUE_PROCESSING_TIME_SENSOR_NAME = "application-event-queue-processing-time";
+ public static final String BACKGROUND_EVENT_QUEUE_SIZE_SENSOR_NAME = "background-event-queue-size";
+ public static final String BACKGROUND_EVENT_QUEUE_TIME_SENSOR_NAME = "background-event-queue-time";
+ public static final String BACKGROUND_EVENT_QUEUE_PROCESSING_TIME_SENSOR_NAME = "background-event-queue-processing-time";
+ public static final String UNSENT_REQUESTS_QUEUE_SIZE_SENSOR_NAME = "unsent-requests-queue-size";
+ public static final String UNSENT_REQUESTS_QUEUE_TIME_SENSOR_NAME = "unsent-requests-queue-time";
+ private final Sensor timeBetweenNetworkThreadPollSensor;
+ private final Sensor applicationEventQueueSizeSensor;
+ private final Sensor applicationEventQueueTimeSensor;
+ private final Sensor applicationEventQueueProcessingTimeSensor;
+ private final Sensor backgroundEventQueueSizeSensor;
+ private final Sensor backgroundEventQueueTimeSensor;
+ private final Sensor backgroundEventQueueProcessingTimeSensor;
+ private final Sensor unsentRequestsQueueSizeSensor;
+ private final Sensor unsentRequestsQueueTimeSensor;
+
+ public KafkaAsyncConsumerMetrics(Metrics metrics, String metricGrpPrefix) {
+ super(metrics, metricGrpPrefix);
+
+ this.metrics = metrics;
+ final String metricGroupName = metricGrpPrefix + CONSUMER_METRICS_SUFFIX;
+ this.timeBetweenNetworkThreadPollSensor = metrics.sensor(TIME_BETWEEN_NETWORK_THREAD_POLL_SENSOR_NAME);
+ this.timeBetweenNetworkThreadPollSensor.add(metrics.metricName("time-between-network-thread-poll-avg",
+ metricGroupName,
+ "The average time taken, in milliseconds, between each poll in the network thread."),
+ new Avg());
+ this.timeBetweenNetworkThreadPollSensor.add(metrics.metricName("time-between-network-thread-poll-max",
+ metricGroupName,
+ "The maximum time taken, in milliseconds, between each poll in the network thread."),
+ new Max());
+
+ this.applicationEventQueueSizeSensor = metrics.sensor(APPLICATION_EVENT_QUEUE_SIZE_SENSOR_NAME);
+ this.applicationEventQueueSizeSensor.add(metrics.metricName("application-event-queue-size",
+ metricGroupName,
+ "The current number of events in the consumer network application event queue."),
+ new Value());
+
+ this.applicationEventQueueTimeSensor = metrics.sensor(APPLICATION_EVENT_QUEUE_TIME_SENSOR_NAME);
+ this.applicationEventQueueTimeSensor.add(metrics.metricName("application-event-queue-time-avg",
+ metricGroupName,
+ "The average time, in milliseconds, that application events are taking to be dequeued."),
+ new Avg());
+ this.applicationEventQueueTimeSensor.add(metrics.metricName("application-event-queue-time-max",
+ metricGroupName,
+ "The maximum time, in milliseconds, that an application event took to be dequeued."),
+ new Max());
+
+ this.applicationEventQueueProcessingTimeSensor = metrics.sensor(APPLICATION_EVENT_QUEUE_PROCESSING_TIME_SENSOR_NAME);
+ this.applicationEventQueueProcessingTimeSensor.add(metrics.metricName("application-event-queue-processing-time-avg",
+ metricGroupName,
+ "The average time, in milliseconds, that the consumer network takes to process all available application events."),
+ new Avg());
+ this.applicationEventQueueProcessingTimeSensor.add(metrics.metricName("application-event-queue-processing-time-max",
+ metricGroupName,
+ "The maximum time, in milliseconds, that the consumer network took to process all available application events."),
+ new Max());
+
+ this.unsentRequestsQueueSizeSensor = metrics.sensor(UNSENT_REQUESTS_QUEUE_SIZE_SENSOR_NAME);
+ this.unsentRequestsQueueSizeSensor.add(metrics.metricName("unsent-requests-queue-size",
+ metricGroupName,
+ "The current number of unsent requests in the consumer network."),
+ new Value());
+
+ this.unsentRequestsQueueTimeSensor = metrics.sensor(UNSENT_REQUESTS_QUEUE_TIME_SENSOR_NAME);
+ this.unsentRequestsQueueTimeSensor.add(metrics.metricName("unsent-requests-queue-time-avg",
+ metricGroupName,
+ "The average time, in milliseconds, that requests are taking to be sent in the consumer network."),
+ new Avg());
+ this.unsentRequestsQueueTimeSensor.add(metrics.metricName("unsent-requests-queue-time-max",
+ metricGroupName,
+ "The maximum time, in milliseconds, that a request remained unsent in the consumer network."),
+ new Max());
+
+ this.backgroundEventQueueSizeSensor = metrics.sensor(BACKGROUND_EVENT_QUEUE_SIZE_SENSOR_NAME);
+ this.backgroundEventQueueSizeSensor.add(metrics.metricName("background-event-queue-size",
+ metricGroupName,
+ "The current number of events in the consumer background event queue."),
+ new Value());
+
+ this.backgroundEventQueueTimeSensor = metrics.sensor(BACKGROUND_EVENT_QUEUE_TIME_SENSOR_NAME);
+ this.backgroundEventQueueTimeSensor.add(metrics.metricName("background-event-queue-time-avg",
+ metricGroupName,
+ "The average time, in milliseconds, that background events are taking to be dequeued."),
+ new Avg());
+ this.backgroundEventQueueTimeSensor.add(metrics.metricName("background-event-queue-time-max",
+ metricGroupName,
+ "The maximum time, in milliseconds, that background events are taking to be dequeued."),
+ new Max());
+
+ this.backgroundEventQueueProcessingTimeSensor = metrics.sensor(BACKGROUND_EVENT_QUEUE_PROCESSING_TIME_SENSOR_NAME);
+ this.backgroundEventQueueProcessingTimeSensor.add(metrics.metricName("background-event-queue-processing-time-avg",
+ metricGroupName,
+ "The average time, in milliseconds, that the consumer took to process all available background events."),
+ new Avg());
+ this.backgroundEventQueueProcessingTimeSensor.add(metrics.metricName("background-event-queue-processing-time-max",
+ metricGroupName,
+ "The maximum time, in milliseconds, that the consumer took to process all available background events."),
+ new Max());
+ }
+
+ public void recordTimeBetweenNetworkThreadPoll(long timeBetweenNetworkThreadPoll) {
+ this.timeBetweenNetworkThreadPollSensor.record(timeBetweenNetworkThreadPoll);
+ }
+
+ public void recordApplicationEventQueueSize(int size) {
+ this.applicationEventQueueSizeSensor.record(size);
+ }
+
+ public void recordApplicationEventQueueTime(long time) {
+ this.applicationEventQueueTimeSensor.record(time);
+ }
+
+ public void recordApplicationEventQueueProcessingTime(long processingTime) {
+ this.applicationEventQueueProcessingTimeSensor.record(processingTime);
+ }
+
+ public void recordUnsentRequestsQueueSize(int size) {
Review Comment:
This metric is about the size of the queue at a given time, so I expect we should have another param here, `timeMs`, for the time at which we read the metric, and we should pass it into `.record`, which has an overload for it.
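A sketch of the suggested signature, assuming a hypothetical `TimedSensor` interface that mirrors the `record(double value, long timeMs)` overload on Kafka's `Sensor`; `QueueSizeMetrics` is also a hypothetical name:

```java
// Hypothetical stand-in mirroring Sensor#record(double value, long timeMs).
interface TimedSensor {
    void record(double value, long timeMs);
}

// Hypothetical metrics wrapper: the caller supplies the time at which the
// queue size was observed, and it is passed straight through to the sensor.
class QueueSizeMetrics {
    private final TimedSensor unsentRequestsQueueSizeSensor;

    QueueSizeMetrics(TimedSensor unsentRequestsQueueSizeSensor) {
        this.unsentRequestsQueueSizeSensor = unsentRequestsQueueSizeSensor;
    }

    void recordUnsentRequestsQueueSize(int size, long timeMs) {
        unsentRequestsQueueSizeSensor.record(size, timeMs);
    }
}
```

Passing the caller's timestamp avoids an extra clock read inside the sensor and ties the sample to the moment the size was observed.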
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java:
##########
@@ -170,6 +174,7 @@ private void trySend(final long currentTimeMs) {
Iterator<UnsentRequest> iterator = unsentRequests.iterator();
while (iterator.hasNext()) {
UnsentRequest unsent = iterator.next();
+ kafkaConsumerMetrics.ifPresent(metrics -> metrics.recordUnsentRequestsQueueTime(currentTimeMs - unsent.enqueuedMs()));
Review Comment:
Recording this here means we would consider the request removed from the unsent queue even when it cannot be sent and actually stays in the unsent queue (`!doSend`), right? If so, I guess we should record this only when we do remove it from the queue with `iterator.remove()` (either because it has expired, or because we did send it).
Also, shouldn't we record this same metric in `checkDisconnects` if the request is removed from the unsent queue because the node is disconnected?
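A minimal sketch of recording only on removal, assuming hypothetical `Unsent` and `UnsentQueueSketch` types in place of `UnsentRequest`/`NetworkClientDelegate`, and a `Predicate` standing in for `doSend`:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.function.LongConsumer;
import java.util.function.Predicate;

// Hypothetical minimal unsent request: destination node, enqueue time, deadline.
final class Unsent {
    final String node;
    final long enqueuedMs;
    final long deadlineMs;

    Unsent(String node, long enqueuedMs, long deadlineMs) {
        this.node = node;
        this.enqueuedMs = enqueuedMs;
        this.deadlineMs = deadlineMs;
    }
}

// Hypothetical queue: queue time is recorded only when a request actually
// leaves the unsent list (expired, sent, or dropped due to a disconnect).
class UnsentQueueSketch {
    final List<Unsent> unsent = new ArrayList<>();
    private final LongConsumer queueTimeRecorder;

    UnsentQueueSketch(LongConsumer queueTimeRecorder) {
        this.queueTimeRecorder = queueTimeRecorder;
    }

    void trySend(long nowMs, Predicate<Unsent> doSend) {
        Iterator<Unsent> it = unsent.iterator();
        while (it.hasNext()) {
            Unsent r = it.next();
            // expired requests are removed without attempting a send
            if (nowMs >= r.deadlineMs || doSend.test(r)) {
                it.remove();
                queueTimeRecorder.accept(nowMs - r.enqueuedMs);
            }
            // otherwise the request stays queued and nothing is recorded
        }
    }

    void checkDisconnects(long nowMs, Set<String> disconnectedNodes) {
        Iterator<Unsent> it = unsent.iterator();
        while (it.hasNext()) {
            Unsent r = it.next();
            if (disconnectedNodes.contains(r.node)) {
                it.remove();
                queueTimeRecorder.accept(nowMs - r.enqueuedMs);
            }
        }
    }
}
```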
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -1911,6 +1922,8 @@ private boolean processBackgroundEvents() {
if (!firstError.compareAndSet(null, e))
log.warn("An error occurred when processing the background event: {}", e.getMessage(), e);
+ } finally {
+ kafkaAsyncConsumerMetrics.recordBackgroundEventQueueProcessingTime(time.milliseconds() - startMs);
Review Comment:
Similar to `recordApplicationEventQueueProcessingTime`. The metric description states this is about the time "that the consumer took to process **all available background events**". Shouldn't we simply take the time from right before the loop to right after it ends, and record the metric once per run of `processBackgroundEvents`?
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -1893,14 +1899,19 @@ private void subscribeInternal(Collection<String> topics, Optional<ConsumerRebal
* It is possible that {@link ErrorEvent an error}
* could occur when processing the events. In such cases, the processor will take a reference to the first
* error, continue to process the remaining events, and then throw the first error that occurred.
+ *
+ * Visible for testing.
*/
- private boolean processBackgroundEvents() {
+ boolean processBackgroundEvents() {
AtomicReference<KafkaException> firstError = new AtomicReference<>();
LinkedList<BackgroundEvent> events = new LinkedList<>();
backgroundEventQueue.drainTo(events);
+ kafkaAsyncConsumerMetrics.recordBackgroundEventQueueSize(backgroundEventQueue.size());
Review Comment:
This is what I was wondering: could we encapsulate this in a `BackgroundEventHandler.drain` or similar method, which would take care of draining the queue and recording the metric (so all metric updates are done there consistently)?
```suggestion
LinkedList<BackgroundEvent> events =
backgroundEventHandler.drainBackgroundEvents();
```
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/KafkaAsyncConsumerMetrics.java:
##########
@@ -0,0 +1,186 @@
+ private final Sensor timeBetweenNetworkThreadPollSensor;
+ private final Sensor applicationEventQueueSizeSensor;
Review Comment:
nit: extra space before Sensor
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java:
##########
@@ -267,7 +272,9 @@ public void addAll(final List<UnsentRequest> requests) {
public void add(final UnsentRequest r) {
Objects.requireNonNull(r);
r.setTimer(this.time, this.requestTimeoutMs);
+ r.setEnqueuedMs(this.time.milliseconds());
unsentRequests.add(r);
+ kafkaConsumerMetrics.ifPresent(metrics -> metrics.recordUnsentRequestsQueueSize(unsentRequests.size()));
Review Comment:
I'm still debating whether this is the best place to record this. We want snapshots in time of the queue size. Recording here has the limitation that we won't record when the size decreases (i.e., requests sent, or failed due to disconnections). So I wonder whether recording this on poll, which is called regularly, would give a better view of the queue size?
The way add/poll are used from `ConsumerNetworkThread.runOnce`, they end up being called sequentially anyway, but I'm thinking about the case where, let's say, managers are not returning any requests (so `addAll` is called with an empty list and `add` is never called), but there are still unsent requests in the queue that could be sent out, cancelled, time out, etc. Thoughts?
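A minimal sketch of recording on poll, assuming hypothetical `SizeSensor` and `PollingDelegateSketch` types (the sensor mirrors the `record(double value, long timeMs)` overload on Kafka's `Sensor`). The `requestsSentThisRound` parameter is an illustrative knob standing in for whatever send, expiry, or disconnect handling removes from the queue:

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical stand-in mirroring Sensor#record(double value, long timeMs).
interface SizeSensor {
    void record(double value, long timeMs);
}

// Hypothetical delegate: the queue-size snapshot is taken on every poll, so
// decreases show up even in rounds where add() is never called.
class PollingDelegateSketch {
    final Deque<String> unsentRequests = new ArrayDeque<>();
    private final SizeSensor queueSizeSensor;

    PollingDelegateSketch(SizeSensor queueSizeSensor) {
        this.queueSizeSensor = queueSizeSensor;
    }

    void add(String request) {
        unsentRequests.add(request); // no metric update here
    }

    void poll(long currentTimeMs, int requestsSentThisRound) {
        for (int i = 0; i < requestsSentThisRound && !unsentRequests.isEmpty(); i++) {
            unsentRequests.poll(); // stand-in for requests leaving the queue
        }
        queueSizeSensor.record(unsentRequests.size(), currentTimeMs);
    }
}
```

Since poll runs on every network-thread iteration, this gives a regular sampling cadence regardless of whether the managers produced new requests.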
--
This is an automated message from the Apache Git Service.