kirktrue commented on code in PR #17199:
URL: https://github.com/apache/kafka/pull/17199#discussion_r1859369437
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -507,7 +511,8 @@ public void onGroupAssignmentUpdated(Set<TopicPartition> partitions) {
BlockingQueue<ApplicationEvent> applicationEventQueue = new LinkedBlockingQueue<>();
this.backgroundEventQueue = new LinkedBlockingQueue<>();
- BackgroundEventHandler backgroundEventHandler = new BackgroundEventHandler(backgroundEventQueue);
+ this.backgroundEventHandler = new BackgroundEventHandler(
+ backgroundEventQueue, time, asyncConsumerMetrics);
Review Comment:
```suggestion
this.backgroundEventHandler = new BackgroundEventHandler(
backgroundEventQueue,
time,
asyncConsumerMetrics
);
```
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -214,10 +213,11 @@ private void process(final ConsumerRebalanceListenerCallbackNeededEvent event) {
private final ApplicationEventHandler applicationEventHandler;
private final Time time;
private final AtomicReference<Optional<ConsumerGroupMetadata>> groupMetadata = new AtomicReference<>(Optional.empty());
- private final KafkaConsumerMetrics kafkaConsumerMetrics;
+ private final AsyncConsumerMetrics asyncConsumerMetrics;
Review Comment:
I'd prefer it be left as `kafkaConsumerMetrics`. Calling out the distinction
in the variable name isn't really adding anything (IMO).
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -350,7 +352,8 @@ public void onGroupAssignmentUpdated(Set<TopicPartition> partitions) {
metrics,
fetchMetricsManager.throttleTimeSensor(),
clientTelemetryReporter.map(ClientTelemetryReporter::telemetrySender).orElse(null),
- backgroundEventHandler);
+ backgroundEventHandler,
+ asyncConsumerMetrics);
Review Comment:
Nit: minor alignment issue:
```suggestion
backgroundEventHandler,
asyncConsumerMetrics);
```
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java:
##########
@@ -61,6 +62,14 @@ public Uuid id() {
return id;
}
+ public void setEnqueuedMs(long enqueuedMs) {
+ this.enqueuedMs = enqueuedMs;
+ }
Review Comment:
> we probably should be able to see the enqueued time in the string
> representation of the events
Agreed. Good point. Thanks @AndrewJSchofield.
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -338,7 +339,8 @@ public void onGroupAssignmentUpdated(Set<TopicPartition> partitions) {
ApiVersions apiVersions = new ApiVersions();
final BlockingQueue<ApplicationEvent> applicationEventQueue = new LinkedBlockingQueue<>();
- final BackgroundEventHandler backgroundEventHandler = new BackgroundEventHandler(backgroundEventQueue);
+ this.backgroundEventHandler = new BackgroundEventHandler(
+ backgroundEventQueue, time, asyncConsumerMetrics);
Review Comment:
Nitpick: for better or worse, we've adopted this style for multi-line
parameter lists:
```suggestion
this.backgroundEventHandler = new BackgroundEventHandler(
backgroundEventQueue,
time,
asyncConsumerMetrics
);
```
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEvent.java:
##########
@@ -38,6 +38,12 @@ public enum Type {
*/
private final Uuid id;
+ /**
+ * The time in milliseconds when this event was enqueued.
+ * This field can be changed after the event is created, so it should not be used in hashCode, equals, or toStringBase.
+ */
+ private long enqueuedMs;
Review Comment:
We also need to include `enqueuedMs` in the `toStringBase()` method here, matching
the `ApplicationEvent` method of the same name.
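To illustrate, here is a rough sketch of the shape I have in mind. `SketchEvent` is a hypothetical stand-in, not the real `BackgroundEvent` (it uses `java.util.UUID` rather than Kafka's `Uuid`, and a plain `String` type), so treat the names as assumptions. The idea is only that the mutable `enqueuedMs` stays out of `equals`/`hashCode` but does appear in `toStringBase()`:

```java
import java.util.Objects;
import java.util.UUID;

// Hypothetical stand-in for an event class; NOT the actual Kafka BackgroundEvent.
class SketchEvent {
    private final String type;
    private final UUID id;

    // Set when the event is enqueued. It is mutable, so it is deliberately
    // excluded from equals() and hashCode(), but included in toStringBase().
    private long enqueuedMs;

    SketchEvent(String type) {
        this.type = Objects.requireNonNull(type);
        this.id = UUID.randomUUID();
    }

    void setEnqueuedMs(long enqueuedMs) {
        this.enqueuedMs = enqueuedMs;
    }

    long enqueuedMs() {
        return enqueuedMs;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        SketchEvent that = (SketchEvent) o;
        // Identity is based on the immutable fields only.
        return type.equals(that.type) && id.equals(that.id);
    }

    @Override
    public int hashCode() {
        return Objects.hash(type, id);
    }

    // Subclasses could append their own fields to this base string.
    protected String toStringBase() {
        return "type=" + type + ", id=" + id + ", enqueuedMs=" + enqueuedMs;
    }

    @Override
    public String toString() {
        return getClass().getSimpleName() + "{" + toStringBase() + "}";
    }
}
```

With something like this, changing `enqueuedMs` after construction does not affect the hash code, while the value still shows up in log output.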
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventHandler.java:
##########
@@ -42,26 +44,32 @@
public class ApplicationEventHandler implements Closeable {
private final Logger log;
+ private final Time time;
private final BlockingQueue<ApplicationEvent> applicationEventQueue;
Review Comment:
I agree that this is an area that could use some refactoring. Thanks for
filing KAFKA-18048, @FrankYang0529.