Re: [PR] KAFKA-16272: Adding new coordinator related changes for connect_distributed.py [kafka]
lucasbru merged PR #15594: URL: https://github.com/apache/kafka/pull/15594 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16272: Adding new coordinator related changes for connect_distributed.py [kafka]
vamossagar12 commented on code in PR #15594:
URL: https://github.com/apache/kafka/pull/15594#discussion_r1570933991

## tests/kafkatest/services/connect.py: ##
@@ -534,33 +535,40 @@ def received_messages(self):
     def start(self):
         self.logger.info("Creating connector VerifiableSinkConnector %s", self.name)
-        self.cc.create_connector({
+        connector_config = {
             'name': self.name,
             'connector.class': 'org.apache.kafka.connect.tools.VerifiableSinkConnector',
             'tasks.max': self.tasks,
             'topics': ",".join(self.topics)
-        })
+        }
+        if self.consumer_group_protocol is not None:
+            connector_config["consumer.override.group.protocol"] = self.consumer_group_protocol
+        self.cc.create_connector(connector_config)
 class MockSink(object):
-    def __init__(self, cc, topics, mode=None, delay_sec=10, name="mock-sink"):
+    def __init__(self, cc, topics, mode=None, delay_sec=10, name="mock-sink", consumer_group_protocol=None):
         self.cc = cc
         self.logger = self.cc.logger
         self.name = name
         self.mode = mode
         self.delay_sec = delay_sec
         self.topics = topics
+        self.consumer_group_protocol = consumer_group_protocol
     def start(self):
         self.logger.info("Creating connector MockSinkConnector %s", self.name)
-        self.cc.create_connector({
+        connector_config = {
             'name': self.name,
             'connector.class': 'org.apache.kafka.connect.tools.MockSinkConnector',
             'tasks.max': 1,
             'topics': ",".join(self.topics),
             'mock_mode': self.mode,
             'delay_ms': self.delay_sec * 1000
-        })
+        }
+        if self.consumer_group_protocol is not None:
+            connector_config["consumer.override.group.protocol"] = self.consumer_group_protocol

Review Comment:
FYI, I did test by setting the group protocol override at a connector level and that seems to be working fine.
Re: [PR] KAFKA-16272: Adding new coordinator related changes for connect_distributed.py [kafka]
vamossagar12 commented on PR #15594:
URL: https://github.com/apache/kafka/pull/15594#issuecomment-2064120764

hey @lucasbru, I ran the following test suite
```
my_test_suite:
  - tests/kafkatest/tests/connect/connect_distributed_test.py::ConnectDistributedTest.test_restart_failed_connector@{"exactly_once_source":false,"connect_protocol":"sessioned","metadata_quorum":"ISOLATED_KRAFT","use_new_coordinator":true,"group_protocol":"consumer"}
  - tests/kafkatest/tests/connect/connect_distributed_test.py::ConnectDistributedTest.test_restart_failed_task@{"connector_type":"sink","connect_protocol":"sessioned","metadata_quorum":"ISOLATED_KRAFT","use_new_coordinator":true,"group_protocol":"consumer"}
  - tests/kafkatest/tests/connect/connect_distributed_test.py::ConnectDistributedTest.test_restart_connector_and_tasks_failed_connector@{"connect_protocol":"sessioned","metadata_quorum":"ISOLATED_KRAFT","use_new_coordinator":true,"group_protocol":"consumer"}
  - tests/kafkatest/tests/connect/connect_distributed_test.py::ConnectDistributedTest.test_restart_connector_and_tasks_failed_task@{"connector_type":"sink","connect_protocol":"sessioned","metadata_quorum":"ISOLATED_KRAFT","use_new_coordinator":true,"group_protocol":"consumer"}
  - tests/kafkatest/tests/connect/connect_distributed_test.py::ConnectDistributedTest.test_pause_and_resume_sink@{"connector_type":"sink","connect_protocol":"sessioned","metadata_quorum":"ISOLATED_KRAFT","use_new_coordinator":true,"group_protocol":"consumer"}
  - tests/kafkatest/tests/connect/connect_distributed_test.py::ConnectDistributedTest.test_file_source_and_sink@{"security_protocol":"PLAINTEXT","exactly_once_source":false,"connect_protocol":"sessioned","metadata_quorum":"ISOLATED_KRAFT","use_new_coordinator":true,"group_protocol":"consumer"}
  - tests/kafkatest/tests/connect/connect_distributed_test.py::ConnectDistributedTest.test_bounce@{"clean":false,"connect_protocol":"sessioned","metadata_quorum":"ISOLATED_KRAFT","use_new_coordinator":true,"group_protocol":"consumer"}
  - tests/kafkatest/tests/connect/connect_distributed_test.py::ConnectDistributedTest.test_exactly_once_source@{"clean":false,"connect_protocol":"sessioned","metadata_quorum":"ISOLATED_KRAFT","use_new_coordinator":true,"group_protocol":"consumer"}
  - tests/kafkatest/tests/connect/connect_distributed_test.py::ConnectDistributedTest.test_transformations@{"connect_protocol":"sessioned","metadata_quorum":"ISOLATED_KRAFT","use_new_coordinator":true,"group_protocol":"consumer"}
```
and here are the results:
```
SESSION REPORT (ALL TESTS)
ducktape version: 0.11.4
session_id:       2024-04-18--001
run time:         18 minutes 5.491 seconds
tests run:        8
passed:           7
flaky:            0
failed:           1
ignored:          0

test_id:    kafkatest.tests.connect.connect_distributed_test.ConnectDistributedTest.test_bounce.clean=False.connect_protocol=sessioned.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer
status:     PASS
run time:   6 minutes 7.875 seconds

test_id:    kafkatest.tests.connect.connect_distributed_test.ConnectDistributedTest.test_exactly_once_source.clean=False.connect_protocol=sessioned.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer
status:     FAIL
run time:   7 minutes 59.232 seconds

InsufficientResourcesError('linux nodes requested: 1. linux nodes available: 0')
Traceback (most recent call last):
  File "/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", line 186, in _do_run
    data = self.run_test()
  File "/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", line 246, in run_test
    return self.test_context.function(self.test)
  File "/usr/local/lib/python3.9/dist-packages/ducktape/mark/_mark.py", line 433, in wrapper
    return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
  File "/opt/kafka-dev/tests/kafkatest/tests/connect/connect_distributed_test.py", line 929, in test_exactly_once_source
    consumer_validator = ConsoleConsumer(self.test_context, 1, self.kafka, self.source.topic, consumer_timeout_ms=1000, print_key=True, consumer_properties=consumer_properties)
  File "/opt/kafka-dev/tests/kafkatest/services/console_consumer.py", line 97, in __init__
    BackgroundThreadService.__init__(self, context, num_nodes)
  File "/usr/local/lib/python3.9/dist-packages/ducktape/services/background_thread.py", line 26, in __init__
    super(BackgroundThreadService, self).__init__(context, num_nodes, cluster_spec, *args, **kwargs)
  File "/usr/local
Re: [PR] KAFKA-16272: Adding new coordinator related changes for connect_distributed.py [kafka]
vamossagar12 commented on PR #15594:
URL: https://github.com/apache/kafka/pull/15594#issuecomment-2047030266

@lucasbru, that test did pass. However, let me try again with the snippet you shared above and see if it works. Let me get back to you.
Re: [PR] KAFKA-16272: Adding new coordinator related changes for connect_distributed.py [kafka]
lucasbru commented on PR #15594:
URL: https://github.com/apache/kafka/pull/15594#issuecomment-2046963651

@vamossagar12 did the test you ran pass? Here is an example of how I run parameterized tests using a test suite file:
```
consumer_test:
  - tests/kafkatest/tests/client/consumer_test.py::AssignmentValidationTest.test_valid_assignment@{"metadata_quorum":"ISOLATED_KRAFT","use_new_coordinator":true,"group_protocol":"consumer","group_remote_assignor":"range"}
```
The change looks fine to me. If you want to be sure that the test setup works, you may want to run the parameter combinations and post the results here. However, if you have tested one parameter combination successfully, and you are confident that the general test setup is working, I am fine with merging it like this (please confirm).
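The suite-file entry format shown above (`path::Class.method@{json}`) can be generated rather than typed by hand. The following is a minimal sketch, not part of the PR: `suite_entry` is a hypothetical helper name, and the only assumption about the format is what the example in this comment shows. Using `json.dumps` means Python `True`/`False` are written as unquoted JSON `true`/`false`, as in the example.

```python
import json


def suite_entry(test_file, test_class, test_method, params):
    """Build one suite-file entry of the form path::Class.method@{json}.

    Hypothetical helper for illustration; it is not part of ducktape or Kafka.
    """
    # Compact separators reproduce the no-whitespace style used in the thread.
    # Python booleans are encoded as JSON true/false (unquoted).
    encoded = json.dumps(params, separators=(",", ":"))
    return f"{test_file}::{test_class}.{test_method}@{encoded}"


entry = suite_entry(
    "tests/kafkatest/tests/client/consumer_test.py",
    "AssignmentValidationTest",
    "test_valid_assignment",
    {
        "metadata_quorum": "ISOLATED_KRAFT",
        "use_new_coordinator": True,
        "group_protocol": "consumer",
        "group_remote_assignor": "range",
    },
)
print(entry)
```

The printed string can be pasted as a list item under a suite name in the YAML file.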
Re: [PR] KAFKA-16272: Adding new coordinator related changes for connect_distributed.py [kafka]
vamossagar12 commented on PR #15594:
URL: https://github.com/apache/kafka/pull/15594#issuecomment-2028815010

Thanks @kirktrue. I ran a single test by passing a parameter
```
TC_PATHS="tests/kafkatest/tests/connect/connect_distributed_test.py::ConnectDistributedTest.test_restart_failed_connector" _DUCKTAPE_OPTIONS='--parameters '\''{"exactly_once_source":"False","connect_protocol":"sessioned","metadata_quorum":"ISOLATED_KRAFT","use_new_coordinator":"True","group_protocol":"consumer"}'\' bash tests/docker/run_tests.sh | tee debug_logs.txt
```
and it passes but when I build a yaml file like this
```
my_test_suite:
  - 'connect_distributed_test.py::ConnectDistributedTest.test_restart_failed_connector@{"exactly_once_source":"False","connect_protocol":"sessioned","metadata_quorum":"ISOLATED_KRAFT","use_new_coordinator":"True","group_protocol":"consumer"}'
```
and run it like:
```
TC_PATHS="tests/kafkatest/tests/connect/test-suite.yml" bash tests/docker/run_tests.sh | tee debug_logs.txt
```
The yaml file is in the same location as `connect_distributed_test.py`.
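Both the command and the YAML entry above pass the boolean parameters as quoted strings (`"False"`, `"True"`). The thread does not say whether this matters to ducktape, so the following is only a standalone Python observation about how quoted and unquoted JSON booleans decode, not a diagnosis of the behaviour described in this comment.

```python
import json

# Parameters as written in this comment: booleans quoted as strings.
quoted = json.loads('{"use_new_coordinator":"True","exactly_once_source":"False"}')
# The same parameters with unquoted JSON booleans.
unquoted = json.loads('{"use_new_coordinator":true,"exactly_once_source":false}')

print(type(quoted["use_new_coordinator"]).__name__)    # str
print(type(unquoted["use_new_coordinator"]).__name__)  # bool

# A non-empty string is truthy, so the quoted "False" does not behave like False.
print(bool(quoted["exactly_once_source"]))    # True
print(bool(unquoted["exactly_once_source"]))  # False
```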
Re: [PR] KAFKA-16272: Adding new coordinator related changes for connect_distributed.py [kafka]
kirktrue commented on PR #15594:
URL: https://github.com/apache/kafka/pull/15594#issuecomment-2025765078

@lucasbru, can you take a look at this system test change? Thanks!
Re: [PR] KAFKA-16272: Adding new coordinator related changes for connect_distributed.py [kafka]
philipnee commented on code in PR #15594:
URL: https://github.com/apache/kafka/pull/15594#discussion_r1541985614

## tests/kafkatest/services/connect.py: ##
@@ -534,33 +535,40 @@ def received_messages(self):
     def start(self):
         self.logger.info("Creating connector VerifiableSinkConnector %s", self.name)
-        self.cc.create_connector({
+        connector_config = {
             'name': self.name,
             'connector.class': 'org.apache.kafka.connect.tools.VerifiableSinkConnector',
             'tasks.max': self.tasks,
             'topics': ",".join(self.topics)
-        })
+        }
+        if self.consumer_group_protocol is not None:
+            connector_config["consumer.override.group.protocol"] = self.consumer_group_protocol
+        self.cc.create_connector(connector_config)
 class MockSink(object):
-    def __init__(self, cc, topics, mode=None, delay_sec=10, name="mock-sink"):
+    def __init__(self, cc, topics, mode=None, delay_sec=10, name="mock-sink", consumer_group_protocol=None):
         self.cc = cc
         self.logger = self.cc.logger
         self.name = name
         self.mode = mode
         self.delay_sec = delay_sec
         self.topics = topics
+        self.consumer_group_protocol = consumer_group_protocol
     def start(self):
         self.logger.info("Creating connector MockSinkConnector %s", self.name)
-        self.cc.create_connector({
+        connector_config = {
             'name': self.name,
             'connector.class': 'org.apache.kafka.connect.tools.MockSinkConnector',
             'tasks.max': 1,
             'topics': ",".join(self.topics),
             'mock_mode': self.mode,
             'delay_ms': self.delay_sec * 1000
-        })
+        }
+        if self.consumer_group_protocol is not None:
+            connector_config["consumer.override.group.protocol"] = self.consumer_group_protocol

Review Comment:
thanks for the explanation.
Re: [PR] KAFKA-16272: Adding new coordinator related changes for connect_distributed.py [kafka]
vamossagar12 commented on code in PR #15594:
URL: https://github.com/apache/kafka/pull/15594#discussion_r1539389617

## tests/kafkatest/services/connect.py: ##
@@ -534,33 +535,40 @@ def received_messages(self):
     def start(self):
         self.logger.info("Creating connector VerifiableSinkConnector %s", self.name)
-        self.cc.create_connector({
+        connector_config = {
             'name': self.name,
             'connector.class': 'org.apache.kafka.connect.tools.VerifiableSinkConnector',
             'tasks.max': self.tasks,
             'topics': ",".join(self.topics)
-        })
+        }
+        if self.consumer_group_protocol is not None:
+            connector_config["consumer.override.group.protocol"] = self.consumer_group_protocol
+        self.cc.create_connector(connector_config)
 class MockSink(object):
-    def __init__(self, cc, topics, mode=None, delay_sec=10, name="mock-sink"):
+    def __init__(self, cc, topics, mode=None, delay_sec=10, name="mock-sink", consumer_group_protocol=None):
         self.cc = cc
         self.logger = self.cc.logger
         self.name = name
         self.mode = mode
         self.delay_sec = delay_sec
         self.topics = topics
+        self.consumer_group_protocol = consumer_group_protocol
     def start(self):
         self.logger.info("Creating connector MockSinkConnector %s", self.name)
-        self.cc.create_connector({
+        connector_config = {
             'name': self.name,
             'connector.class': 'org.apache.kafka.connect.tools.MockSinkConnector',
             'tasks.max': 1,
             'topics': ",".join(self.topics),
             'mock_mode': self.mode,
             'delay_ms': self.delay_sec * 1000
-        })
+        }
+        if self.consumer_group_protocol is not None:
+            connector_config["consumer.override.group.protocol"] = self.consumer_group_protocol

Review Comment:
Any config that we want to override at the consumer level for a sink connector can be passed in by prefixing it with `consumer.override.`. The config name is `group.protocol` in this case, so the full key is `consumer.override.group.protocol`. I haven't had a chance to test this yet with the 3.8 release.
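The prefixing rule described in this review comment can be sketched as a small helper. This is an illustration under stated assumptions, not code from the PR: `with_consumer_overrides` and `CONSUMER_OVERRIDE_PREFIX` are hypothetical names, and the connector name and topic used below are made up. The only behaviour taken from the thread is that a consumer config such as `group.protocol` is passed to a sink connector under the key `consumer.override.group.protocol`, and that the key is omitted when no value is given.

```python
CONSUMER_OVERRIDE_PREFIX = "consumer.override."


def with_consumer_overrides(connector_config, overrides):
    """Return a copy of connector_config with consumer-level overrides added.

    Hypothetical helper: each key in `overrides` is a plain consumer config
    name (for example "group.protocol"); entries whose value is None are skipped.
    """
    merged = dict(connector_config)  # copy so the caller's dict is not mutated
    for key, value in overrides.items():
        if value is not None:
            merged[CONSUMER_OVERRIDE_PREFIX + key] = value
    return merged


base = {
    "name": "example-sink",  # made-up connector name
    "connector.class": "org.apache.kafka.connect.tools.VerifiableSinkConnector",
    "tasks.max": 1,
    "topics": ",".join(["example-topic"]),  # made-up topic
}

merged = with_consumer_overrides(base, {"group.protocol": "consumer"})
unchanged = with_consumer_overrides(base, {"group.protocol": None})

print(merged["consumer.override.group.protocol"])
print("consumer.override.group.protocol" in unchanged)
```

Skipping `None` values mirrors the `if self.consumer_group_protocol is not None` guard in the diff, so existing callers that do not pass a protocol produce the same config as before.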
Re: [PR] KAFKA-16272: Adding new coordinator related changes for connect_distributed.py [kafka]
philipnee commented on code in PR #15594:
URL: https://github.com/apache/kafka/pull/15594#discussion_r1538356233

## tests/kafkatest/services/connect.py: ##
@@ -534,33 +535,40 @@ def received_messages(self):
     def start(self):
         self.logger.info("Creating connector VerifiableSinkConnector %s", self.name)
-        self.cc.create_connector({
+        connector_config = {
             'name': self.name,
             'connector.class': 'org.apache.kafka.connect.tools.VerifiableSinkConnector',
             'tasks.max': self.tasks,
             'topics': ",".join(self.topics)
-        })
+        }
+        if self.consumer_group_protocol is not None:
+            connector_config["consumer.override.group.protocol"] = self.consumer_group_protocol
+        self.cc.create_connector(connector_config)
 class MockSink(object):
-    def __init__(self, cc, topics, mode=None, delay_sec=10, name="mock-sink"):
+    def __init__(self, cc, topics, mode=None, delay_sec=10, name="mock-sink", consumer_group_protocol=None):
         self.cc = cc
         self.logger = self.cc.logger
         self.name = name
         self.mode = mode
         self.delay_sec = delay_sec
         self.topics = topics
+        self.consumer_group_protocol = consumer_group_protocol
     def start(self):
         self.logger.info("Creating connector MockSinkConnector %s", self.name)
-        self.cc.create_connector({
+        connector_config = {
             'name': self.name,
             'connector.class': 'org.apache.kafka.connect.tools.MockSinkConnector',
             'tasks.max': 1,
             'topics': ",".join(self.topics),
             'mock_mode': self.mode,
             'delay_ms': self.delay_sec * 1000
-        })
+        }
+        if self.consumer_group_protocol is not None:
+            connector_config["consumer.override.group.protocol"] = self.consumer_group_protocol

Review Comment:
Are you sure this is what the group protocol config is called in Connect?