lianetm commented on code in PR #17199:
URL: https://github.com/apache/kafka/pull/17199#discussion_r1872198214
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java:
##########
@@ -317,6 +327,20 @@ Timer timer() {
            return timer;
        }
+        /**
+         * Set the time when the request was enqueued to {@link NetworkClientDelegate#unsentRequests}.
+         */
+        void setEnqueueTimeMs(final long enqueueTimeMs) {
+            this.enqueueTimeMs = enqueueTimeMs;
+        }
+
+        /**
+         * Return the time when the request was enqueued to {@link NetworkClientDelegate#unsentRequests}.
+         */
+        long enqueueTimeMs() {
Review Comment:
this one is less sensitive, but if it's only used here, as it seems to be, we could consider making it private too
##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ApplicationEventHandlerTest.java:
##########
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.consumer.internals.events.ApplicationEvent;
+import org.apache.kafka.clients.consumer.internals.events.ApplicationEventHandler;
+import org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor;
+import org.apache.kafka.clients.consumer.internals.events.CompletableEventReaper;
+import org.apache.kafka.clients.consumer.internals.events.PollEvent;
+import org.apache.kafka.clients.consumer.internals.metrics.AsyncConsumerMetrics;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.Mockito.mock;
+
+public class ApplicationEventHandlerTest {
+    private final Time time = new MockTime();
+    private final BlockingQueue<ApplicationEvent> applicationEventsQueue = new LinkedBlockingQueue<>();
+    private final ApplicationEventProcessor applicationEventProcessor = mock(ApplicationEventProcessor.class);
+    private final NetworkClientDelegate networkClientDelegate = mock(NetworkClientDelegate.class);
+    private final RequestManagers requestManagers = mock(RequestManagers.class);
+    private final CompletableEventReaper applicationEventReaper = mock(CompletableEventReaper.class);
+
+    @Test
+    public void testRecordApplicationEventQueueSize() {
+        try (Metrics metrics = new Metrics();
+             AsyncConsumerMetrics asyncConsumerMetrics = new AsyncConsumerMetrics(metrics, "consumer");
+             ApplicationEventHandler applicationEventHandler = new ApplicationEventHandler(
+                 new LogContext(),
+                 time,
+                 applicationEventsQueue,
+                 applicationEventReaper,
+                 () -> applicationEventProcessor,
+                 () -> networkClientDelegate,
+                 () -> requestManagers,
+                 asyncConsumerMetrics
+             )) {
+            PollEvent event = new PollEvent(time.milliseconds());
+
+            // add event
+            applicationEventHandler.add(event);
+            assertEquals(1, (double) metrics.metric(metrics.metricName("application-event-queue-size", "consumer-metrics")).metricValue());
Review Comment:
could we reuse the metric name constants we already have?
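For example, something along these lines (just a sketch; it relies on the `AsyncConsumerMetrics` sensor-name constants added in this PR, whose values happen to match the metric names used here):
```java
// Sketch only: APPLICATION_EVENT_QUEUE_SIZE_SENSOR_NAME has the same value as the metric name
// ("application-event-queue-size"), so the assertion can reference the constant instead of the literal.
assertEquals(1, (double) metrics.metric(
    metrics.metricName(AsyncConsumerMetrics.APPLICATION_EVENT_QUEUE_SIZE_SENSOR_NAME, "consumer-metrics")
).metricValue());
```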
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -1893,25 +1901,30 @@ private void subscribeInternal(Collection<String> topics, Optional<ConsumerRebal
     * It is possible that {@link ErrorEvent an error}
     * could occur when processing the events. In such cases, the processor will take a reference to the first
     * error, continue to process the remaining events, and then throw the first error that occurred.
+     *
+     * Visible for testing.
     */
-    private boolean processBackgroundEvents() {
+    boolean processBackgroundEvents() {
        AtomicReference<KafkaException> firstError = new AtomicReference<>();
-        LinkedList<BackgroundEvent> events = new LinkedList<>();
-        backgroundEventQueue.drainTo(events);
-
-        for (BackgroundEvent event : events) {
-            try {
-                if (event instanceof CompletableEvent)
-                    backgroundEventReaper.add((CompletableEvent<?>) event);
-
-                backgroundEventProcessor.process(event);
-            } catch (Throwable t) {
-                KafkaException e = ConsumerUtils.maybeWrapAsKafkaException(t);
-
-                if (!firstError.compareAndSet(null, e))
-                    log.warn("An error occurred when processing the background event: {}", e.getMessage(), e);
+        List<BackgroundEvent> events = backgroundEventHandler.drainEvents();
+        if (!events.isEmpty()) {
+            long startMs = time.milliseconds();
+            for (BackgroundEvent event : events) {
+                kafkaConsumerMetrics.recordBackgroundEventQueueTime(time.milliseconds() - event.enqueuedMs());
+                try {
+                    if (event instanceof CompletableEvent)
+                        backgroundEventReaper.add((CompletableEvent<?>) event);
+
+                    backgroundEventProcessor.process(event);
+                } catch (Throwable t) {
+                    KafkaException e = ConsumerUtils.maybeWrapAsKafkaException(t);
+
+                    if (!firstError.compareAndSet(null, e))
+                        log.warn("An error occurred when processing the background event: {}", e.getMessage(), e);
+                }
            }
+            kafkaConsumerMetrics.recordBackgroundEventQueueProcessingTime(time.milliseconds() - startMs);
        }
        backgroundEventReaper.reap(time.milliseconds());
Review Comment:
Interesting, and if we agree on what we want we could just send an update to the KIP email thread to add it to the KIP and here.
To align internally first, I guess we would be interested in the num/avg of expired events, but we need to consider how that metric would go crazy and be a false alarm in cases like poll(0), right? Should we only consider the expiration relevant if there was a non-zero timeout? Thoughts?
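To make the idea concrete, one possible shape (purely a sketch; the names below are hypothetical and not part of this PR):
```java
// Hypothetical: only treat reaped events as "expired" for the metric when the caller
// actually gave the event time to complete, so poll(0) does not trigger false alarms.
if (pollTimeoutMs > 0 && expiredCount > 0) {
    asyncConsumerMetrics.recordExpiredEvents(expiredCount); // assumed, not-yet-existing recording method
}
```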
##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ApplicationEventHandlerTest.java:
##########
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.consumer.internals.events.ApplicationEvent;
+import org.apache.kafka.clients.consumer.internals.events.ApplicationEventHandler;
+import org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor;
+import org.apache.kafka.clients.consumer.internals.events.CompletableEventReaper;
+import org.apache.kafka.clients.consumer.internals.events.PollEvent;
+import org.apache.kafka.clients.consumer.internals.metrics.AsyncConsumerMetrics;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.Mockito.mock;
+
+public class ApplicationEventHandlerTest {
+    private final Time time = new MockTime();
+    private final BlockingQueue<ApplicationEvent> applicationEventsQueue = new LinkedBlockingQueue<>();
+    private final ApplicationEventProcessor applicationEventProcessor = mock(ApplicationEventProcessor.class);
+    private final NetworkClientDelegate networkClientDelegate = mock(NetworkClientDelegate.class);
+    private final RequestManagers requestManagers = mock(RequestManagers.class);
+    private final CompletableEventReaper applicationEventReaper = mock(CompletableEventReaper.class);
+
+    @Test
+    public void testRecordApplicationEventQueueSize() {
+        try (Metrics metrics = new Metrics();
+             AsyncConsumerMetrics asyncConsumerMetrics = new AsyncConsumerMetrics(metrics, "consumer");
+             ApplicationEventHandler applicationEventHandler = new ApplicationEventHandler(
+                 new LogContext(),
+                 time,
+                 applicationEventsQueue,
+                 applicationEventReaper,
+                 () -> applicationEventProcessor,
+                 () -> networkClientDelegate,
+                 () -> requestManagers,
+                 asyncConsumerMetrics
+             )) {
+            PollEvent event = new PollEvent(time.milliseconds());
+
+            // add event
+            applicationEventHandler.add(event);
Review Comment:
```suggestion
            // add event
            applicationEventHandler.add(new PollEvent(time.milliseconds()));
```
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java:
##########
@@ -317,6 +327,20 @@ Timer timer() {
            return timer;
        }
+        /**
+         * Set the time when the request was enqueued to {@link NetworkClientDelegate#unsentRequests}.
+         */
+        void setEnqueueTimeMs(final long enqueueTimeMs) {
Review Comment:
this should be private right?
##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/BackgroundEventHandlerTest.java:
##########
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent;
+import org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
+import org.apache.kafka.clients.consumer.internals.events.ErrorEvent;
+import org.apache.kafka.clients.consumer.internals.metrics.AsyncConsumerMetrics;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.utils.MockTime;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class BackgroundEventHandlerTest {
+    private final BlockingQueue<BackgroundEvent> backgroundEventsQueue = new LinkedBlockingQueue<>();
+
+    @Test
+    public void testRecordBackgroundEventQueueSize() {
+        try (Metrics metrics = new Metrics();
+             AsyncConsumerMetrics asyncConsumerMetrics = new AsyncConsumerMetrics(metrics, "consumer")) {
+            BackgroundEventHandler backgroundEventHandler = new BackgroundEventHandler(
+                backgroundEventsQueue,
+                new MockTime(0),
+                asyncConsumerMetrics);
+            BackgroundEvent event = new ErrorEvent(new Throwable());
+
+            // add event
+            backgroundEventHandler.add(event);
Review Comment:
```suggestion
            // add event
            backgroundEventHandler.add(new ErrorEvent(new Throwable()));
```
##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java:
##########
@@ -199,4 +203,57 @@ public void testSendUnsentRequests() {
        consumerNetworkThread.cleanup();
        verify(networkClientDelegate, times(2)).poll(anyLong(), anyLong());
    }
+
+    @Test
+    public void testRunOnceRecordTimeBetweenNetworkThreadPoll() {
+        try (Metrics metrics = new Metrics();
+             AsyncConsumerMetrics asyncConsumerMetrics = new AsyncConsumerMetrics(metrics, "consumer");
+             ConsumerNetworkThread consumerNetworkThread = new ConsumerNetworkThread(
+                 new LogContext(),
+                 time,
+                 applicationEventQueue,
+                 applicationEventReaper,
+                 () -> applicationEventProcessor,
+                 () -> networkClientDelegate,
+                 () -> requestManagers,
+                 asyncConsumerMetrics
+             )) {
+            consumerNetworkThread.initializeResources();
+
+            consumerNetworkThread.runOnce();
+            time.sleep(10);
+            consumerNetworkThread.runOnce();
+            assertTrue((double) metrics.metric(metrics.metricName("time-between-network-thread-poll-avg", "consumer-metrics")).metricValue() > 0);
+            assertTrue((double) metrics.metric(metrics.metricName("time-between-network-thread-poll-max", "consumer-metrics")).metricValue() > 0);
Review Comment:
couldn't we be more precise here? (I guess it should be exactly 10, given how this is calculated, right? Not sure if we would need >= in this case.)
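For illustration, the assertions could then look roughly like this (assuming the only clock advance between the two `runOnce()` calls is the explicit `time.sleep(10)`):
```java
// Sketch only: with MockTime the interval between the two polls is exactly 10 ms,
// so both the avg and the max should report 10.
assertEquals(10, (double) metrics.metric(metrics.metricName("time-between-network-thread-poll-avg", "consumer-metrics")).metricValue());
assertEquals(10, (double) metrics.metric(metrics.metricName("time-between-network-thread-poll-max", "consumer-metrics")).metricValue());
```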
##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##########
@@ -1951,6 +1952,31 @@ public void testSubscribePatternAgainstBrokerNotSupportingRegex() throws Interru
        }, "Consumer did not throw the expected UnsupportedVersionException on poll");
    }
+    @Test
+    public void testRecordBackgroundEventQueueSizeAndBackgroundEventQueueTime() {
+        consumer = newConsumer(
+            mock(FetchBuffer.class),
+            mock(ConsumerInterceptors.class),
+            mock(ConsumerRebalanceListenerInvoker.class),
+            mock(SubscriptionState.class),
+            "group-id",
+            "client-id",
+            false);
+        Metrics metrics = consumer.metricsRegistry();
+        AsyncConsumerMetrics kafkaConsumerMetrics = consumer.kafkaConsumerMetrics();
+
+        ConsumerRebalanceListenerCallbackNeededEvent event = new ConsumerRebalanceListenerCallbackNeededEvent(ON_PARTITIONS_REVOKED, Collections.emptySortedSet());
+        event.setEnqueuedMs(time.milliseconds());
+        backgroundEventQueue.add(event);
+        kafkaConsumerMetrics.recordBackgroundEventQueueSize(1);
+
+        time.sleep(10);
+        consumer.processBackgroundEvents();
+        assertEquals(0, (double) metrics.metric(metrics.metricName("background-event-queue-size", "consumer-metrics")).metricValue());
+        assertTrue((double) metrics.metric(metrics.metricName("background-event-queue-time-avg", "consumer-metrics")).metricValue() > 0);
+        assertTrue((double) metrics.metric(metrics.metricName("background-event-queue-time-max", "consumer-metrics")).metricValue() > 0);
Review Comment:
couldn't we be more precise here and expect >= 10?
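As a rough illustration (assuming the event's queue time is measured from `setEnqueuedMs` to processing, i.e. the 10 ms advanced via `time.sleep(10)`):
```java
// Sketch only: the event waited in the queue for the 10 ms that MockTime was advanced,
// so the recorded queue time should be at least 10.
assertTrue((double) metrics.metric(metrics.metricName("background-event-queue-time-avg", "consumer-metrics")).metricValue() >= 10);
assertTrue((double) metrics.metric(metrics.metricName("background-event-queue-time-max", "consumer-metrics")).metricValue() >= 10);
```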
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/AsyncConsumerMetrics.java:
##########
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals.metrics;
+
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.stats.Avg;
+import org.apache.kafka.common.metrics.stats.Max;
+import org.apache.kafka.common.metrics.stats.Value;
+
+import java.util.Arrays;
+
+import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_METRICS_SUFFIX;
+
+public class AsyncConsumerMetrics extends KafkaConsumerMetrics implements AutoCloseable {
+    private final Metrics metrics;
+
+    public static final String TIME_BETWEEN_NETWORK_THREAD_POLL_SENSOR_NAME = "time-between-network-thread-poll";
+    public static final String APPLICATION_EVENT_QUEUE_SIZE_SENSOR_NAME = "application-event-queue-size";
+    public static final String APPLICATION_EVENT_QUEUE_TIME_SENSOR_NAME = "application-event-queue-time";
+    public static final String APPLICATION_EVENT_QUEUE_PROCESSING_TIME_SENSOR_NAME = "application-event-queue-processing-time";
+    public static final String BACKGROUND_EVENT_QUEUE_SIZE_SENSOR_NAME = "background-event-queue-size";
+    public static final String BACKGROUND_EVENT_QUEUE_TIME_SENSOR_NAME = "background-event-queue-time";
+    public static final String BACKGROUND_EVENT_QUEUE_PROCESSING_TIME_SENSOR_NAME = "background-event-queue-processing-time";
+    public static final String UNSENT_REQUESTS_QUEUE_SIZE_SENSOR_NAME = "unsent-requests-queue-size";
+    public static final String UNSENT_REQUESTS_QUEUE_TIME_SENSOR_NAME = "unsent-requests-queue-time";
+    private final Sensor timeBetweenNetworkThreadPollSensor;
+    private final Sensor applicationEventQueueSizeSensor;
+    private final Sensor applicationEventQueueTimeSensor;
+    private final Sensor applicationEventQueueProcessingTimeSensor;
+    private final Sensor backgroundEventQueueSizeSensor;
+    private final Sensor backgroundEventQueueTimeSensor;
+    private final Sensor backgroundEventQueueProcessingTimeSensor;
+    private final Sensor unsentRequestsQueueSizeSensor;
+    private final Sensor unsentRequestsQueueTimeSensor;
+
+    public AsyncConsumerMetrics(Metrics metrics, String metricGrpPrefix) {
+        super(metrics, metricGrpPrefix);
+
+        this.metrics = metrics;
+        final String metricGroupName = metricGrpPrefix + CONSUMER_METRICS_SUFFIX;
+        this.timeBetweenNetworkThreadPollSensor = metrics.sensor(TIME_BETWEEN_NETWORK_THREAD_POLL_SENSOR_NAME);
+        this.timeBetweenNetworkThreadPollSensor.add(metrics.metricName("time-between-network-thread-poll-avg",
+            metricGroupName,
+            "The average time taken, in milliseconds, between each poll in the network thread."),
+            new Avg());
+        this.timeBetweenNetworkThreadPollSensor.add(metrics.metricName("time-between-network-thread-poll-max",
+            metricGroupName,
+            "The maximum time taken, in milliseconds, between each poll in the network thread."),
+            new Max());
+
+        this.applicationEventQueueSizeSensor = metrics.sensor(APPLICATION_EVENT_QUEUE_SIZE_SENSOR_NAME);
+        this.applicationEventQueueSizeSensor.add(metrics.metricName("application-event-queue-size",
+            metricGroupName,
+            "The current number of events in the consumer network application event queue."),
Review Comment:
nit: I guess that, some time from now, even those of us who know this by heart will get tricked about whether this is the outgoing or the incoming queue. Should we be more explicit with something like
```suggestion
"The current number of events in the queue to send from the
application thread to the background thread."),
```
(and then we can consistently have the flipped version of the message for the `background-event-queue-size` metric)
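For instance, the mirrored description on the background side could read something like (wording only a sketch):
```java
// Hypothetical mirrored wording for the background-event-queue-size metric description:
"The current number of events in the queue to send from the background thread to the application thread."
```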
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]