Repository: kafka
Updated Branches:
  refs/heads/trunk 9c34df151 -> 8b9b07e5d


MINOR: ensure original use of prop_file in verifiable producer

PR https://github.com/apache/kafka/pull/958 fixed the use of prop_file in 
the situation when we have multiple producers (before, every producer would 
add to the config). However, it assumes that self.prop_file is initially "". 
This is correct for all existing tests, but it precludes us from extending 
verifiable producer and adding more properties to the producer config (same as 
console consumer).

This is a small PR to restore the original behavior, and also to make 
verifiable producer use a prop_file method, to be consistent with console consumer.

Also, a few more fixes to verifiable producer came up during the review:
-- fixed each_produced_at_least() method
-- more straightforward use of compression types

granders please review.

Author: Anna Povzner <[email protected]>

Reviewers: Geoff Anderson <[email protected]>, Ewen Cheslack-Postava 
<[email protected]>

Closes #1192 from apovzner/fix_verifiable_producer


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/8b9b07e5
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/8b9b07e5
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/8b9b07e5

Branch: refs/heads/trunk
Commit: 8b9b07e5d6aed2552d1cdfba27b0211af39c691f
Parents: 9c34df1
Author: Anna Povzner <[email protected]>
Authored: Thu Apr 7 18:17:40 2016 -0700
Committer: Ewen Cheslack-Postava <[email protected]>
Committed: Thu Apr 7 18:17:40 2016 -0700

----------------------------------------------------------------------
 tests/kafkatest/services/verifiable_producer.py | 34 +++++++++++---------
 1 file changed, 18 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/8b9b07e5/tests/kafkatest/services/verifiable_producer.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/verifiable_producer.py 
b/tests/kafkatest/services/verifiable_producer.py
index f2ea421..0096a34 100644
--- a/tests/kafkatest/services/verifiable_producer.py
+++ b/tests/kafkatest/services/verifiable_producer.py
@@ -52,12 +52,8 @@ class VerifiableProducer(BackgroundThreadService):
                num_nodes = 1
                * is_int_with_prefix recommended if num_nodes > 1, because 
otherwise each producer
                will produce exactly same messages, and validation may miss 
missing messages.
-        :param compression_types: If None, all producers will not use 
compression; or a list of one or
-        more compression types (including "none"). Each producer will pick a 
compression type
-        from the list in round-robin fashion. Example: compression_types = 
["none", "snappy"] and
-        num_nodes = 3, then producer 1 and 2 will not use compression, and 
producer 3 will use
-        compression type = snappy. If in this example, num_nodes is 1, then 
first (and only)
-        producer will not use compression.
+        :param compression_types: If None, all producers will not use 
compression; or a list of
+        compression types, one per producer (could be "none").
         """
         super(VerifiableProducer, self).__init__(context, num_nodes)
 
@@ -67,30 +63,36 @@ class VerifiableProducer(BackgroundThreadService):
         self.throughput = throughput
         self.message_validator = message_validator
         self.compression_types = compression_types
+        if self.compression_types is not None:
+            assert len(self.compression_types) == num_nodes, "Specify one 
compression type per node"
+
+        self.security_config = self.kafka.security_config.client_config()
 
         for node in self.nodes:
             node.version = version
         self.acked_values = []
         self.not_acked_values = []
         self.produced_count = {}
-        self.prop_file = ""
+
+    def prop_file(self, node):
+        idx = self.idx(node)
+        prop_file = str(self.security_config)
+        if self.compression_types is not None:
+            compression_index = idx - 1
+            self.logger.info("VerifiableProducer (index = %d) will use 
compression type = %s", idx,
+                             self.compression_types[compression_index])
+            prop_file += "\ncompression.type=%s\n" % 
self.compression_types[compression_index]
+        return prop_file
 
     def _worker(self, idx, node):
         node.account.ssh("mkdir -p %s" % VerifiableProducer.PERSISTENT_ROOT, 
allow_fail=False)
 
         # Create and upload log properties
-        self.security_config = 
self.kafka.security_config.client_config(self.prop_file)
-        producer_prop_file = str(self.security_config)
         log_config = self.render('tools_log4j.properties', 
log_file=VerifiableProducer.LOG_FILE)
         node.account.create_file(VerifiableProducer.LOG4J_CONFIG, log_config)
 
         # Create and upload config file
-        if self.compression_types is not None:
-            compression_index = (idx - 1) % len(self.compression_types)
-            self.logger.info("VerifiableProducer (index = %d) will use 
compression type = %s", idx,
-                             self.compression_types[compression_index])
-            producer_prop_file += "\ncompression.type=%s\n" % 
self.compression_types[compression_index]
-
+        producer_prop_file = self.prop_file(node)
         self.logger.info("verifiable_producer.properties:")
         self.logger.info(producer_prop_file)
         node.account.create_file(VerifiableProducer.CONFIG_FILE, 
producer_prop_file)
@@ -197,7 +199,7 @@ class VerifiableProducer(BackgroundThreadService):
 
     def each_produced_at_least(self, count):
         with self.lock:
-            for idx in range(1, self.num_nodes):
+            for idx in range(1, self.num_nodes + 1):
                 if self.produced_count.get(idx) is None or 
self.produced_count[idx] < count:
                     return False
             return True

Reply via email to