imcdo commented on code in PR #14524:
URL: https://github.com/apache/kafka/pull/14524#discussion_r1355442947


##########
tests/kafkatest/services/kafka/kafka.py:
##########
@@ -277,6 +278,9 @@ def __init__(self, context, num_nodes, zk, security_protocol=SecurityConfig.PLAI
         self.isolated_controller_quorum = None # will define below if necessary
         self.configured_for_zk_migration = False
 
+        if use_new_coordinator is None :
+            self.use_new_coordinator = self.context.globals.get["use_new_coordinator", False]

Review Comment:
   `get` is a method, call it with `()`:
   ```suggestion
            self.use_new_coordinator = self.context.globals.get("use_new_coordinator", False)
   ```
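For context, subscripting the bound method object is a `TypeError`, whereas calling `dict.get(key, default)` returns the fallback when the key is absent. A minimal standalone sketch (the `globals_config` dict here is a hypothetical stand-in for `self.context.globals`):

```python
# Hypothetical stand-in for self.context.globals in a ducktape test context.
globals_config = {"bootstrap_servers": "localhost:9092"}

# dict.get is a method: call it with parentheses and an optional default.
use_new_coordinator = globals_config.get("use_new_coordinator", False)
print(use_new_coordinator)  # False: the key is absent, so the default applies

# Bracket syntax on the method object itself raises a TypeError instead:
try:
    globals_config.get["use_new_coordinator", False]
except TypeError as exc:
    print("subscripting the method fails:", exc)
```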



##########
tests/kafkatest/services/kafka/config.py:
##########
@@ -28,7 +28,8 @@ class KafkaConfig(dict):
         config_property.METADATA_LOG_SEGMENT_BYTES: str(9*1024*1024), # 9 MB
         config_property.METADATA_LOG_BYTES_BETWEEN_SNAPSHOTS: str(10*1024*1024), # 10 MB
         config_property.METADATA_LOG_RETENTION_BYTES: str(10*1024*1024), # 10 MB
-        config_property.METADATA_LOG_SEGMENT_MS: str(1*60*1000) # one minute
+        config_property.METADATA_LOG_SEGMENT_MS: str(1*60*1000), # one minute
+        config_property.NEW_GROUP_COORDINATOR_ENABLE: False

Review Comment:
   Nit: is there a better name than "new group coordinator"? It's not very descriptive, and it could easily become the old group coordinator if we aren't careful.



##########
tests/kafkatest/services/kafka/kafka.py:
##########
@@ -407,6 +411,7 @@ def __init__(self, context, num_nodes, zk, security_protocol=SecurityConfig.PLAI
             kraft_broker_configs = {
                 config_property.PORT: config_property.FIRST_BROKER_PORT,
                 config_property.NODE_ID: self.idx(node),
+                config_property.NEW_GROUP_COORDINATOR_ENABLE: use_new_coordinator
             }

Review Comment:
   Don't we want to conditionally add this when `use_new_coordinator` is true?
   ```suggestion
               kraft_broker_configs = {
                   config_property.PORT: config_property.FIRST_BROKER_PORT,
                   config_property.NODE_ID: self.idx(node)
               }
               if self.use_new_coordinator:
                   kraft_broker_configs[config_property.NEW_GROUP_COORDINATOR_ENABLE] = use_new_coordinator
   ```
   That way we stick with the default value. Or do we care? Either way is fine.
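The difference the suggestion is getting at can be sketched in isolation: setting the key unconditionally pins the broker to an explicit value, while setting it only when enabled leaves the broker's built-in default in force. (The config key string and helper below are hypothetical, not the actual kafka.py code.)

```python
# Hypothetical key; stands in for config_property.NEW_GROUP_COORDINATOR_ENABLE.
NEW_GROUP_COORDINATOR_ENABLE = "group.coordinator.new.enable"

def build_kraft_broker_configs(node_id, use_new_coordinator=False):
    configs = {"port": 9092, "node.id": node_id}
    # Only set the property when explicitly enabled, so the broker's
    # own default applies in every other case.
    if use_new_coordinator:
        configs[NEW_GROUP_COORDINATOR_ENABLE] = use_new_coordinator
    return configs

print(build_kraft_broker_configs(1))        # no coordinator key at all
print(build_kraft_broker_configs(1, True))  # key present and True
```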



##########
tests/kafkatest/tests/core/consume_bench_test.py:
##########
@@ -91,11 +99,17 @@ def test_consume_bench(self, topics, metadata_quorum=quorum.zk):
         self.logger.info("TASKS: %s\n" % json.dumps(tasks, sort_keys=True, indent=2))
 
     @cluster(num_nodes=10)
-    @matrix(metadata_quorum=quorum.all_non_upgrade)
-    def test_single_partition(self, metadata_quorum=quorum.zk):
+    @matrix(
+        metadata_quorum=quorum.all_non_upgrade,
+        use_new_coordinator=[True, False]
+    )

Review Comment:
   The same applies here:
   ```suggestion
       @matrix(
           metadata_quorum=[quorum.isolated_kraft],
           use_new_coordinator=[True, False]
       )
       @matrix(
           metadata_quorum=[quorum.zk],
           use_new_coordinator=[False]
       )
   ```
   
   There are some alternatives: your `skip_if_new_coordinator_and_zk` wrapper could go into the method metadata and remove the test cases with `use_new_coordinator=True` and `metadata_quorum=zk`, though I'd advise against that as it's not that simple.
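To see why splitting the matrix removes the invalid case rather than skipping it: each `@matrix` expands to the cartesian product of its keyword lists, and stacked decorators union their cases. A rough simulation of that expansion (not ducktape itself, just illustrative):

```python
from itertools import product

def expand_matrix(**params):
    """Rough stand-in for ducktape's @matrix: cartesian product of the value lists."""
    keys = list(params)
    return [dict(zip(keys, combo)) for combo in product(*params.values())]

# A single combined matrix yields 4 cases, including the invalid
# zk + new-coordinator pairing that would have to be skipped at runtime.
combined = expand_matrix(metadata_quorum=["zk", "isolated_kraft"],
                         use_new_coordinator=[True, False])

# Two split matrices yield only the 3 valid cases; nothing to skip.
split = (expand_matrix(metadata_quorum=["isolated_kraft"], use_new_coordinator=[True, False])
         + expand_matrix(metadata_quorum=["zk"], use_new_coordinator=[False]))

print(len(combined), len(split))  # 4 3
```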



##########
tests/kafkatest/services/kafka/kafka.py:
##########
@@ -277,6 +278,9 @@ def __init__(self, context, num_nodes, zk, security_protocol=SecurityConfig.PLAI
         self.isolated_controller_quorum = None # will define below if necessary

Review Comment:
   Make sure you add a description of the new parameter to the docstring for the `__init__` method.
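A hedged sketch of what that docstring entry might look like (the class is a toy stand-in for the kafka.py service and the parameter wording is illustrative, not the final text):

```python
class KafkaService:
    """Toy stand-in for the kafka.py service class, shown only for the docstring."""

    def __init__(self, context=None, num_nodes=1, use_new_coordinator=None):
        """Kafka test service.

        :param context: ducktape test context
        :param num_nodes: number of broker nodes to provision
        :param use_new_coordinator: whether to enable the new group coordinator;
            if None, fall back to the ``use_new_coordinator`` test global
            (defaulting to False)
        """
        if use_new_coordinator is None:
            use_new_coordinator = False  # stand-in for the test-globals lookup
        self.use_new_coordinator = use_new_coordinator

print(KafkaService().use_new_coordinator)  # False
```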



##########
tests/kafkatest/tests/core/consume_bench_test.py:
##########
@@ -68,12 +67,21 @@ def produce_messages(self, topics, max_messages=10000):
         self.logger.debug("Produce workload finished")
 
     @cluster(num_nodes=10)
-    @matrix(topics=[["consume_bench_topic[0-5]"]], metadata_quorum=quorum.all_non_upgrade) # topic subscription
-    @matrix(topics=[["consume_bench_topic[0-5]:[0-4]"]], metadata_quorum=quorum.all_non_upgrade)  # manual topic assignment
-    def test_consume_bench(self, topics, metadata_quorum=quorum.zk):
+    @matrix(
+        topics=[
+            ["consume_bench_topic[0-5]"],
+            ["consume_bench_topic[0-5]:[0-4]"]
+        ],
+        metadata_quorum=quorum.all_non_upgrade,
+        use_new_coordinator=[True, False]
+    ) # topic subscription

Review Comment:
   You probably shouldn't skip the zk + new coordinator test; just don't run it in the first place. We don't really want to pollute the report with a bunch of skips (which have historically been reserved for broken tests that we intend to fix later).
   
   ```suggestion
       @matrix(
           topics=[
               ["consume_bench_topic[0-5]"],
               ["consume_bench_topic[0-5]:[0-4]"]
           ],
           metadata_quorum=[quorum.isolated_kraft],
           use_new_coordinator=[True, False]
       )
       @matrix(
           topics=[
               ["consume_bench_topic[0-5]"],
               ["consume_bench_topic[0-5]:[0-4]"]
           ],
           metadata_quorum=[quorum.zk],
           use_new_coordinator=[False]
       )  # topic subscription
   ```
   As annoying as that is, it's probably for the best.



##########
tests/kafkatest/services/kafka/kafka.py:
##########
@@ -764,6 +769,9 @@ def prop_file(self, node):
             else:
                override_configs[config_property.ZOOKEEPER_SSL_CLIENT_ENABLE] = 'false'
 
+        if self.use_new_coordinator:
+            override_configs[config_property.NEW_GROUP_COORDINATOR_ENABLE] = 'true'

Review Comment:
   Should we limit this to only KRaft brokers instead of all brokers, or does it not matter?
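One way the question could be answered in code is to gate the override on the quorum mode as well as the flag. A hypothetical sketch (the quorum check and config key are illustrative, not the actual kafka.py logic):

```python
NEW_GROUP_COORDINATOR_ENABLE = "group.coordinator.new.enable"  # illustrative key name

def coordinator_overrides(quorum_is_kraft, use_new_coordinator):
    """Return the coordinator-related override dict for one broker node."""
    overrides = {}
    # Apply the override only on KRaft brokers; ZK brokers keep their default.
    if quorum_is_kraft and use_new_coordinator:
        overrides[NEW_GROUP_COORDINATOR_ENABLE] = 'true'
    return overrides

print(coordinator_overrides(True, True))   # override applied on KRaft
print(coordinator_overrides(False, True))  # empty dict for ZK brokers
```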



##########
tests/kafkatest/tests/core/consume_bench_test.py:
##########
@@ -188,12 +220,18 @@ def test_multiple_consumers_random_group_partitions(self, metadata_quorum=quorum
         self.logger.info("TASKS: %s\n" % json.dumps(tasks, sort_keys=True, indent=2))
 
     @cluster(num_nodes=10)
-    @matrix(metadata_quorum=quorum.all_non_upgrade)
-    def test_multiple_consumers_specified_group_partitions_should_raise(self, metadata_quorum=quorum.zk):
+    @matrix(
+        metadata_quorum=quorum.all_non_upgrade,
+        use_new_coordinator=[True, False]
+    )
+    @skip_if_new_coordinator_and_zk
+    def test_multiple_consumers_specified_group_partitions_should_raise(self, metadata_quorum=quorum.zk, use_new_coordinator=False):
         """
        Runs multiple consumers in the same group to read messages from specific partitions.
        It is an invalid configuration to provide a consumer group and specific partitions.
         """
+        self.kafka.use_new_coordinator = use_new_coordinator

Review Comment:
   Make sure in your testing that this actually works by sshing onto all the brokers to see if it's set properly. The way it's configured now, at `kafka.py` construction the kraft broker configs set this to False, then at runtime the override sets it to `use_new_coordinator`, so it should work.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
