Repository: kafka Updated Branches: refs/heads/trunk 527b98d82 -> 81f76bde8
KAFKA-3520: Add system tests for REST APIs of list connector plugins and config validation ewen granders Ready for review. Author: Liquan Pei <[email protected]> Reviewers: Ewen Cheslack-Postava <[email protected]> Closes #1195 from Ishiihara/system-test Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/81f76bde Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/81f76bde Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/81f76bde Branch: refs/heads/trunk Commit: 81f76bde8565eaffd67e5adaa69ddfdb4f5cebaa Parents: 527b98d Author: Liquan Pei <[email protected]> Authored: Thu May 12 18:19:00 2016 -0700 Committer: Ewen Cheslack-Postava <[email protected]> Committed: Thu May 12 18:19:00 2016 -0700 ---------------------------------------------------------------------- tests/kafkatest/services/connect.py | 9 ++++- .../tests/connect/connect_distributed_test.py | 3 ++ .../tests/connect/connect_rest_test.py | 42 +++++++++++++++----- tests/kafkatest/tests/connect/connect_test.py | 3 ++ .../templates/connect-file-sink.properties | 2 +- .../templates/connect-file-source.properties | 2 +- 6 files changed, 49 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/81f76bde/tests/kafkatest/services/connect.py ---------------------------------------------------------------------- diff --git a/tests/kafkatest/services/connect.py b/tests/kafkatest/services/connect.py index aad9ff3..5371a72 100644 --- a/tests/kafkatest/services/connect.py +++ b/tests/kafkatest/services/connect.py @@ -101,7 +101,8 @@ class ConnectServiceBase(KafkaPathResolverMixin, Service): def clean_node(self, node): node.account.kill_process("connect", clean_shutdown=False, allow_fail=True) self.security_config.clean_node(node) - node.account.ssh("rm -rf " + " ".join([self.CONFIG_FILE, self.LOG4J_CONFIG_FILE, self.PID_FILE, self.LOG_FILE, 
self.STDOUT_FILE, self.STDERR_FILE] + self.config_filenames() + self.files), allow_fail=False) + all_files = " ".join([self.CONFIG_FILE, self.LOG4J_CONFIG_FILE, self.PID_FILE, self.LOG_FILE, self.STDOUT_FILE, self.STDERR_FILE] + self.config_filenames() + self.files) + node.account.ssh("rm -rf " + all_files, allow_fail=False) def config_filenames(self): return [os.path.join(self.PERSISTENT_ROOT, "connect-connector-" + str(idx) + ".properties") for idx, template in enumerate(self.connector_config_templates or [])] @@ -140,6 +141,12 @@ class ConnectServiceBase(KafkaPathResolverMixin, Service): def resume_connector(self, name, node=None): return self._rest('/connectors/' + name + '/resume', method="PUT") + def list_connector_plugins(self, node=None): + return self._rest('/connector-plugins/', node=node) + + def validate_config(self, connector_type, validate_request, node=None): + return self._rest('/connector-plugins/' + connector_type + '/config/validate', validate_request, node=node, method="PUT") + def _rest(self, path, body=None, node=None, method="GET"): if node is None: node = random.choice(self.nodes) http://git-wip-us.apache.org/repos/asf/kafka/blob/81f76bde/tests/kafkatest/tests/connect/connect_distributed_test.py ---------------------------------------------------------------------- diff --git a/tests/kafkatest/tests/connect/connect_distributed_test.py b/tests/kafkatest/tests/connect/connect_distributed_test.py index d3ae2e1..a4d68f3 100644 --- a/tests/kafkatest/tests/connect/connect_distributed_test.py +++ b/tests/kafkatest/tests/connect/connect_distributed_test.py @@ -32,6 +32,9 @@ class ConnectDistributedTest(Test): another, validating the total output is identical to the input. 
""" + FILE_SOURCE_CONNECTOR = 'org.apache.kafka.connect.file.FileStreamSourceConnector' + FILE_SINK_CONNECTOR = 'org.apache.kafka.connect.file.FileStreamSinkConnector' + INPUT_FILE = "/mnt/connect.input" OUTPUT_FILE = "/mnt/connect.output" http://git-wip-us.apache.org/repos/asf/kafka/blob/81f76bde/tests/kafkatest/tests/connect/connect_rest_test.py ---------------------------------------------------------------------- diff --git a/tests/kafkatest/tests/connect/connect_rest_test.py b/tests/kafkatest/tests/connect/connect_rest_test.py index 63b9bb1..c32b8e1 100644 --- a/tests/kafkatest/tests/connect/connect_rest_test.py +++ b/tests/kafkatest/tests/connect/connect_rest_test.py @@ -15,7 +15,6 @@ from kafkatest.tests.kafka_test import KafkaTest from kafkatest.services.connect import ConnectDistributedService, ConnectRestError -from kafkatest.utils.util import retry_on_exception from ducktape.utils.util import wait_until import subprocess import json @@ -27,6 +26,12 @@ class ConnectRestApiTest(KafkaTest): Test of Kafka Connect's REST API endpoints. 
""" + FILE_SOURCE_CONNECTOR = 'org.apache.kafka.connect.file.FileStreamSourceConnector' + FILE_SINK_CONNECTOR = 'org.apache.kafka.connect.file.FileStreamSinkConnector' + + FILE_SOURCE_CONFIGS = {'name', 'connector.class', 'tasks.max', 'topic', 'file'} + FILE_SINK_CONFIGS = {'name', 'connector.class', 'tasks.max', 'topics', 'file'} + INPUT_FILE = "/mnt/connect.input" INPUT_FILE2 = "/mnt/connect.input2" OUTPUT_FILE = "/mnt/connect.output" @@ -43,11 +48,11 @@ class ConnectRestApiTest(KafkaTest): LONGER_INPUT_LIST = ["foo", "bar", "baz", "razz", "ma", "tazz"] LONER_INPUTS = "\n".join(LONGER_INPUT_LIST) + "\n" - SCHEMA = { "type": "string", "optional": False } + SCHEMA = {"type": "string", "optional": False} def __init__(self, test_context): super(ConnectRestApiTest, self).__init__(test_context, num_zk=1, num_brokers=1, topics={ - 'test' : { 'partitions': 1, 'replication-factor': 1 } + 'test': {'partitions': 1, 'replication-factor': 1} }) self.cc = ConnectDistributedService(test_context, 2, self.kafka, [self.INPUT_FILE, self.INPUT_FILE2, self.OUTPUT_FILE]) @@ -64,12 +69,23 @@ class ConnectRestApiTest(KafkaTest): assert self.cc.list_connectors() == [] - self.logger.info("Creating connectors") + assert set([connector_plugin['class'] for connector_plugin in self.cc.list_connector_plugins()]) == {self.FILE_SOURCE_CONNECTOR, self.FILE_SINK_CONNECTOR} + source_connector_props = self.render("connect-file-source.properties") sink_connector_props = self.render("connect-file-sink.properties") - for connector_props in [source_connector_props, sink_connector_props]: - connector_config = self._config_dict_from_props(connector_props) - self.cc.create_connector(connector_config, retries=120, retry_backoff=1) + + self.logger.info("Validating connector configurations") + source_connector_config = self._config_dict_from_props(source_connector_props) + configs = self.cc.validate_config(self.FILE_SOURCE_CONNECTOR, source_connector_config) + self.verify_config(self.FILE_SOURCE_CONNECTOR, 
self.FILE_SOURCE_CONFIGS, configs) + + sink_connector_config = self._config_dict_from_props(sink_connector_props) + configs = self.cc.validate_config(self.FILE_SINK_CONNECTOR, sink_connector_config) + self.verify_config(self.FILE_SINK_CONNECTOR, self.FILE_SINK_CONFIGS, configs) + + self.logger.info("Creating connectors") + self.cc.create_connector(source_connector_config, retries=120, retry_backoff=1) + self.cc.create_connector(sink_connector_config, retries=120, retry_backoff=1) # We should see the connectors appear wait_until(lambda: set(self.cc.list_connectors(retries=5, retry_backoff=1)) == set(["local-file-source", "local-file-sink"]), @@ -91,7 +107,7 @@ class ConnectRestApiTest(KafkaTest): expected_source_info = { 'name': 'local-file-source', 'config': self._config_dict_from_props(source_connector_props), - 'tasks': [{ 'connector': 'local-file-source', 'task': 0 }] + 'tasks': [{'connector': 'local-file-source', 'task': 0}] } source_info = self.cc.get_connector("local-file-source") assert expected_source_info == source_info, "Incorrect info:" + json.dumps(source_info) @@ -100,7 +116,7 @@ class ConnectRestApiTest(KafkaTest): expected_sink_info = { 'name': 'local-file-sink', 'config': self._config_dict_from_props(sink_connector_props), - 'tasks': [{'connector': 'local-file-sink', 'task': 0 }] + 'tasks': [{'connector': 'local-file-sink', 'task': 0}] } sink_info = self.cc.get_connector("local-file-sink") assert expected_sink_info == sink_info, "Incorrect info:" + json.dumps(sink_info) @@ -164,3 +180,11 @@ class ConnectRestApiTest(KafkaTest): def _config_dict_from_props(self, connector_props): return dict([line.strip().split('=', 1) for line in connector_props.split('\n') if line.strip() and not line.strip().startswith('#')]) + def verify_config(self, name, config_def, configs): + # Should report the connector name that was validated + assert name == configs['name'] + # Should have zero errors + assert 0 == configs['error_count'] + # Should return all configuration + config_names = 
[config['definition']['name'] for config in configs['configs']] + assert config_def == set(config_names) http://git-wip-us.apache.org/repos/asf/kafka/blob/81f76bde/tests/kafkatest/tests/connect/connect_test.py ---------------------------------------------------------------------- diff --git a/tests/kafkatest/tests/connect/connect_test.py b/tests/kafkatest/tests/connect/connect_test.py index 7b57402..9184390 100644 --- a/tests/kafkatest/tests/connect/connect_test.py +++ b/tests/kafkatest/tests/connect/connect_test.py @@ -31,6 +31,9 @@ class ConnectStandaloneFileTest(Test): identical to the input. """ + FILE_SOURCE_CONNECTOR = 'org.apache.kafka.connect.file.FileStreamSourceConnector' + FILE_SINK_CONNECTOR = 'org.apache.kafka.connect.file.FileStreamSinkConnector' + INPUT_FILE = "/mnt/connect.input" OUTPUT_FILE = "/mnt/connect.output" http://git-wip-us.apache.org/repos/asf/kafka/blob/81f76bde/tests/kafkatest/tests/connect/templates/connect-file-sink.properties ---------------------------------------------------------------------- diff --git a/tests/kafkatest/tests/connect/templates/connect-file-sink.properties b/tests/kafkatest/tests/connect/templates/connect-file-sink.properties index ad78bb3..216dab5 100644 --- a/tests/kafkatest/tests/connect/templates/connect-file-sink.properties +++ b/tests/kafkatest/tests/connect/templates/connect-file-sink.properties @@ -14,7 +14,7 @@ # limitations under the License. 
name=local-file-sink -connector.class=FileStreamSink +connector.class={{ FILE_SINK_CONNECTOR }} tasks.max=1 file={{ OUTPUT_FILE }} topics={{ TOPIC }} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/81f76bde/tests/kafkatest/tests/connect/templates/connect-file-source.properties ---------------------------------------------------------------------- diff --git a/tests/kafkatest/tests/connect/templates/connect-file-source.properties b/tests/kafkatest/tests/connect/templates/connect-file-source.properties index d2d5e97..bff9720 100644 --- a/tests/kafkatest/tests/connect/templates/connect-file-source.properties +++ b/tests/kafkatest/tests/connect/templates/connect-file-source.properties @@ -14,7 +14,7 @@ # limitations under the License. name=local-file-source -connector.class=FileStreamSource +connector.class={{ FILE_SOURCE_CONNECTOR }} tasks.max=1 file={{ INPUT_FILE }} topic={{ TOPIC }} \ No newline at end of file
