Repository: kafka Updated Branches: refs/heads/trunk 9c34df151 -> 8b9b07e5d
MINOR: ensure original use of prop_file in verifiable producer PR https://github.com/apache/kafka/pull/958 fixed the use of prop_file in the situation where we have multiple producers (before, every producer would add to the config). However, it assumes that self.prop_file is initially "". This is correct for all existing tests, but it precludes us from extending verifiable producer and adding more properties to the producer config (same as console consumer). This is a small PR to restore the original behavior, and it also makes verifiable producer use a prop_file method to be consistent with console consumer. Also, a few more fixes to verifiable producer that came up during the review: -- fixed each_produced_at_least() method -- more straightforward use of compression types granders please review. Author: Anna Povzner <[email protected]> Reviewers: Geoff Anderson <[email protected]>, Ewen Cheslack-Postava <[email protected]> Closes #1192 from apovzner/fix_verifiable_producer Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/8b9b07e5 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/8b9b07e5 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/8b9b07e5 Branch: refs/heads/trunk Commit: 8b9b07e5d6aed2552d1cdfba27b0211af39c691f Parents: 9c34df1 Author: Anna Povzner <[email protected]> Authored: Thu Apr 7 18:17:40 2016 -0700 Committer: Ewen Cheslack-Postava <[email protected]> Committed: Thu Apr 7 18:17:40 2016 -0700 ---------------------------------------------------------------------- tests/kafkatest/services/verifiable_producer.py | 34 +++++++++++--------- 1 file changed, 18 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/8b9b07e5/tests/kafkatest/services/verifiable_producer.py ---------------------------------------------------------------------- diff --git 
a/tests/kafkatest/services/verifiable_producer.py b/tests/kafkatest/services/verifiable_producer.py index f2ea421..0096a34 100644 --- a/tests/kafkatest/services/verifiable_producer.py +++ b/tests/kafkatest/services/verifiable_producer.py @@ -52,12 +52,8 @@ class VerifiableProducer(BackgroundThreadService): num_nodes = 1 * is_int_with_prefix recommended if num_nodes > 1, because otherwise each producer will produce exactly same messages, and validation may miss missing messages. - :param compression_types: If None, all producers will not use compression; or a list of one or - more compression types (including "none"). Each producer will pick a compression type - from the list in round-robin fashion. Example: compression_types = ["none", "snappy"] and - num_nodes = 3, then producer 1 and 2 will not use compression, and producer 3 will use - compression type = snappy. If in this example, num_nodes is 1, then first (and only) - producer will not use compression. + :param compression_types: If None, all producers will not use compression; or a list of + compression types, one per producer (could be "none"). 
""" super(VerifiableProducer, self).__init__(context, num_nodes) @@ -67,30 +63,36 @@ class VerifiableProducer(BackgroundThreadService): self.throughput = throughput self.message_validator = message_validator self.compression_types = compression_types + if self.compression_types is not None: + assert len(self.compression_types) == num_nodes, "Specify one compression type per node" + + self.security_config = self.kafka.security_config.client_config() for node in self.nodes: node.version = version self.acked_values = [] self.not_acked_values = [] self.produced_count = {} - self.prop_file = "" + + def prop_file(self, node): + idx = self.idx(node) + prop_file = str(self.security_config) + if self.compression_types is not None: + compression_index = idx - 1 + self.logger.info("VerifiableProducer (index = %d) will use compression type = %s", idx, + self.compression_types[compression_index]) + prop_file += "\ncompression.type=%s\n" % self.compression_types[compression_index] + return prop_file def _worker(self, idx, node): node.account.ssh("mkdir -p %s" % VerifiableProducer.PERSISTENT_ROOT, allow_fail=False) # Create and upload log properties - self.security_config = self.kafka.security_config.client_config(self.prop_file) - producer_prop_file = str(self.security_config) log_config = self.render('tools_log4j.properties', log_file=VerifiableProducer.LOG_FILE) node.account.create_file(VerifiableProducer.LOG4J_CONFIG, log_config) # Create and upload config file - if self.compression_types is not None: - compression_index = (idx - 1) % len(self.compression_types) - self.logger.info("VerifiableProducer (index = %d) will use compression type = %s", idx, - self.compression_types[compression_index]) - producer_prop_file += "\ncompression.type=%s\n" % self.compression_types[compression_index] - + producer_prop_file = self.prop_file(node) self.logger.info("verifiable_producer.properties:") self.logger.info(producer_prop_file) node.account.create_file(VerifiableProducer.CONFIG_FILE, 
producer_prop_file) @@ -197,7 +199,7 @@ class VerifiableProducer(BackgroundThreadService): def each_produced_at_least(self, count): with self.lock: - for idx in range(1, self.num_nodes): + for idx in range(1, self.num_nodes + 1): if self.produced_count.get(idx) is None or self.produced_count[idx] < count: return False return True
