Re: [PR] KAFKA-16272: Adding new coordinator related changes for connect_distributed.py [kafka]

2024-04-19 Thread via GitHub


lucasbru merged PR #15594:
URL: https://github.com/apache/kafka/pull/15594


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16272: Adding new coordinator related changes for connect_distributed.py [kafka]

2024-04-18 Thread via GitHub


vamossagar12 commented on code in PR #15594:
URL: https://github.com/apache/kafka/pull/15594#discussion_r1570933991


##
tests/kafkatest/services/connect.py:
##
@@ -534,33 +535,40 @@ def received_messages(self):
 
 def start(self):
 self.logger.info("Creating connector VerifiableSinkConnector %s", 
self.name)
-self.cc.create_connector({
+connector_config = {
 'name': self.name,
 'connector.class': 
'org.apache.kafka.connect.tools.VerifiableSinkConnector',
 'tasks.max': self.tasks,
 'topics': ",".join(self.topics)
-})
+}
+if self.consumer_group_protocol is not None:
+connector_config["consumer.override.group.protocol"] = 
self.consumer_group_protocol
+self.cc.create_connector(connector_config)
 
 class MockSink(object):
 
-def __init__(self, cc, topics, mode=None, delay_sec=10, name="mock-sink"):
+def __init__(self, cc, topics, mode=None, delay_sec=10, name="mock-sink", 
consumer_group_protocol=None):
 self.cc = cc
 self.logger = self.cc.logger
 self.name = name
 self.mode = mode
 self.delay_sec = delay_sec
 self.topics = topics
+self.consumer_group_protocol = consumer_group_protocol
 
 def start(self):
 self.logger.info("Creating connector MockSinkConnector %s", self.name)
-self.cc.create_connector({
+connector_config = {
 'name': self.name,
 'connector.class': 
'org.apache.kafka.connect.tools.MockSinkConnector',
 'tasks.max': 1,
 'topics': ",".join(self.topics),
 'mock_mode': self.mode,
 'delay_ms': self.delay_sec * 1000
-})
+}
+if self.consumer_group_protocol is not None:
+connector_config["consumer.override.group.protocol"] = 
self.consumer_group_protocol

Review Comment:
   FYI, I did test by setting the group protocol override at a connector level 
and that seems to be working fine.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16272: Adding new coordinator related changes for connect_distributed.py [kafka]

2024-04-18 Thread via GitHub


vamossagar12 commented on PR #15594:
URL: https://github.com/apache/kafka/pull/15594#issuecomment-2064120764

   hey @lucasbru , I ran the following test suite 
   ```
   my_test_suite:
 - 
tests/kafkatest/tests/connect/connect_distributed_test.py::ConnectDistributedTest.test_restart_failed_connector@{"exactly_once_source":false,"connect_protocol":"sessioned","metadata_quorum":"ISOLATED_KRAFT","use_new_coordinator":true,"group_protocol":"consumer"}
 - 
tests/kafkatest/tests/connect/connect_distributed_test.py::ConnectDistributedTest.test_restart_failed_task@{"connector_type":"sink","connect_protocol":"sessioned","metadata_quorum":"ISOLATED_KRAFT","use_new_coordinator":true,"group_protocol":"consumer"}
 - 
tests/kafkatest/tests/connect/connect_distributed_test.py::ConnectDistributedTest.test_restart_connector_and_tasks_failed_connector@{"connect_protocol":"sessioned","metadata_quorum":"ISOLATED_KRAFT","use_new_coordinator":true,"group_protocol":"consumer"}
 - 
tests/kafkatest/tests/connect/connect_distributed_test.py::ConnectDistributedTest.test_restart_connector_and_tasks_failed_task@{"connector_type":"sink","connect_protocol":"sessioned","metadata_quorum":"ISOLATED_KRAFT","use_new_coordinator":true,"group_protocol":"consumer"}
 - 
tests/kafkatest/tests/connect/connect_distributed_test.py::ConnectDistributedTest.test_pause_and_resume_sink@{"connector_type":"sink","connect_protocol":"sessioned","metadata_quorum":"ISOLATED_KRAFT","use_new_coordinator":true,"group_protocol":"consumer"}
 - 
tests/kafkatest/tests/connect/connect_distributed_test.py::ConnectDistributedTest.test_file_source_and_sink@{"security_protocol":"PLAINTEXT","exactly_once_source":false,"connect_protocol":"sessioned","metadata_quorum":"ISOLATED_KRAFT","use_new_coordinator":true,"group_protocol":"consumer"}
 - 
tests/kafkatest/tests/connect/connect_distributed_test.py::ConnectDistributedTest.test_bounce@{"clean":false,"connect_protocol":"sessioned","metadata_quorum":"ISOLATED_KRAFT","use_new_coordinator":true,"group_protocol":"consumer"}
 - 
tests/kafkatest/tests/connect/connect_distributed_test.py::ConnectDistributedTest.test_exactly_once_source@{"clean":false,"connect_protocol":"sessioned","metadata_quorum":"ISOLATED_KRAFT","use_new_coordinator":true,"group_protocol":"consumer"}
 - 
tests/kafkatest/tests/connect/connect_distributed_test.py::ConnectDistributedTest.test_transformations@{"connect_protocol":"sessioned","metadata_quorum":"ISOLATED_KRAFT","use_new_coordinator":true,"group_protocol":"consumer"}
   ```
   
   and here are the results: 
   
   ```
   

   SESSION REPORT (ALL TESTS)
   ducktape version: 0.11.4
   session_id:   2024-04-18--001
   run time: 18 minutes 5.491 seconds
   tests run:8
   passed:   7
   flaky:0
   failed:   1
   ignored:  0
   

   test_id:
kafkatest.tests.connect.connect_distributed_test.ConnectDistributedTest.test_bounce.clean=False.connect_protocol=sessioned.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer
   status: PASS
   run time:   6 minutes 7.875 seconds
   

   test_id:
kafkatest.tests.connect.connect_distributed_test.ConnectDistributedTest.test_exactly_once_source.clean=False.connect_protocol=sessioned.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer
   status: FAIL
   run time:   7 minutes 59.232 seconds
   
   
   InsufficientResourcesError('linux nodes requested: 1. linux nodes 
available: 0')
   Traceback (most recent call last):
 File 
"/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", line 
186, in _do_run
   data = self.run_test()
 File 
"/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", line 
246, in run_test
   return self.test_context.function(self.test)
 File "/usr/local/lib/python3.9/dist-packages/ducktape/mark/_mark.py", line 
433, in wrapper
   return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
 File 
"/opt/kafka-dev/tests/kafkatest/tests/connect/connect_distributed_test.py", 
line 929, in test_exactly_once_source
   consumer_validator = ConsoleConsumer(self.test_context, 1, self.kafka, 
self.source.topic, consumer_timeout_ms=1000, print_key=True, 
consumer_properties=consumer_properties)
 File "/opt/kafka-dev/tests/kafkatest/services/console_consumer.py", line 
97, in __init__
   BackgroundThreadService.__init__(self, context, num_nodes)
 File 
"/usr/local/lib/python3.9/dist-packages/ducktape/services/background_thread.py",
 line 26, in __init__
   super(BackgroundThreadService, self).__init__(context, num_nodes, 
cluster_spec, *args, **kwargs)
 File 

Re: [PR] KAFKA-16272: Adding new coordinator related changes for connect_distributed.py [kafka]

2024-04-10 Thread via GitHub


vamossagar12 commented on PR #15594:
URL: https://github.com/apache/kafka/pull/15594#issuecomment-2047030266

   @lucasbru , that test did pass. However, let me try again with the snippet 
you shared above and see if it works. Let me get back to you.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16272: Adding new coordinator related changes for connect_distributed.py [kafka]

2024-04-10 Thread via GitHub


lucasbru commented on PR #15594:
URL: https://github.com/apache/kafka/pull/15594#issuecomment-2046963651

   @vamossagar12 did the test you ran pass?
   
   Here is an example how I run parameterized tests using a test suite file:
   ```
   consumer_test:
   - 
tests/kafkatest/tests/client/consumer_test.py::AssignmentValidationTest.test_valid_assignment@{"metadata_quorum":"ISOLATED_KRAFT","use_new_coordinator":true,"group_protocol":"consumer","group_remote_assignor":"range"}
   ```
   
   The change looks fine to me. If you want to be sure that the test set up 
works, you may want to run the parameter combinations and post the results 
here. However, if you have tested one parameter combination successfully, and 
you are confident that the general test setup is working, I am fine with 
merging it like this (please confirm).
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16272: Adding new coordinator related changes for connect_distributed.py [kafka]

2024-03-31 Thread via GitHub


vamossagar12 commented on PR #15594:
URL: https://github.com/apache/kafka/pull/15594#issuecomment-2028815010

   Thanks @kirktrue . I ran a single test by passing a parameter 
   ```

TC_PATHS="tests/kafkatest/tests/connect/connect_distributed_test.py::ConnectDistributedTest.test_restart_failed_connector"
 _DUCKTAPE_OPTIONS='--parameters 
'\''{"exactly_once_source":"False","connect_protocol":"sessioned","metadata_quorum":"ISOLATED_KRAFT","use_new_coordinator":"True","group_protocol":"consumer"}'\'
 bash tests/docker/run_tests.sh | tee debug_logs.txt
   ```
   
   and it passes but when I build a yaml file like this 
   
   ```
   my_test_suite:
 - 
'connect_distributed_test.py::ConnectDistributedTest.test_restart_failed_connector@{"exactly_once_source":"False","connect_protocol":"sessioned","metadata_quorum":"ISOLATED_KRAFT","use_new_coordinator":"True","group_protocol":"consumer"}'
   ```
   and run it like:
   ```
TC_PATHS="tests/kafkatest/tests/connect/test-suite.yml" bash 
tests/docker/run_tests.sh | tee debug_logs.txt 
   ```
   The yaml file is in the same location as `connect_distributed.py`. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16272: Adding new coordinator related changes for connect_distributed.py [kafka]

2024-03-28 Thread via GitHub


kirktrue commented on PR #15594:
URL: https://github.com/apache/kafka/pull/15594#issuecomment-2025765078

   @lucasbru—can you take a look at this system test change? Thanks!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16272: Adding new coordinator related changes for connect_distributed.py [kafka]

2024-03-27 Thread via GitHub


philipnee commented on code in PR #15594:
URL: https://github.com/apache/kafka/pull/15594#discussion_r1541985614


##
tests/kafkatest/services/connect.py:
##
@@ -534,33 +535,40 @@ def received_messages(self):
 
 def start(self):
 self.logger.info("Creating connector VerifiableSinkConnector %s", 
self.name)
-self.cc.create_connector({
+connector_config = {
 'name': self.name,
 'connector.class': 
'org.apache.kafka.connect.tools.VerifiableSinkConnector',
 'tasks.max': self.tasks,
 'topics': ",".join(self.topics)
-})
+}
+if self.consumer_group_protocol is not None:
+connector_config["consumer.override.group.protocol"] = 
self.consumer_group_protocol
+self.cc.create_connector(connector_config)
 
 class MockSink(object):
 
-def __init__(self, cc, topics, mode=None, delay_sec=10, name="mock-sink"):
+def __init__(self, cc, topics, mode=None, delay_sec=10, name="mock-sink", 
consumer_group_protocol=None):
 self.cc = cc
 self.logger = self.cc.logger
 self.name = name
 self.mode = mode
 self.delay_sec = delay_sec
 self.topics = topics
+self.consumer_group_protocol = consumer_group_protocol
 
 def start(self):
 self.logger.info("Creating connector MockSinkConnector %s", self.name)
-self.cc.create_connector({
+connector_config = {
 'name': self.name,
 'connector.class': 
'org.apache.kafka.connect.tools.MockSinkConnector',
 'tasks.max': 1,
 'topics': ",".join(self.topics),
 'mock_mode': self.mode,
 'delay_ms': self.delay_sec * 1000
-})
+}
+if self.consumer_group_protocol is not None:
+connector_config["consumer.override.group.protocol"] = 
self.consumer_group_protocol

Review Comment:
   thanks for the explanation. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16272: Adding new coordinator related changes for connect_distributed.py [kafka]

2024-03-26 Thread via GitHub


vamossagar12 commented on code in PR #15594:
URL: https://github.com/apache/kafka/pull/15594#discussion_r1539389617


##
tests/kafkatest/services/connect.py:
##
@@ -534,33 +535,40 @@ def received_messages(self):
 
 def start(self):
 self.logger.info("Creating connector VerifiableSinkConnector %s", 
self.name)
-self.cc.create_connector({
+connector_config = {
 'name': self.name,
 'connector.class': 
'org.apache.kafka.connect.tools.VerifiableSinkConnector',
 'tasks.max': self.tasks,
 'topics': ",".join(self.topics)
-})
+}
+if self.consumer_group_protocol is not None:
+connector_config["consumer.override.group.protocol"] = 
self.consumer_group_protocol
+self.cc.create_connector(connector_config)
 
 class MockSink(object):
 
-def __init__(self, cc, topics, mode=None, delay_sec=10, name="mock-sink"):
+def __init__(self, cc, topics, mode=None, delay_sec=10, name="mock-sink", 
consumer_group_protocol=None):
 self.cc = cc
 self.logger = self.cc.logger
 self.name = name
 self.mode = mode
 self.delay_sec = delay_sec
 self.topics = topics
+self.consumer_group_protocol = consumer_group_protocol
 
 def start(self):
 self.logger.info("Creating connector MockSinkConnector %s", self.name)
-self.cc.create_connector({
+connector_config = {
 'name': self.name,
 'connector.class': 
'org.apache.kafka.connect.tools.MockSinkConnector',
 'tasks.max': 1,
 'topics': ",".join(self.topics),
 'mock_mode': self.mode,
 'delay_ms': self.delay_sec * 1000
-})
+}
+if self.consumer_group_protocol is not None:
+connector_config["consumer.override.group.protocol"] = 
self.consumer_group_protocol

Review Comment:
   Any config that we want to override on the consumer level for a sink 
connector can be passed in by prefixing it with 
`consumer.override.` , the config name is `group.protocol` in this 
case. I haven't had a chance to test this yet with the 3.8 release.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16272: Adding new coordinator related changes for connect_distributed.py [kafka]

2024-03-25 Thread via GitHub


philipnee commented on code in PR #15594:
URL: https://github.com/apache/kafka/pull/15594#discussion_r1538356233


##
tests/kafkatest/services/connect.py:
##
@@ -534,33 +535,40 @@ def received_messages(self):
 
 def start(self):
 self.logger.info("Creating connector VerifiableSinkConnector %s", 
self.name)
-self.cc.create_connector({
+connector_config = {
 'name': self.name,
 'connector.class': 
'org.apache.kafka.connect.tools.VerifiableSinkConnector',
 'tasks.max': self.tasks,
 'topics': ",".join(self.topics)
-})
+}
+if self.consumer_group_protocol is not None:
+connector_config["consumer.override.group.protocol"] = 
self.consumer_group_protocol
+self.cc.create_connector(connector_config)
 
 class MockSink(object):
 
-def __init__(self, cc, topics, mode=None, delay_sec=10, name="mock-sink"):
+def __init__(self, cc, topics, mode=None, delay_sec=10, name="mock-sink", 
consumer_group_protocol=None):
 self.cc = cc
 self.logger = self.cc.logger
 self.name = name
 self.mode = mode
 self.delay_sec = delay_sec
 self.topics = topics
+self.consumer_group_protocol = consumer_group_protocol
 
 def start(self):
 self.logger.info("Creating connector MockSinkConnector %s", self.name)
-self.cc.create_connector({
+connector_config = {
 'name': self.name,
 'connector.class': 
'org.apache.kafka.connect.tools.MockSinkConnector',
 'tasks.max': 1,
 'topics': ",".join(self.topics),
 'mock_mode': self.mode,
 'delay_ms': self.delay_sec * 1000
-})
+}
+if self.consumer_group_protocol is not None:
+connector_config["consumer.override.group.protocol"] = 
self.consumer_group_protocol

Review Comment:
   are you sure this is what group protocol config called in connect?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] KAFKA-16272: Adding new coordinator related changes for connect_distributed.py [kafka]

2024-03-25 Thread via GitHub


vamossagar12 opened a new pull request, #15594:
URL: https://github.com/apache/kafka/pull/15594

   Main changes:
   
   1. Parameterizes the tests to use new coordinator and pass in consumer group 
protocol. This would be applicable to sink connectors only.
   2. Enhances the sink connector creation code in system tests to accept a new 
optional parameter for consumer group protocol to be used.
   3. Sets the consumer group protocol via `consumer.override.` override config 
which gets passed to the consumer.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org