Repository: kafka Updated Branches: refs/heads/trunk d1bb2b9df -> dbafc631a
KAFKA-3673: Connect tests don't handle concurrent config changes Author: Liquan Pei <[email protected]> Reviewers: Ewen Cheslack-Postava <[email protected]> Closes #1340 from Ishiihara/connect-test-failure Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/dbafc631 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/dbafc631 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/dbafc631 Branch: refs/heads/trunk Commit: dbafc631ad78c96f85361a3d5e1c4d203cedb26f Parents: d1bb2b9 Author: Liquan Pei <[email protected]> Authored: Sun May 8 23:50:43 2016 -0700 Committer: Ewen Cheslack-Postava <[email protected]> Committed: Sun May 8 23:50:43 2016 -0700 ---------------------------------------------------------------------- tests/kafkatest/services/connect.py | 38 ++++++++++---------- .../tests/connect/connect_rest_test.py | 25 ++++++------- tests/kafkatest/utils/util.py | 12 +++++++ 3 files changed, 45 insertions(+), 30 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/dbafc631/tests/kafkatest/services/connect.py ---------------------------------------------------------------------- diff --git a/tests/kafkatest/services/connect.py b/tests/kafkatest/services/connect.py index 1eb2dd5..cf67c30 100644 --- a/tests/kafkatest/services/connect.py +++ b/tests/kafkatest/services/connect.py @@ -22,6 +22,7 @@ import requests from ducktape.errors import DucktapeError from ducktape.services.service import Service from ducktape.utils.util import wait_until +from kafkatest.utils.util import retry_on_exception from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin @@ -102,31 +103,30 @@ class ConnectServiceBase(KafkaPathResolverMixin, Service): def config_filenames(self): return [os.path.join(self.PERSISTENT_ROOT, "connect-connector-" + str(idx) + ".properties") for idx, template in enumerate(self.connector_config_templates or [])] + def list_connectors(self, node=None, retries=0, retry_backoff=.01): + return self._rest_with_retry('/connectors', node=node, retries=retries, retry_backoff=retry_backoff) - def list_connectors(self, node=None): - return self._rest('/connectors', node=node) - - def create_connector(self, config, node=None): + def create_connector(self, config, node=None, retries=0, retry_backoff=.01): create_request = { 'name': config['name'], 'config': config } - return self._rest('/connectors', create_request, node=node, method="POST") + return self._rest_with_retry('/connectors', create_request, node=node, method="POST", retries=retries, retry_backoff=retry_backoff) - def get_connector(self, name, node=None): - return self._rest('/connectors/' + name, node=node) + def get_connector(self, name, node=None, retries=0, retry_backoff=.01): + return self._rest_with_retry('/connectors/' + name, node=node, retries=retries, retry_backoff=retry_backoff) - def get_connector_config(self, name, node=None): - return self._rest('/connectors/' + name + '/config', node=node) + def get_connector_config(self, name, node=None, retries=0, retry_backoff=.01): + return self._rest_with_retry('/connectors/' + name + '/config', node=node, retries=retries, retry_backoff=retry_backoff) - def set_connector_config(self, name, config, node=None): - return self._rest('/connectors/' + name + '/config', config, node=node, method="PUT") + def set_connector_config(self, name, config, node=None, retries=0, retry_backoff=.01): + return self._rest_with_retry('/connectors/' + name + '/config', config, node=node, method="PUT", retries=retries, retry_backoff=retry_backoff) - def get_connector_tasks(self, name, node=None): - return self._rest('/connectors/' + name + '/tasks', node=node) + def get_connector_tasks(self, name, node=None, retries=0, retry_backoff=.01): + return self._rest_with_retry('/connectors/' + name + '/tasks', node=node, retries=retries, retry_backoff=retry_backoff) - def delete_connector(self, name, node=None): - return self._rest('/connectors/' + name, node=node, method="DELETE") + def delete_connector(self, name, node=None, retries=0, retry_backoff=.01): + return self._rest_with_retry('/connectors/' + name, node=node, method="DELETE", retries=retries, retry_backoff=retry_backoff) def _rest(self, path, body=None, node=None, method="GET"): if node is None: @@ -144,10 +144,13 @@ class ConnectServiceBase(KafkaPathResolverMixin, Service): else: return resp.json() + def _rest_with_retry(self, path, body=None, node=None, method="GET", retries=0, retry_backoff=.01): + return retry_on_exception(lambda: self._rest(path, body, node, method), ConnectRestError, retries, retry_backoff) def _base_url(self, node): return 'http://' + node.account.externally_routable_ip + ':' + '8083' + class ConnectStandaloneService(ConnectServiceBase): """Runs Kafka Connect in standalone mode.""" @@ -223,8 +226,6 @@ class ConnectDistributedService(ConnectServiceBase): raise RuntimeError("No process ids recorded") - - class ConnectRestError(RuntimeError): def __init__(self, status, msg, url): self.status = status @@ -235,7 +236,6 @@ class ConnectRestError(RuntimeError): return "Kafka Connect REST call failed: returned " + self.status + " for " + self.url + ". Response: " + self.message - class VerifiableConnector(object): def messages(self): """ @@ -261,6 +261,7 @@ class VerifiableConnector(object): self.logger.info("Destroying connector %s %s", type(self).__name__, self.name) self.cc.delete_connector(self.name) + class VerifiableSource(VerifiableConnector): """ Helper class for running a verifiable source connector on a Kafka Connect cluster and analyzing the output. @@ -284,6 +285,7 @@ class VerifiableSource(VerifiableConnector): 'throughput': self.throughput }) + class VerifiableSink(VerifiableConnector): """ Helper class for running a verifiable sink connector on a Kafka Connect cluster and analyzing the output. http://git-wip-us.apache.org/repos/asf/kafka/blob/dbafc631/tests/kafkatest/tests/connect/connect_rest_test.py ---------------------------------------------------------------------- diff --git a/tests/kafkatest/tests/connect/connect_rest_test.py b/tests/kafkatest/tests/connect/connect_rest_test.py index 69a8cb7..63b9bb1 100644 --- a/tests/kafkatest/tests/connect/connect_rest_test.py +++ b/tests/kafkatest/tests/connect/connect_rest_test.py @@ -15,8 +15,12 @@ from kafkatest.tests.kafka_test import KafkaTest from kafkatest.services.connect import ConnectDistributedService, ConnectRestError +from kafkatest.utils.util import retry_on_exception from ducktape.utils.util import wait_until -import hashlib, subprocess, json, itertools +import subprocess +import json +import itertools + class ConnectRestApiTest(KafkaTest): """ @@ -65,10 +69,10 @@ class ConnectRestApiTest(KafkaTest): sink_connector_props = self.render("connect-file-sink.properties") for connector_props in [source_connector_props, sink_connector_props]: connector_config = self._config_dict_from_props(connector_props) - self.cc.create_connector(connector_config) + self.cc.create_connector(connector_config, retries=120, retry_backoff=1) # We should see the connectors appear - wait_until(lambda: set(self.cc.list_connectors()) == set(["local-file-source", "local-file-sink"]), + wait_until(lambda: set(self.cc.list_connectors(retries=5, retry_backoff=1)) == set(["local-file-source", "local-file-sink"]), timeout_sec=10, err_msg="Connectors that were just created did not appear in connector listing") # We'll only do very simple validation that the connectors and tasks really ran. @@ -76,7 +80,6 @@ class ConnectRestApiTest(KafkaTest): node.account.ssh("echo -e -n " + repr(self.INPUTS) + " >> " + self.INPUT_FILE) wait_until(lambda: self.validate_output(self.INPUT_LIST), timeout_sec=120, err_msg="Data added to input file was not seen in the output file in a reasonable amount of time.") - # Trying to create the same connector again should cause an error try: self.cc.create_connector(self._config_dict_from_props(source_connector_props)) @@ -97,19 +100,18 @@ class ConnectRestApiTest(KafkaTest): expected_sink_info = { 'name': 'local-file-sink', 'config': self._config_dict_from_props(sink_connector_props), - 'tasks': [{ 'connector': 'local-file-sink', 'task': 0 }] + 'tasks': [{'connector': 'local-file-sink', 'task': 0 }] } sink_info = self.cc.get_connector("local-file-sink") assert expected_sink_info == sink_info, "Incorrect info:" + json.dumps(sink_info) sink_config = self.cc.get_connector_config("local-file-sink") assert expected_sink_info['config'] == sink_config, "Incorrect config: " + json.dumps(sink_config) - # Validate that we can get info about tasks. This info should definitely be available now without waiting since # we've already seen data appear in files. # TODO: It would be nice to validate a complete listing, but that doesn't make sense for the file connectors expected_source_task_info = [{ - 'id': { 'connector': 'local-file-source', 'task': 0 }, + 'id': {'connector': 'local-file-source', 'task': 0}, 'config': { 'task.class': 'org.apache.kafka.connect.file.FileStreamSourceTask', 'file': self.INPUT_FILE, @@ -119,7 +121,7 @@ class ConnectRestApiTest(KafkaTest): source_task_info = self.cc.get_connector_tasks("local-file-source") assert expected_source_task_info == source_task_info, "Incorrect info:" + json.dumps(source_task_info) expected_sink_task_info = [{ - 'id': { 'connector': 'local-file-sink', 'task': 0 }, + 'id': {'connector': 'local-file-sink', 'task': 0}, 'config': { 'task.class': 'org.apache.kafka.connect.file.FileStreamSinkTask', 'file': self.OUTPUT_FILE, @@ -139,9 +141,9 @@ class ConnectRestApiTest(KafkaTest): node.account.ssh("echo -e -n " + repr(self.LONER_INPUTS) + " >> " + self.INPUT_FILE2) wait_until(lambda: self.validate_output(self.LONGER_INPUT_LIST), timeout_sec=120, err_msg="Data added to input file was not seen in the output file in a reasonable amount of time.") - self.cc.delete_connector("local-file-source") - self.cc.delete_connector("local-file-sink") - wait_until(lambda: len(self.cc.list_connectors()) == 0, timeout_sec=10, err_msg="Deleted connectors did not disappear from REST listing") + self.cc.delete_connector("local-file-source", retries=120, retry_backoff=1) + self.cc.delete_connector("local-file-sink", retries=120, retry_backoff=1) + wait_until(lambda: len(self.cc.list_connectors(retries=5, retry_backoff=1)) == 0, timeout_sec=10, err_msg="Deleted connectors did not disappear from REST listing") def validate_output(self, input): input_set = set(input) @@ -151,7 +153,6 @@ class ConnectRestApiTest(KafkaTest): ])) return input_set == output_set - def file_contents(self, node, file): try: # Convert to a list here or the CalledProcessError may be returned during a call to the generator instead of http://git-wip-us.apache.org/repos/asf/kafka/blob/dbafc631/tests/kafkatest/utils/util.py ---------------------------------------------------------------------- diff --git a/tests/kafkatest/utils/util.py b/tests/kafkatest/utils/util.py index 0b10dbf..c043bec 100644 --- a/tests/kafkatest/utils/util.py +++ b/tests/kafkatest/utils/util.py @@ -15,6 +15,7 @@ from kafkatest import __version__ as __kafkatest_version__ import re +import time def kafkatest_version(): @@ -71,3 +72,14 @@ def is_int_with_prefix(msg): raise Exception("Unexpected message format. Message should be of format: integer " "prefix dot integer value, but one of the two parts (before or after dot) " "are not integers. Message: %s" % (msg)) + + +def retry_on_exception(fun, exception, retries, retry_backoff=.01): + exception_to_throw = None + for i in range(0, retries + 1): + try: + return fun() + except exception as e: + exception_to_throw = e + time.sleep(retry_backoff) + raise exception_to_throw
