junrao commented on code in PR #14218: URL: https://github.com/apache/kafka/pull/14218#discussion_r1300543812
##########
clients/src/main/java/org/apache/kafka/clients/ClientUtils.java:
##########
@@ -246,13 +247,22 @@ public static NetworkClient createNetworkClient(AbstractConfig config,
         }
     }
-    public static <T> List createConfiguredInterceptors(AbstractConfig config,
-                                                        String interceptorClassesConfigName,
-                                                        Class<T> clazz) {
+    public static <T> List getConfiguredInterceptors(AbstractConfig config,

Review Comment:
   We tend not to use getters. Could `getConfiguredInterceptors` just be `configuredInterceptors`?


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerUtils.java:
##########
@@ -87,22 +87,19 @@ public static ConsumerNetworkClient createConsumerNetworkClient(ConsumerConfig c
     }
     public static LogContext createLogContext(ConsumerConfig config, GroupRebalanceConfig groupRebalanceConfig) {
-        String groupId = String.valueOf(groupRebalanceConfig.groupId);
-        String clientId = config.getString(CommonClientConfigs.CLIENT_ID_CONFIG);
-        String logPrefix;
-        String groupInstanceId = groupRebalanceConfig.groupInstanceId.orElse(null);
-
-        if (groupInstanceId != null) {
-            // If group.instance.id is set, we will append it to the log context.
-            logPrefix = String.format("[Consumer instanceId=%s, clientId=%s, groupId=%s] ", groupInstanceId, clientId, groupId);
+        Optional<String> groupId = Optional.ofNullable(groupRebalanceConfig.groupId);
+        String clientId = config.getString(ConsumerConfig.CLIENT_ID_CONFIG);
+
+        // If group.instance.id is set, we will append it to the log context.
+        if (groupRebalanceConfig.groupInstanceId.isPresent()) {
+            return new LogContext("[Consumer instanceId=" + groupRebalanceConfig.groupInstanceId.get() +
+                    ", clientId=" + clientId + ", groupId=" + groupId.orElse("null") + "] ");
         } else {
-            logPrefix = String.format("[Consumer clientId=%s, groupId=%s] ", clientId, groupId);
+            return new LogContext("[Consumer clientId=" + clientId + ", groupId=" + groupId.orElse("null") + "] ");
         }
-
-        return new LogContext(logPrefix);
     }
-    public static IsolationLevel createIsolationLevel(ConsumerConfig config) {
+    public static IsolationLevel getConfiguredIsolationLevel(ConsumerConfig config) {

Review Comment:
   We tend not to use getters. Could this just be `configuredIsolationLevel`? Ditto for `getConfiguredConsumerInterceptors` below.


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThread.java:
##########
@@ -206,46 +199,34 @@ public void run() {
      * 3. Poll the networkClient to send and retrieve the response.
      */
     void runOnce() {
-        drain();
+        if (!applicationEventQueue.isEmpty()) {
+            LinkedList<ApplicationEvent> res = new LinkedList<>();
+            this.applicationEventQueue.drainTo(res);
+
+            for (ApplicationEvent event : res) {
+                log.debug("Consuming application event: {}", event);
+                Objects.requireNonNull(event);
+                applicationEventProcessor.process(event);
+            }
+        }
+
         final long currentTimeMs = time.milliseconds();
-        final long pollWaitTimeMs = requestManagerRegistry.values().stream()
+        final long pollWaitTimeMs = requestManagers.entries().stream()
                 .filter(Optional::isPresent)
                 .map(m -> m.get().poll(currentTimeMs))
+                .filter(Objects::nonNull)

Review Comment:
   The description of `RequestManager.poll()` suggests that it will never return null?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org