chia7712 commented on code in PR #15766:
URL: https://github.com/apache/kafka/pull/15766#discussion_r1580354223


##########
tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupExecutor.java:
##########
@@ -132,63 +133,41 @@ public void close() throws Exception {
         }
     }
 
-    static class ConsumerRunnable implements Runnable {
-        private final String brokerAddress;
-        private final String groupId;
-        private final Properties customConfigs;
+    private static class ConsumerRunnable implements Runnable {
         private final boolean syncCommit;
         private final String topic;
-        private final String groupProtocol;
-        private final String assignmentStrategy;
-        private final Optional<String> remoteAssignor;
-        private final Properties props = new Properties();
-        private KafkaConsumer<String, String> consumer;
-        private boolean configured = false;
+        private final KafkaConsumer<String, String> consumer;
         private volatile boolean isShutdown = false;
 
-        public ConsumerRunnable(String brokerAddress,
-                                String groupId,
-                                String groupProtocol,
-                                String topic,
-                                String assignmentStrategy,
-                                Optional<String> remoteAssignor,
-                                Optional<Properties> customConfigs,
-                                boolean syncCommit) {
-            this.brokerAddress = brokerAddress;
-            this.groupId = groupId;
-            this.customConfigs = customConfigs.orElse(new Properties());
+        private ConsumerRunnable(String brokerAddress,
+                                 String groupId,
+                                 String groupProtocol,
+                                 String topic,
+                                 String assignmentStrategy,
+                                 Optional<String> remoteAssignor,
+                                 Map<String, Object> customConfigs,
+                                 boolean syncCommit) {
             this.syncCommit = syncCommit;
             this.topic = topic;
-            this.groupProtocol = groupProtocol;
-            this.assignmentStrategy = assignmentStrategy;
-            this.remoteAssignor = remoteAssignor;
 
-            this.configure();
-        }
-
-        private void configure() {
-            configured = true;
-            configure(props);
-            props.putAll(customConfigs);
-            consumer = new KafkaConsumer<>(props);
-        }
-
-        private void configure(Properties props) {
-            props.put(BOOTSTRAP_SERVERS_CONFIG, brokerAddress);
-            props.put(GROUP_ID_CONFIG, groupId);
-            props.put(KEY_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class.getName());
-            props.put(VALUE_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class.getName());
-            props.put(GROUP_PROTOCOL_CONFIG, groupProtocol);
+            Map<String, Object> configs = new HashMap<>();
+            configs.put(BOOTSTRAP_SERVERS_CONFIG, brokerAddress);
+            configs.put(GROUP_ID_CONFIG, groupId);
+            configs.put(KEY_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class.getName());
+            configs.put(VALUE_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class.getName());
+            configs.put(GROUP_PROTOCOL_CONFIG, groupProtocol);
+            
             if (Objects.equals(groupProtocol, CONSUMER.toString())) {
-                remoteAssignor.ifPresent(assignor -> 
props.put(GROUP_REMOTE_ASSIGNOR_CONFIG, assignor));
+                remoteAssignor.ifPresent(assignor -> 
configs.put(GROUP_REMOTE_ASSIGNOR_CONFIG, assignor));
             } else {
-                props.put(PARTITION_ASSIGNMENT_STRATEGY_CONFIG, 
assignmentStrategy);
+                configs.put(PARTITION_ASSIGNMENT_STRATEGY_CONFIG, 
assignmentStrategy);
             }
+            configs.putAll(customConfigs);
+            consumer = new KafkaConsumer<>(configs);

Review Comment:
   Could you make sure all consumers get closed even if one of the consumers 
hits an error? We encountered resource leaks before, and they made our CI unstable. 
For example:
   ```java
       /**
        * Creates {@code numberOfConsumers} consumers for the given group and starts one
        * background task per consumer that subscribes to {@code topic} and polls until
        * the returned handle is closed.
        *
        * <p>All consumers are created up front via {@code consumers(...)}; the configs for
        * each one are the bootstrap servers, group id and group protocol, plus either the
        * remote assignor (when the protocol equals {@code CONSUMER}) or the partition
        * assignment strategy, with {@code customConfigs} applied last so they win.
        *
        * @param brokerAddress      value for {@code BOOTSTRAP_SERVERS_CONFIG}
        * @param numberOfConsumers  number of consumers (and worker threads) to start
        * @param groupId            value for {@code GROUP_ID_CONFIG}
        * @param groupProtocol      value for {@code GROUP_PROTOCOL_CONFIG}
        * @param topic              topic every consumer subscribes to
        * @param assignmentStrategy used only when the protocol is not {@code CONSUMER}
        * @param remoteAssignor     used only when the protocol is {@code CONSUMER}
        * @param customConfigs      extra configs, applied after the defaults above
        * @param syncCommit         when true, {@code commitSync()} is called after each poll
        * @return a handle whose {@code close()} flags the workers to stop, closes any
        *         consumer that was never handed to a worker, and shuts the executor down
        */
       private static AutoCloseable run(
               String brokerAddress,
               int numberOfConsumers,
               String groupId,
               String groupProtocol,
               String topic,
               String assignmentStrategy,
               Optional<String> remoteAssignor,
               Map<String, Object> customConfigs,
               boolean syncCommit
       ) {
           Queue<Consumer<byte[], byte[]>> consumers = consumers(IntStream.range(0, numberOfConsumers).mapToObj(ignored -> {
               Map<String, Object> configs = new HashMap<>();
               configs.put(BOOTSTRAP_SERVERS_CONFIG, brokerAddress);
               configs.put(GROUP_ID_CONFIG, groupId);
               configs.put(GROUP_PROTOCOL_CONFIG, groupProtocol);
               if (Objects.equals(groupProtocol, CONSUMER.toString())) {
                   remoteAssignor.ifPresent(assignor -> configs.put(GROUP_REMOTE_ASSIGNOR_CONFIG, assignor));
               } else {
                   configs.put(PARTITION_ASSIGNMENT_STRATEGY_CONFIG, assignmentStrategy);
               }
               // Caller-supplied configs go last so they override the defaults above.
               configs.putAll(customConfigs);
               return configs;
           }).collect(Collectors.toList()));
   
           AtomicBoolean closed = new AtomicBoolean(false);
           ExecutorService service = Executors.newFixedThreadPool(consumers.size());
           final AutoCloseable closeable = () -> {
               closed.set(true);
               // Only consumers still in the queue (never submitted to a worker) are closed
               // here; a submitted consumer is closed by its own worker in the finally block.
               consumers.forEach(c -> Utils.closeQuietly(c, "close consumer"));
               service.shutdownNow();
               service.awaitTermination(1, TimeUnit.MINUTES);
           };
           try {
               while (!consumers.isEmpty()) {
                   Consumer<byte[], byte[]> consumer = consumers.poll();
                   service.submit(() -> {
                       try {
                           consumer.subscribe(singleton(topic));
                           // Keep polling until the handle is closed. The condition must be
                           // negated: "closed" starts out false, so looping on closed.get()
                           // would skip the loop and close the consumer immediately.
                           while (!closed.get()) {
                               consumer.poll(Duration.ofMillis(Long.MAX_VALUE));
                               if (syncCommit)
                                   consumer.commitSync();
                           }
                       } catch (WakeupException | InterruptException e) {
                           // OK: treated as the normal way out of the blocking poll during shutdown.
                       } finally {
                           consumer.close();
                       }
                   });
               }
               return closeable;
           } catch (Throwable e) {
               // Submission failed part-way: release everything created so far, then rethrow.
               Utils.closeQuietly(closeable, "release consumer");
               throw e;
           }
       }
   
       /**
        * Builds one {@code KafkaConsumer} with byte-array key/value deserializers per
        * entry in {@code allConfigs}, in iteration order.
        *
        * <p>If anything is thrown while the consumers are being created, every consumer
        * created so far is closed quietly and the original throwable is rethrown.
        *
        * @param allConfigs one config map per consumer to create
        * @return the created consumers, in creation order
        */
       private static Queue<Consumer<byte[], byte[]>> consumers(List<Map<String, Object>> allConfigs) {
           Queue<Consumer<byte[], byte[]>> created = new LinkedList<>();
           try {
               for (Map<String, Object> configs : allConfigs) {
                   created.add(new KafkaConsumer<>(configs, new ByteArrayDeserializer(), new ByteArrayDeserializer()));
               }
               return created;
           } catch (Throwable e) {
               // Release whatever was already built before propagating the failure.
               for (Consumer<byte[], byte[]> c : created) {
                   Utils.closeQuietly(c, "close consumer");
               }
               throw e;
           }
       }
   ```



##########
tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupExecutor.java:
##########
@@ -75,17 +76,17 @@ private ConsumerGroupExecutor(
         });
     }
 
-    public static ConsumerGroupExecutor buildConsumerGroup(String 
brokerAddress,
-                                                           int numConsumers,
-                                                           String groupId,
-                                                           String topic,
-                                                           String 
groupProtocol,
-                                                           Optional<String> 
remoteAssignor,
-                                                           
Optional<Properties> customConfigs,
-                                                           boolean syncCommit) 
{
+    static ConsumerGroupExecutor buildConsumerGroup(String brokerAddress,

Review Comment:
   Could you move those static methods up?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to