Repository: kafka Updated Branches: refs/heads/trunk 64930cd71 -> 90b5ce3f0
KAFKA-6016; Make the reassign partitions system test use the idempotent producer With these changes, we ensure that the partitions being reassigned start from non-zero offsets. We also ensure that every message in the log has a producerId and sequence number. This means that it successfully reproduces https://issues.apache.org/jira/browse/KAFKA-6003. Author: Apurva Mehta <apu...@confluent.io> Reviewers: Jason Gustafson <ja...@confluent.io> Closes #4029 from apurvam/KAFKA-6016-add-idempotent-producer-to-reassign-partitions Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/90b5ce3f Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/90b5ce3f Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/90b5ce3f Branch: refs/heads/trunk Commit: 90b5ce3f04626ee24aebd1d06588489180a4bb05 Parents: 64930cd Author: Apurva Mehta <apu...@confluent.io> Authored: Tue Oct 10 10:31:52 2017 -0700 Committer: Jason Gustafson <ja...@confluent.io> Committed: Tue Oct 10 10:32:17 2017 -0700 ---------------------------------------------------------------------- .../kafka/clients/producer/ProducerRecord.java | 8 +- .../kafkatest/services/kafka/config_property.py | 1 + .../services/kafka/templates/kafka.properties | 1 - tests/kafkatest/services/verifiable_producer.py | 10 ++- .../tests/core/reassign_partitions_test.py | 91 +++++++++++++++----- .../apache/kafka/tools/VerifiableProducer.java | 30 ++++++- 6 files changed, 109 insertions(+), 32 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/90b5ce3f/clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java index 85428e5..c8ff00b 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java @@ -58,7 +58,8 @@ public class ProducerRecord<K, V> { * * @param topic The topic the record will be appended to * @param partition The partition to which the record should be sent - * @param timestamp The timestamp of the record + * @param timestamp The timestamp of the record, in milliseconds since epoch. If null, the producer will assign + * the timestamp using System.currentTimeMillis(). * @param key The key that will be included in the record * @param value The record contents * @param headers the headers that will be included in the record @@ -85,7 +86,8 @@ public class ProducerRecord<K, V> { * * @param topic The topic the record will be appended to * @param partition The partition to which the record should be sent - * @param timestamp The timestamp of the record + * @param timestamp The timestamp of the record, in milliseconds since epoch. If null, the producer will assign the + * timestamp using System.currentTimeMillis(). * @param key The key that will be included in the record * @param value The record contents */ @@ -168,7 +170,7 @@ public class ProducerRecord<K, V> { } /** - * @return The timestamp + * @return The timestamp, which is in milliseconds since epoch.
*/ public Long timestamp() { return timestamp; http://git-wip-us.apache.org/repos/asf/kafka/blob/90b5ce3f/tests/kafkatest/services/kafka/config_property.py ---------------------------------------------------------------------- diff --git a/tests/kafkatest/services/kafka/config_property.py b/tests/kafkatest/services/kafka/config_property.py index 8b64d0e..b261960 100644 --- a/tests/kafkatest/services/kafka/config_property.py +++ b/tests/kafkatest/services/kafka/config_property.py @@ -33,6 +33,7 @@ NUM_RECOVERY_THREADS_PER_DATA_DIR = "num.recovery.threads.per.data.dir" LOG_RETENTION_HOURS = "log.retention.hours" LOG_SEGMENT_BYTES = "log.segment.bytes" LOG_RETENTION_CHECK_INTERVAL_MS = "log.retention.check.interval.ms" +LOG_RETENTION_MS = "log.retention.ms" LOG_CLEANER_ENABLE = "log.cleaner.enable" AUTO_CREATE_TOPICS_ENABLE = "auto.create.topics.enable" http://git-wip-us.apache.org/repos/asf/kafka/blob/90b5ce3f/tests/kafkatest/services/kafka/templates/kafka.properties ---------------------------------------------------------------------- diff --git a/tests/kafkatest/services/kafka/templates/kafka.properties b/tests/kafkatest/services/kafka/templates/kafka.properties index eca5bf6..8cca14f 100644 --- a/tests/kafkatest/services/kafka/templates/kafka.properties +++ b/tests/kafkatest/services/kafka/templates/kafka.properties @@ -30,7 +30,6 @@ num.partitions=1 num.recovery.threads.per.data.dir=1 log.retention.hours=168 log.segment.bytes=1073741824 -log.retention.check.interval.ms=300000 log.cleaner.enable=false security.inter.broker.protocol={{ security_config.interbroker_security_protocol }} http://git-wip-us.apache.org/repos/asf/kafka/blob/90b5ce3f/tests/kafkatest/services/verifiable_producer.py ---------------------------------------------------------------------- diff --git a/tests/kafkatest/services/verifiable_producer.py b/tests/kafkatest/services/verifiable_producer.py index 6ba3e86..17f1ec3 100644 --- a/tests/kafkatest/services/verifiable_producer.py +++ b/tests/kafkatest/services/verifiable_producer.py @@ -56,7 +56,8 @@ class VerifiableProducer(KafkaPathResolverMixin, VerifiableClientMixin, Backgrou def __init__(self, context, num_nodes, kafka, topic, max_messages=-1, throughput=100000, message_validator=is_int, compression_types=None, version=DEV_BRANCH, acks=None, - stop_timeout_sec=150, request_timeout_sec=30, log_level="INFO", enable_idempotence=False, offline_nodes=[]): + stop_timeout_sec=150, request_timeout_sec=30, log_level="INFO", + enable_idempotence=False, offline_nodes=[], create_time=-1): """ :param max_messages is a number of messages to be produced per producer :param message_validator checks for an expected format of messages produced. 
There are @@ -91,6 +92,7 @@ class VerifiableProducer(KafkaPathResolverMixin, VerifiableClientMixin, Backgrou self.request_timeout_sec = request_timeout_sec self.enable_idempotence = enable_idempotence self.offline_nodes = offline_nodes + self.create_time = create_time def java_class_name(self): return "VerifiableProducer" @@ -125,8 +127,8 @@ class VerifiableProducer(KafkaPathResolverMixin, VerifiableClientMixin, Backgrou producer_prop_file += "\nrequest.timeout.ms=%d\n" % (self.request_timeout_sec * 1000) if self.enable_idempotence: self.logger.info("Setting up an idempotent producer") - producer_prop_file += "\nmax.in.flight.requests.per.connection=1\n" - producer_prop_file += "\nretries=50\n" + producer_prop_file += "\nmax.in.flight.requests.per.connection=5\n" + producer_prop_file += "\nretries=1000000\n" producer_prop_file += "\nenable.idempotence=true\n" self.logger.info("verifiable_producer.properties:") @@ -194,6 +196,8 @@ class VerifiableProducer(KafkaPathResolverMixin, VerifiableClientMixin, Backgrou cmd += " --value-prefix %s" % str(idx) if self.acks is not None: cmd += " --acks %s " % str(self.acks) + if self.create_time > -1: + cmd += " --message-create-time %s " % str(self.create_time) cmd += " --producer.config %s" % VerifiableProducer.CONFIG_FILE cmd += " 2>> %s | tee -a %s &" % (VerifiableProducer.STDOUT_CAPTURE, VerifiableProducer.STDOUT_CAPTURE) http://git-wip-us.apache.org/repos/asf/kafka/blob/90b5ce3f/tests/kafkatest/tests/core/reassign_partitions_test.py ---------------------------------------------------------------------- diff --git a/tests/kafkatest/tests/core/reassign_partitions_test.py b/tests/kafkatest/tests/core/reassign_partitions_test.py index fef57d1..5abde72 100644 --- a/tests/kafkatest/tests/core/reassign_partitions_test.py +++ b/tests/kafkatest/tests/core/reassign_partitions_test.py @@ -13,10 +13,11 @@ # See the License for the specific language governing permissions and # limitations under the License. -from ducktape.mark import parametrize +from ducktape.mark import matrix from ducktape.mark.resource import cluster from ducktape.utils.util import wait_until +from kafkatest.services.kafka import config_property from kafkatest.services.zookeeper import ZookeeperService from kafkatest.services.kafka import KafkaService from kafkatest.services.verifiable_producer import VerifiableProducer @@ -24,7 +25,7 @@ from kafkatest.services.console_consumer import ConsoleConsumer from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest from kafkatest.utils import is_int import random - +import time class ReassignPartitionsTest(ProduceConsumeValidateTest): """ @@ -38,13 +39,25 @@ class ReassignPartitionsTest(ProduceConsumeValidateTest): super(ReassignPartitionsTest, self).__init__(test_context=test_context) self.topic = "test_topic" - self.zk = ZookeeperService(test_context, num_nodes=1) - self.kafka = KafkaService(test_context, num_nodes=4, zk=self.zk, topics={self.topic: { - "partitions": 20, - "replication-factor": 3, - 'configs': {"min.insync.replicas": 2}} - }) self.num_partitions = 20 + self.zk = ZookeeperService(test_context, num_nodes=1) + # We set the min.insync.replicas to match the replication factor because + # it makes the test more stringent. If min.isr = 2 and + # replication.factor=3, then the test would tolerate the failure of + # reassignment for up to one replica per partition, which is not + # desirable for this test in particular.
+ self.kafka = KafkaService(test_context, num_nodes=4, zk=self.zk, + server_prop_overides=[ + [config_property.LOG_ROLL_TIME_MS, "5000"], + [config_property.LOG_RETENTION_CHECK_INTERVAL_MS, "5000"] + ], + topics={self.topic: { + "partitions": self.num_partitions, + "replication-factor": 3, + 'configs': { + "min.insync.replicas": 3, + }} + }) self.timeout_sec = 60 self.producer_throughput = 1000 self.num_producers = 1 @@ -86,14 +99,42 @@ class ReassignPartitionsTest(ProduceConsumeValidateTest): self.clean_bounce_some_brokers() # Wait until finished or timeout - wait_until(lambda: self.kafka.verify_reassign_partitions(partition_info), timeout_sec=self.timeout_sec, backoff_sec=.5) + wait_until(lambda: self.kafka.verify_reassign_partitions(partition_info), + timeout_sec=self.timeout_sec, backoff_sec=.5) - @cluster(num_nodes=7) - @parametrize(security_protocol="PLAINTEXT", bounce_brokers=True) - @parametrize(security_protocol="PLAINTEXT", bounce_brokers=False) - def test_reassign_partitions(self, bounce_brokers, security_protocol): + def move_start_offset(self): + """We move the start offset of the topic by writing really old messages + and waiting for them to be cleaned up. + """ + producer = VerifiableProducer(self.test_context, 1, self.kafka, self.topic, + throughput=-1, enable_idempotence=True, + create_time=1000) + producer.start() + wait_until(lambda: producer.num_acked > 0, + timeout_sec=30, + err_msg="Failed to get an acknowledgement for %ds" % 30) + # Wait 8 seconds to let the topic be seeded with messages that will + # be deleted. The 8 seconds is important, since we should get 2 deleted + # segments in this period based on the configured log roll time and the + # retention check interval. + time.sleep(8) + producer.stop() + self.logger.info("Seeded topic with %d messages which will be deleted" %\ + producer.num_acked) + # Since the configured check interval is 5 seconds, we wait another + # 6 seconds to ensure that at least one more cleaning pass runs, so that + # the last segment is deleted. An alternative to using timeouts is to poll + # each partition until the log start offset matches the end offset. The + # latter is more robust. + time.sleep(6) + + @cluster(num_nodes=8) + @matrix(bounce_brokers=[True, False], + reassign_from_offset_zero=[True, False]) + def test_reassign_partitions(self, bounce_brokers, reassign_from_offset_zero): """Reassign partitions tests.
- Setup: 1 zk, 3 kafka nodes, 1 topic with partitions=3, replication-factor=3, and min.insync.replicas=2 + Setup: 1 zk, 4 kafka nodes, 1 topic with partitions=20, replication-factor=3, + and min.insync.replicas=3 - Produce messages in the background - Consume messages in the background @@ -101,13 +142,19 @@ class ReassignPartitionsTest(ProduceConsumeValidateTest): - If bounce_brokers is True, also bounce a few brokers while partition re-assignment is in progress - When done reassigning partitions and bouncing brokers, stop producing, and finish consuming - Validate that every acked message was consumed - """ - - self.kafka.security_protocol = security_protocol - self.kafka.interbroker_security_protocol = security_protocol - new_consumer = False if self.kafka.security_protocol == "PLAINTEXT" else True - self.producer = VerifiableProducer(self.test_context, self.num_producers, self.kafka, self.topic, throughput=self.producer_throughput) - self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, self.kafka, self.topic, new_consumer=new_consumer, consumer_timeout_ms=60000, message_validator=is_int) + """ self.kafka.start() - + if not reassign_from_offset_zero: + self.move_start_offset() + + self.producer = VerifiableProducer(self.test_context, self.num_producers, + self.kafka, self.topic, + throughput=self.producer_throughput, + enable_idempotence=True) + self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, + self.kafka, self.topic, + consumer_timeout_ms=60000, + message_validator=is_int) + + self.enable_idempotence=True self.run_produce_consume_validate(core_test_action=lambda: self.reassign_partitions(bounce_brokers)) http://git-wip-us.apache.org/repos/asf/kafka/blob/90b5ce3f/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java ---------------------------------------------------------------------- diff --git a/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java b/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java index 1924755..8de115f 100644 --- a/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java +++ b/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java @@ -80,13 +80,22 @@ public class VerifiableProducer { // if null, then values are produced without a prefix private final Integer valuePrefix; - public VerifiableProducer(KafkaProducer<String, String> producer, String topic, int throughput, int maxMessages, Integer valuePrefix) { + // The create time to set in messages, in milliseconds since epoch + private Long createTime; + + private final Long startTime; + + public VerifiableProducer(KafkaProducer<String, String> producer, String topic, int throughput, int maxMessages, + Integer valuePrefix, Long createTime) { this.topic = topic; this.throughput = throughput; this.maxMessages = maxMessages; this.producer = producer; this.valuePrefix = valuePrefix; + this.createTime = createTime; + this.startTime = System.currentTimeMillis(); + } /** Get the command-line argument parser. 
*/ @@ -144,6 +153,15 @@ public class VerifiableProducer { .metavar("CONFIG_FILE") .help("Producer config properties file."); + parser.addArgument("--message-create-time") + .action(store()) + .required(false) + .setDefault(-1) + .type(Integer.class) + .metavar("CREATETIME") + .dest("createTime") + .help("Send messages with creation time starting at the argument's value, in milliseconds since epoch"); + parser.addArgument("--value-prefix") .action(store()) .required(false) @@ -181,6 +199,10 @@ public class VerifiableProducer { int throughput = res.getInt("throughput"); String configFile = res.getString("producer.config"); Integer valuePrefix = res.getInt("valuePrefix"); + Long createTime = (long) res.getInt("createTime"); + + if (createTime == -1L) + createTime = null; Properties producerProps = new Properties(); producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, res.getString("brokerList")); @@ -202,12 +224,14 @@ public class VerifiableProducer { StringSerializer serializer = new StringSerializer(); KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps, serializer, serializer); - return new VerifiableProducer(producer, topic, throughput, maxMessages, valuePrefix); + return new VerifiableProducer(producer, topic, throughput, maxMessages, valuePrefix, createTime); } /** Produce a message with given key and value. */ public void send(String key, String value) { - ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value); + ProducerRecord<String, String> record = new ProducerRecord<>(topic, null, createTime, key, value); + if (createTime != null) + createTime += System.currentTimeMillis() - startTime; numSent++; try { producer.send(record, new PrintInfoCallback(key, value));
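For readers following along, here is a minimal standalone sketch (not part of this commit) of what the updated test exercises: an idempotent producer, configured with the same enable.idempotence, max.in.flight, and retries settings that verifiable_producer.py now writes, sending records with an explicit very old CreateTime, the way the new --message-create-time option does. The bootstrap address, topic name, class name, and record count are illustrative assumptions, not values taken from the test.

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class BackdatedIdempotentProducerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker address
        // Same idempotence-related settings the system test now writes into verifiable_producer.properties
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5");
        props.put(ProducerConfig.RETRIES_CONFIG, "1000000");

        try (KafkaProducer<String, String> producer =
                 new KafkaProducer<>(props, new StringSerializer(), new StringSerializer())) {
            // An explicit, very old CreateTime (the test passes --message-create-time 1000) so that
            // time-based retention treats the resulting segments as long expired once they are rolled.
            Long createTime = 1000L;
            for (int i = 0; i < 10; i++) {
                ProducerRecord<String, String> record =
                        new ProducerRecord<>("test_topic", null, createTime, null, Integer.toString(i));
                producer.send(record);
            }
            producer.flush();
        }
    }
}

Because the records carry an ancient CreateTime, the broker's time-based retention can delete the rolled segments and advance the log start offset within a few seconds under the short log roll time and retention check interval the test configures, which is what move_start_offset() waits for before the reassignment begins.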