Repository: kafka
Updated Branches:
  refs/heads/trunk 2ca9177f4 -> 0c8c167e8


MINOR: Allow for asynchronous start of producer consumer in validation test

Author: Konstantine Karantasis <konstant...@confluent.io>

Reviewers: Jason Gustafson <ja...@confluent.io>

Closes #1909 from kkonstantine/MINOR-Async-start-in-produce-consume-validate


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/0c8c167e
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/0c8c167e
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/0c8c167e

Branch: refs/heads/trunk
Commit: 0c8c167e844e53f64df80fca52b9b9b1d3dba8b7
Parents: 2ca9177
Author: Konstantine Karantasis <konstant...@confluent.io>
Authored: Thu Sep 29 13:39:01 2016 -0700
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Thu Sep 29 13:50:39 2016 -0700

----------------------------------------------------------------------
 .../kafkatest/services/kafka/templates/kafka.properties  |  4 +++-
 .../services/templates/console_consumer.properties       |  3 +++
 tests/kafkatest/tests/produce_consume_validate.py        | 11 ++++++-----
 3 files changed, 12 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/0c8c167e/tests/kafkatest/services/kafka/templates/kafka.properties
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/kafka/templates/kafka.properties b/tests/kafkatest/services/kafka/templates/kafka.properties
index 06ec603..7f3a920 100644
--- a/tests/kafkatest/services/kafka/templates/kafka.properties
+++ b/tests/kafkatest/services/kafka/templates/kafka.properties
@@ -60,4 +60,6 @@ zookeeper.session.timeout.ms={{ zk_session_timeout }}
 replica.lag.time.max.ms={{replica_lag}}
 {% endif %}
 
-
+{% if auto_create_topics_enable is defined and auto_create_topics_enable is not none %}
+auto.create.topics.enable={{ auto_create_topics_enable }}
+{% endif %}

http://git-wip-us.apache.org/repos/asf/kafka/blob/0c8c167e/tests/kafkatest/services/templates/console_consumer.properties
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/templates/console_consumer.properties b/tests/kafkatest/services/templates/console_consumer.properties
index 4bac01f..40ed2f3 100644
--- a/tests/kafkatest/services/templates/console_consumer.properties
+++ b/tests/kafkatest/services/templates/console_consumer.properties
@@ -19,3 +19,6 @@ group.id={{ group_id|default('test-consumer-group') }}
 client.id={{ client_id }}
 {% endif %}
 
+{% if consumer_metadata_max_age_ms is defined and consumer_metadata_max_age_ms is not none %}
+metadata.max.age.ms={{ consumer_metadata_max_age_ms }}
+{% endif %}

http://git-wip-us.apache.org/repos/asf/kafka/blob/0c8c167e/tests/kafkatest/tests/produce_consume_validate.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/produce_consume_validate.py b/tests/kafkatest/tests/produce_consume_validate.py
index a5da7be..be7cda4 100644
--- a/tests/kafkatest/tests/produce_consume_validate.py
+++ b/tests/kafkatest/tests/produce_consume_validate.py
@@ -32,13 +32,13 @@ class ProduceConsumeValidateTest(Test):
     def setup_producer_and_consumer(self):
         raise NotImplementedError("Subclasses should implement this")
 
-    def start_producer_and_consumer(self):
+    def start_producer_and_consumer(self, async=False):
         # Start background producer and consumer
         self.producer.start()
-        wait_until(lambda: self.producer.num_acked > 5, timeout_sec=20,
+        wait_until(lambda: async or self.producer.num_acked > 5, timeout_sec=20,
              err_msg="Producer failed to start in a reasonable amount of time.")
         self.consumer.start()
-        wait_until(lambda: len(self.consumer.messages_consumed[1]) > 0, timeout_sec=60,
+        wait_until(lambda: async or len(self.consumer.messages_consumed[1]) > 0, timeout_sec=60,
              err_msg="Consumer failed to start in a reasonable amount of time.")
 
     def check_alive(self):
@@ -63,10 +63,10 @@ class ProduceConsumeValidateTest(Test):
         self.producer.stop()
         self.consumer.wait()
 
-    def run_produce_consume_validate(self, core_test_action=None, *args):
+    def run_produce_consume_validate(self, async=False, core_test_action=None, *args):
         """Top-level template for simple produce/consume/validate tests."""
         try:
-            self.start_producer_and_consumer()
+            self.start_producer_and_consumer(async)
 
             if core_test_action is not None:
                 core_test_action(*args)
@@ -108,6 +108,7 @@ class ProduceConsumeValidateTest(Test):
         msg = ""
         acked = self.producer.acked
         consumed = self.consumer.messages_consumed[1]
+        # Correctness of the set difference operation depends on using equivalent message_validators in producer and consumer
         missing = set(acked) - set(consumed)
 
         self.logger.info("num consumed:  %d" % len(consumed))

Reply via email to