Repository: kafka Updated Branches: refs/heads/0.10.1 18e05117a -> 472b3a558
MINOR: Allow for asynchronous start of producer consumer in validation test Author: Konstantine Karantasis <konstant...@confluent.io> Reviewers: Jason Gustafson <ja...@confluent.io> Closes #1909 from kkonstantine/MINOR-Async-start-in-produce-consume-validate (cherry picked from commit 0c8c167e844e53f64df80fca52b9b9b1d3dba8b7) Signed-off-by: Jason Gustafson <ja...@confluent.io> Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/472b3a55 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/472b3a55 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/472b3a55 Branch: refs/heads/0.10.1 Commit: 472b3a5584c3486e2f351ff330cd4e319d3f144f Parents: 18e0511 Author: Konstantine Karantasis <konstant...@confluent.io> Authored: Thu Sep 29 13:39:01 2016 -0700 Committer: Jason Gustafson <ja...@confluent.io> Committed: Thu Sep 29 13:53:04 2016 -0700 ---------------------------------------------------------------------- .../kafkatest/services/kafka/templates/kafka.properties | 4 +++- .../services/templates/console_consumer.properties | 3 +++ tests/kafkatest/tests/produce_consume_validate.py | 11 ++++++----- 3 files changed, 12 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/472b3a55/tests/kafkatest/services/kafka/templates/kafka.properties ---------------------------------------------------------------------- diff --git a/tests/kafkatest/services/kafka/templates/kafka.properties b/tests/kafkatest/services/kafka/templates/kafka.properties index 06ec603..7f3a920 100644 --- a/tests/kafkatest/services/kafka/templates/kafka.properties +++ b/tests/kafkatest/services/kafka/templates/kafka.properties @@ -60,4 +60,6 @@ zookeeper.session.timeout.ms={{ zk_session_timeout }} replica.lag.time.max.ms={{replica_lag}} {% endif %} - +{% if auto_create_topics_enable is defined and auto_create_topics_enable is not none %} 
+auto.create.topics.enable={{ auto_create_topics_enable }} +{% endif %} http://git-wip-us.apache.org/repos/asf/kafka/blob/472b3a55/tests/kafkatest/services/templates/console_consumer.properties ---------------------------------------------------------------------- diff --git a/tests/kafkatest/services/templates/console_consumer.properties b/tests/kafkatest/services/templates/console_consumer.properties index 4bac01f..40ed2f3 100644 --- a/tests/kafkatest/services/templates/console_consumer.properties +++ b/tests/kafkatest/services/templates/console_consumer.properties @@ -19,3 +19,6 @@ group.id={{ group_id|default('test-consumer-group') }} client.id={{ client_id }} {% endif %} +{% if consumer_metadata_max_age_ms is defined and consumer_metadata_max_age_ms is not none %} +metadata.max.age.ms={{ consumer_metadata_max_age_ms }} +{% endif %} http://git-wip-us.apache.org/repos/asf/kafka/blob/472b3a55/tests/kafkatest/tests/produce_consume_validate.py ---------------------------------------------------------------------- diff --git a/tests/kafkatest/tests/produce_consume_validate.py b/tests/kafkatest/tests/produce_consume_validate.py index a5da7be..be7cda4 100644 --- a/tests/kafkatest/tests/produce_consume_validate.py +++ b/tests/kafkatest/tests/produce_consume_validate.py @@ -32,13 +32,13 @@ class ProduceConsumeValidateTest(Test): def setup_producer_and_consumer(self): raise NotImplementedError("Subclasses should implement this") - def start_producer_and_consumer(self): + def start_producer_and_consumer(self, async=False): # Start background producer and consumer self.producer.start() - wait_until(lambda: self.producer.num_acked > 5, timeout_sec=20, + wait_until(lambda: async or self.producer.num_acked > 5, timeout_sec=20, err_msg="Producer failed to start in a reasonable amount of time.") self.consumer.start() - wait_until(lambda: len(self.consumer.messages_consumed[1]) > 0, timeout_sec=60, + wait_until(lambda: async or len(self.consumer.messages_consumed[1]) > 0, 
timeout_sec=60, err_msg="Consumer failed to start in a reasonable amount of time.") def check_alive(self): @@ -63,10 +63,10 @@ class ProduceConsumeValidateTest(Test): self.producer.stop() self.consumer.wait() - def run_produce_consume_validate(self, core_test_action=None, *args): + def run_produce_consume_validate(self, async=False, core_test_action=None, *args): """Top-level template for simple produce/consume/validate tests.""" try: - self.start_producer_and_consumer() + self.start_producer_and_consumer(async) if core_test_action is not None: core_test_action(*args) @@ -108,6 +108,7 @@ class ProduceConsumeValidateTest(Test): msg = "" acked = self.producer.acked consumed = self.consumer.messages_consumed[1] + # Correctness of the set difference operation depends on using equivalent message_validators in producer and consumer missing = set(acked) - set(consumed) self.logger.info("num consumed: %d" % len(consumed))