junrao commented on code in PR #14218:
URL: https://github.com/apache/kafka/pull/14218#discussion_r1300543812


##########
clients/src/main/java/org/apache/kafka/clients/ClientUtils.java:
##########
@@ -246,13 +247,22 @@ public static NetworkClient createNetworkClient(AbstractConfig config,
         }
     }
 
-    public static <T> List createConfiguredInterceptors(AbstractConfig config,
-                                                        String interceptorClassesConfigName,
-                                                        Class<T> clazz) {
+    public static <T> List getConfiguredInterceptors(AbstractConfig config,

Review Comment:
   We tend not to use the `get` prefix for accessors. Could `getConfiguredInterceptors` just be `configuredInterceptors`?
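
   To illustrate the naming style being asked for, here is a minimal, self-contained sketch. It is not the PR's code: `NamingSketch`, the `Map`-based config, and the comma-separated parsing are all hypothetical stand-ins. Only the method name `configuredInterceptors` comes from the suggestion above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a utility class; this is NOT Kafka's ClientUtils.
final class NamingSketch {

    private NamingSketch() { }

    // Accessor-style name without the "get" prefix, as suggested in the review.
    // The parsing below is an assumption made only so the sketch is runnable.
    static List<String> configuredInterceptors(Map<String, String> config, String configName) {
        List<String> result = new ArrayList<>();
        String raw = config.get(configName);
        if (raw == null) {
            return result;
        }
        for (String name : raw.split(",")) {
            String trimmed = name.trim();
            if (!trimmed.isEmpty()) {
                result.add(trimmed);
            }
        }
        return result;
    }
}
```

   A call site then reads `NamingSketch.configuredInterceptors(config, "interceptor.classes")`, which is how the rest of the client code tends to name such accessors.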



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerUtils.java:
##########
@@ -87,22 +87,19 @@ public static ConsumerNetworkClient createConsumerNetworkClient(ConsumerConfig c
     }
 
     public static LogContext createLogContext(ConsumerConfig config, GroupRebalanceConfig groupRebalanceConfig) {
-        String groupId = String.valueOf(groupRebalanceConfig.groupId);
-        String clientId = config.getString(CommonClientConfigs.CLIENT_ID_CONFIG);
-        String logPrefix;
-        String groupInstanceId = groupRebalanceConfig.groupInstanceId.orElse(null);
-
-        if (groupInstanceId != null) {
-            // If group.instance.id is set, we will append it to the log context.
-            logPrefix = String.format("[Consumer instanceId=%s, clientId=%s, groupId=%s] ", groupInstanceId, clientId, groupId);
+        Optional<String> groupId = Optional.ofNullable(groupRebalanceConfig.groupId);
+        String clientId = config.getString(ConsumerConfig.CLIENT_ID_CONFIG);
+
+        // If group.instance.id is set, we will append it to the log context.
+        if (groupRebalanceConfig.groupInstanceId.isPresent()) {
+            return new LogContext("[Consumer instanceId=" + groupRebalanceConfig.groupInstanceId.get() +
+                    ", clientId=" + clientId + ", groupId=" + groupId.orElse("null") + "] ");
         } else {
-            logPrefix = String.format("[Consumer clientId=%s, groupId=%s] ", clientId, groupId);
+            return new LogContext("[Consumer clientId=" + clientId + ", groupId=" + groupId.orElse("null") + "] ");
         }
-
-        return new LogContext(logPrefix);
     }
 
-    public static IsolationLevel createIsolationLevel(ConsumerConfig config) {
+    public static IsolationLevel getConfiguredIsolationLevel(ConsumerConfig config) {

Review Comment:
   We tend not to use the `get` prefix for accessors. Could this just be `configuredIsolationLevel`? Ditto for `getConfiguredConsumerInterceptors` below.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThread.java:
##########
@@ -206,46 +199,34 @@ public void run() {
      * 3. Poll the networkClient to send and retrieve the response.
      */
     void runOnce() {
-        drain();
+        if (!applicationEventQueue.isEmpty()) {
+            LinkedList<ApplicationEvent> res = new LinkedList<>();
+            this.applicationEventQueue.drainTo(res);
+
+            for (ApplicationEvent event : res) {
+                log.debug("Consuming application event: {}", event);
+                Objects.requireNonNull(event);
+                applicationEventProcessor.process(event);
+            }
+        }
+
         final long currentTimeMs = time.milliseconds();
-        final long pollWaitTimeMs = requestManagerRegistry.values().stream()
+        final long pollWaitTimeMs = requestManagers.entries().stream()
                 .filter(Optional::isPresent)
                 .map(m -> m.get().poll(currentTimeMs))
+                .filter(Objects::nonNull)

Review Comment:
   The description of `RequestManager.poll()` suggests that it never returns null. Is the `Objects::nonNull` filter needed here?
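
   As a rough sketch of the point, assuming `poll()` really does honor a non-null contract, the pipeline can skip the null filter entirely. Everything below is a simplified stand-in rather than the PR's code: `PollWaitSketch`, `PollResult`, its `timeUntilNextPollMs` field, and the `maxWaitMs` parameter are hypothetical names used only to make the example self-contained.

```java
import java.util.List;
import java.util.Optional;

// Simplified stand-in for the background thread's poll-wait computation.
final class PollWaitSketch {

    private PollWaitSketch() { }

    // Hypothetical result type; only the wait time is modeled here.
    static final class PollResult {
        final long timeUntilNextPollMs;

        PollResult(long timeUntilNextPollMs) {
            this.timeUntilNextPollMs = timeUntilNextPollMs;
        }
    }

    interface RequestManager {
        // Contract assumed in this sketch: never returns null.
        PollResult poll(long currentTimeMs);
    }

    // Returns the smallest wait time reported by the present managers,
    // capped at maxWaitMs (also the result when no manager is present).
    static long pollWaitTimeMs(List<Optional<RequestManager>> managers,
                               long currentTimeMs,
                               long maxWaitMs) {
        return managers.stream()
                .filter(Optional::isPresent)
                .map(m -> m.get().poll(currentTimeMs))
                // No .filter(Objects::nonNull) here: under the non-null
                // contract it could never drop anything.
                .mapToLong(r -> r.timeUntilNextPollMs)
                .reduce(maxWaitMs, Math::min);
    }
}
```

   If a manager can in fact return null, it seems better to fix that manager or update the description than to mask it with a filter at the call site.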


