Repository: kafka
Updated Branches:
  refs/heads/trunk 64930cd71 -> 90b5ce3f0


KAFKA-6016; Make the reassign partitions system test use the idempotent producer

With these changes, we ensure that the partitions being reassigned have
non-zero start offsets, and that every message in the log carries a
producerId and sequence number.

This means that the test successfully reproduces
https://issues.apache.org/jira/browse/KAFKA-6003.
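
For context, the idempotent producer is enabled with a single client-side
setting; a minimal sketch in Java (the broker address and topic are
illustrative, not the test's actual configuration):

    import java.util.Properties;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;

    public class IdempotentProducerSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            // With idempotence enabled, the broker assigns the producer a producerId
            // and every record batch carries sequence numbers -- the invariant this
            // test now asserts for each message in the log.
            props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
            try (KafkaProducer<String, String> producer =
                     new KafkaProducer<>(props, new StringSerializer(), new StringSerializer())) {
                producer.send(new ProducerRecord<>("test_topic", "key", "value"));
            }
        }
    }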

Author: Apurva Mehta <apu...@confluent.io>

Reviewers: Jason Gustafson <ja...@confluent.io>

Closes #4029 from 
apurvam/KAFKA-6016-add-idempotent-producer-to-reassign-partitions


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/90b5ce3f
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/90b5ce3f
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/90b5ce3f

Branch: refs/heads/trunk
Commit: 90b5ce3f04626ee24aebd1d06588489180a4bb05
Parents: 64930cd
Author: Apurva Mehta <apu...@confluent.io>
Authored: Tue Oct 10 10:31:52 2017 -0700
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Tue Oct 10 10:32:17 2017 -0700

----------------------------------------------------------------------
 .../kafka/clients/producer/ProducerRecord.java  |  8 +-
 .../kafkatest/services/kafka/config_property.py |  1 +
 .../services/kafka/templates/kafka.properties   |  1 -
 tests/kafkatest/services/verifiable_producer.py | 10 ++-
 .../tests/core/reassign_partitions_test.py      | 91 +++++++++++++++-----
 .../apache/kafka/tools/VerifiableProducer.java  | 30 ++++++-
 6 files changed, 109 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/90b5ce3f/clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java
index 85428e5..c8ff00b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java
@@ -58,7 +58,8 @@ public class ProducerRecord<K, V> {
      * 
      * @param topic The topic the record will be appended to
      * @param partition The partition to which the record should be sent
-     * @param timestamp The timestamp of the record
+     * @param timestamp The timestamp of the record, in milliseconds since epoch. If null, the producer will assign
+     *                  the timestamp using System.currentTimeMillis().
      * @param key The key that will be included in the record
      * @param value The record contents
      * @param headers the headers that will be included in the record
@@ -85,7 +86,8 @@ public class ProducerRecord<K, V> {
      *
      * @param topic The topic the record will be appended to
      * @param partition The partition to which the record should be sent
-     * @param timestamp The timestamp of the record
+     * @param timestamp The timestamp of the record, in milliseconds since epoch. If null, the producer will assign the
+     *                  timestamp using System.currentTimeMillis().
      * @param key The key that will be included in the record
      * @param value The record contents
      */
@@ -168,7 +170,7 @@ public class ProducerRecord<K, V> {
     }
 
     /**
-     * @return The timestamp
+     * @return The timestamp, which is in milliseconds since epoch.
      */
     public Long timestamp() {
         return timestamp;
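
For illustration, the two timestamp behaviors documented above; a hedged
sketch where the topic, key, and value are placeholders:

    // Explicit timestamp: the record carries 1000L, i.e. one second past the epoch.
    ProducerRecord<String, String> old =
            new ProducerRecord<>("test_topic", null, 1000L, "key", "value");
    // Null timestamp: the producer assigns System.currentTimeMillis() at send time.
    ProducerRecord<String, String> fresh =
            new ProducerRecord<>("test_topic", null, null, "key", "value");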

http://git-wip-us.apache.org/repos/asf/kafka/blob/90b5ce3f/tests/kafkatest/services/kafka/config_property.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/kafka/config_property.py b/tests/kafkatest/services/kafka/config_property.py
index 8b64d0e..b261960 100644
--- a/tests/kafkatest/services/kafka/config_property.py
+++ b/tests/kafkatest/services/kafka/config_property.py
@@ -33,6 +33,7 @@ NUM_RECOVERY_THREADS_PER_DATA_DIR = "num.recovery.threads.per.data.dir"
 LOG_RETENTION_HOURS = "log.retention.hours"
 LOG_SEGMENT_BYTES = "log.segment.bytes"
 LOG_RETENTION_CHECK_INTERVAL_MS = "log.retention.check.interval.ms"
+LOG_RETENTION_MS = "log.retention.ms"
 LOG_CLEANER_ENABLE = "log.cleaner.enable"
 
 AUTO_CREATE_TOPICS_ENABLE = "auto.create.topics.enable"

http://git-wip-us.apache.org/repos/asf/kafka/blob/90b5ce3f/tests/kafkatest/services/kafka/templates/kafka.properties
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/kafka/templates/kafka.properties b/tests/kafkatest/services/kafka/templates/kafka.properties
index eca5bf6..8cca14f 100644
--- a/tests/kafkatest/services/kafka/templates/kafka.properties
+++ b/tests/kafkatest/services/kafka/templates/kafka.properties
@@ -30,7 +30,6 @@ num.partitions=1
 num.recovery.threads.per.data.dir=1
 log.retention.hours=168
 log.segment.bytes=1073741824
-log.retention.check.interval.ms=300000
 log.cleaner.enable=false
 
 security.inter.broker.protocol={{ security_config.interbroker_security_protocol }}

http://git-wip-us.apache.org/repos/asf/kafka/blob/90b5ce3f/tests/kafkatest/services/verifiable_producer.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/verifiable_producer.py b/tests/kafkatest/services/verifiable_producer.py
index 6ba3e86..17f1ec3 100644
--- a/tests/kafkatest/services/verifiable_producer.py
+++ b/tests/kafkatest/services/verifiable_producer.py
@@ -56,7 +56,8 @@ class VerifiableProducer(KafkaPathResolverMixin, VerifiableClientMixin, Backgrou
 
     def __init__(self, context, num_nodes, kafka, topic, max_messages=-1, throughput=100000,
                  message_validator=is_int, compression_types=None, version=DEV_BRANCH, acks=None,
-                 stop_timeout_sec=150, request_timeout_sec=30, log_level="INFO", enable_idempotence=False, offline_nodes=[]):
+                 stop_timeout_sec=150, request_timeout_sec=30, log_level="INFO",
+                 enable_idempotence=False, offline_nodes=[], create_time=-1):
         """
         :param max_messages is a number of messages to be produced per producer
         :param message_validator checks for an expected format of messages produced. There are
@@ -91,6 +92,7 @@ class VerifiableProducer(KafkaPathResolverMixin, VerifiableClientMixin, Backgrou
         self.request_timeout_sec = request_timeout_sec
         self.enable_idempotence = enable_idempotence
         self.offline_nodes = offline_nodes
+        self.create_time = create_time
 
     def java_class_name(self):
         return "VerifiableProducer"
@@ -125,8 +127,8 @@ class VerifiableProducer(KafkaPathResolverMixin, VerifiableClientMixin, Backgrou
         producer_prop_file += "\nrequest.timeout.ms=%d\n" % (self.request_timeout_sec * 1000)
         if self.enable_idempotence:
             self.logger.info("Setting up an idempotent producer")
-            producer_prop_file += "\nmax.in.flight.requests.per.connection=1\n"
-            producer_prop_file += "\nretries=50\n"
+            producer_prop_file += "\nmax.in.flight.requests.per.connection=5\n"
+            producer_prop_file += "\nretries=1000000\n"
             producer_prop_file += "\nenable.idempotence=true\n"
 
         self.logger.info("verifiable_producer.properties:")
@@ -194,6 +196,8 @@ class VerifiableProducer(KafkaPathResolverMixin, VerifiableClientMixin, Backgrou
             cmd += " --value-prefix %s" % str(idx)
         if self.acks is not None:
             cmd += " --acks %s " % str(self.acks)
+        if self.create_time > -1:
+            cmd += " --message-create-time %s " % str(self.create_time)
 
         cmd += " --producer.config %s" % VerifiableProducer.CONFIG_FILE
         cmd += " 2>> %s | tee -a %s &" % (VerifiableProducer.STDOUT_CAPTURE, 
VerifiableProducer.STDOUT_CAPTURE)
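
The new values reflect that an idempotent producer preserves ordering and
avoids duplicates with up to five concurrent in-flight requests, so the test
no longer has to serialize requests or cap retries. A hedged sketch of the
equivalent client-side configuration in Java (values mirror the properties
written above):

    Properties props = new Properties();
    props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
    // Idempotence guarantees no reordering or duplication with up to 5
    // in-flight requests per connection, so retries can be very aggressive.
    props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5);
    props.put(ProducerConfig.RETRIES_CONFIG, 1000000);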

http://git-wip-us.apache.org/repos/asf/kafka/blob/90b5ce3f/tests/kafkatest/tests/core/reassign_partitions_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/core/reassign_partitions_test.py b/tests/kafkatest/tests/core/reassign_partitions_test.py
index fef57d1..5abde72 100644
--- a/tests/kafkatest/tests/core/reassign_partitions_test.py
+++ b/tests/kafkatest/tests/core/reassign_partitions_test.py
@@ -13,10 +13,11 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from ducktape.mark import parametrize
+from ducktape.mark import matrix
 from ducktape.mark.resource import cluster
 from ducktape.utils.util import wait_until
 
+from kafkatest.services.kafka import config_property
 from kafkatest.services.zookeeper import ZookeeperService
 from kafkatest.services.kafka import KafkaService
 from kafkatest.services.verifiable_producer import VerifiableProducer
@@ -24,7 +25,7 @@ from kafkatest.services.console_consumer import ConsoleConsumer
 from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
 from kafkatest.utils import is_int
 import random
-
+import time
 
 class ReassignPartitionsTest(ProduceConsumeValidateTest):
     """
@@ -38,13 +39,25 @@ class ReassignPartitionsTest(ProduceConsumeValidateTest):
         super(ReassignPartitionsTest, self).__init__(test_context=test_context)
 
         self.topic = "test_topic"
-        self.zk = ZookeeperService(test_context, num_nodes=1)
-        self.kafka = KafkaService(test_context, num_nodes=4, zk=self.zk, topics={self.topic: {
-                                                                    "partitions": 20,
-                                                                    "replication-factor": 3,
-                                                                    'configs': {"min.insync.replicas": 2}}
-                                                                })
         self.num_partitions = 20
+        self.zk = ZookeeperService(test_context, num_nodes=1)
+        # We set min.insync.replicas to match the replication factor because
+        # it makes the test more stringent. If min.insync.replicas=2 and
+        # replication.factor=3, then the test would tolerate the failure of
+        # reassignment for up to one replica per partition, which is not
+        # desirable for this test in particular.
+        self.kafka = KafkaService(test_context, num_nodes=4, zk=self.zk,
+                                  server_prop_overides=[
+                                      [config_property.LOG_ROLL_TIME_MS, "5000"],
+                                      [config_property.LOG_RETENTION_CHECK_INTERVAL_MS, "5000"]
+                                  ],
+                                  topics={self.topic: {
+                                      "partitions": self.num_partitions,
+                                      "replication-factor": 3,
+                                      'configs': {
+                                          "min.insync.replicas": 3,
+                                      }}
+                                  })
         self.timeout_sec = 60
         self.producer_throughput = 1000
         self.num_producers = 1
@@ -86,14 +99,42 @@ class ReassignPartitionsTest(ProduceConsumeValidateTest):
             self.clean_bounce_some_brokers()
 
         # Wait until finished or timeout
-        wait_until(lambda: self.kafka.verify_reassign_partitions(partition_info), timeout_sec=self.timeout_sec, backoff_sec=.5)
+        wait_until(lambda: self.kafka.verify_reassign_partitions(partition_info),
+                   timeout_sec=self.timeout_sec, backoff_sec=.5)
 
-    @cluster(num_nodes=7)
-    @parametrize(security_protocol="PLAINTEXT", bounce_brokers=True)
-    @parametrize(security_protocol="PLAINTEXT", bounce_brokers=False)
-    def test_reassign_partitions(self, bounce_brokers, security_protocol):
+    def move_start_offset(self):
+        """We move the start offset of the topic by writing really old messages
+        and waiting for them to be cleaned up.
+        """
+        producer = VerifiableProducer(self.test_context, 1, self.kafka, self.topic,
+                                      throughput=-1, enable_idempotence=True,
+                                      create_time=1000)
+        producer.start()
+        wait_until(lambda: producer.num_acked > 0,
+                   timeout_sec=30,
+                   err_msg="Failed to get an acknowledgement within %ds" % 30)
+        # Wait 8 seconds to let the topic be seeded with messages that will
+        # be deleted. The 8 seconds is important, since we should get 2 deleted
+        # segments in this period based on the configured log roll time and the
+        # retention check interval.
+        time.sleep(8)
+        producer.stop()
+        self.logger.info("Seeded topic with %d messages which will be deleted" %
+                         producer.num_acked)
+        # Since the configured check interval is 5 seconds, we wait another
+        # 6 seconds to ensure that at least one more cleaning pass runs, so
+        # that the last segment is deleted. An alternative to using timeouts
+        # is to poll each partition until the log start offset matches the
+        # end offset; the latter is more robust (see the sketch following
+        # this file's diff).
+        time.sleep(6)
+
+    @cluster(num_nodes=8)
+    @matrix(bounce_brokers=[True, False],
+            reassign_from_offset_zero=[True, False])
    def test_reassign_partitions(self, bounce_brokers, reassign_from_offset_zero):
         """Reassign partitions tests.
-        Setup: 1 zk, 3 kafka nodes, 1 topic with partitions=3, replication-factor=3, and min.insync.replicas=2
+        Setup: 1 zk, 4 kafka nodes, 1 topic with partitions=20, replication-factor=3,
+        and min.insync.replicas=3
 
             - Produce messages in the background
             - Consume messages in the background
@@ -101,13 +142,19 @@ class ReassignPartitionsTest(ProduceConsumeValidateTest):
             - If bounce_brokers is True, also bounce a few brokers while partition re-assignment is in progress
             - When done reassigning partitions and bouncing brokers, stop producing, and finish consuming
             - Validate that every acked message was consumed
-        """
-
-        self.kafka.security_protocol = security_protocol
-        self.kafka.interbroker_security_protocol = security_protocol
-        new_consumer = False if  self.kafka.security_protocol == "PLAINTEXT" else True
-        self.producer = VerifiableProducer(self.test_context, self.num_producers, self.kafka, self.topic, throughput=self.producer_throughput)
-        self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, self.kafka, self.topic, new_consumer=new_consumer, consumer_timeout_ms=60000, message_validator=is_int)
+            """
         self.kafka.start()
-
+        if not reassign_from_offset_zero:
+            self.move_start_offset()
+
+        self.producer = VerifiableProducer(self.test_context, self.num_producers,
+                                           self.kafka, self.topic,
+                                           throughput=self.producer_throughput,
+                                           enable_idempotence=True)
+        self.consumer = ConsoleConsumer(self.test_context, self.num_consumers,
+                                        self.kafka, self.topic,
+                                        consumer_timeout_ms=60000,
+                                        message_validator=is_int)
+
+        self.enable_idempotence = True
+        self.run_produce_consume_validate(core_test_action=lambda: self.reassign_partitions(bounce_brokers))
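
The comments in move_start_offset above suggest polling each partition until
the log start offset matches the end offset rather than sleeping on fixed
timeouts. A minimal sketch of that check using the Java consumer (the
consumer construction and partition list are assumed to exist elsewhere):

    import java.util.List;
    import java.util.Map;

    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;

    final class StartOffsetCheck {
        // Returns true once every partition's log start offset has caught up
        // with its end offset, i.e. all seeded segments have been deleted.
        static boolean caughtUp(KafkaConsumer<?, ?> consumer, List<TopicPartition> partitions) {
            Map<TopicPartition, Long> start = consumer.beginningOffsets(partitions);
            Map<TopicPartition, Long> end = consumer.endOffsets(partitions);
            return partitions.stream().allMatch(tp -> start.get(tp).equals(end.get(tp)));
        }
    }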

http://git-wip-us.apache.org/repos/asf/kafka/blob/90b5ce3f/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java b/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java
index 1924755..8de115f 100644
--- a/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java
+++ b/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java
@@ -80,13 +80,22 @@ public class VerifiableProducer {
     // if null, then values are produced without a prefix
     private final Integer valuePrefix;
 
-    public VerifiableProducer(KafkaProducer<String, String> producer, String topic, int throughput, int maxMessages, Integer valuePrefix) {
+    // The create time to set in messages, in milliseconds since epoch
+    private Long createTime;
+
+    private final Long startTime;
+
+    public VerifiableProducer(KafkaProducer<String, String> producer, String topic, int throughput, int maxMessages,
+                              Integer valuePrefix, Long createTime) {
 
         this.topic = topic;
         this.throughput = throughput;
         this.maxMessages = maxMessages;
         this.producer = producer;
         this.valuePrefix = valuePrefix;
+        this.createTime = createTime;
+        this.startTime = System.currentTimeMillis();
+
     }
 
     /** Get the command-line argument parser. */
@@ -144,6 +153,15 @@ public class VerifiableProducer {
                 .metavar("CONFIG_FILE")
                 .help("Producer config properties file.");
 
+        parser.addArgument("--message-create-time")
+                .action(store())
+                .required(false)
+                .setDefault(-1)
+                .type(Integer.class)
+                .metavar("CREATETIME")
+                .dest("createTime")
+                .help("Send messages with creation time starting at the 
arguments value, in milliseconds since epoch");
+
         parser.addArgument("--value-prefix")
             .action(store())
             .required(false)
@@ -181,6 +199,10 @@ public class VerifiableProducer {
         int throughput = res.getInt("throughput");
         String configFile = res.getString("producer.config");
         Integer valuePrefix = res.getInt("valuePrefix");
+        Long createTime = (long) res.getInt("createTime");
+
+        if (createTime == -1L)
+            createTime = null;
 
         Properties producerProps = new Properties();
         producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, res.getString("brokerList"));
@@ -202,12 +224,14 @@ public class VerifiableProducer {
         StringSerializer serializer = new StringSerializer();
         KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps, serializer, serializer);
 
-        return new VerifiableProducer(producer, topic, throughput, maxMessages, valuePrefix);
+        return new VerifiableProducer(producer, topic, throughput, maxMessages, valuePrefix, createTime);
     }
 
     /** Produce a message with given key and value. */
     public void send(String key, String value) {
-        ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value);
+        ProducerRecord<String, String> record = new ProducerRecord<>(topic, null, createTime, key, value);
+        if (createTime != null)
+            createTime += System.currentTimeMillis() - startTime;
         numSent++;
         try {
             producer.send(record, new PrintInfoCallback(key, value));
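
Note how send() stamps each record with createTime and then advances it by
the wall-clock time elapsed since the producer started: message timestamps
stay far in the past, so brokers with a short time-based retention delete
their segments quickly, while the timestamps still increase. A hedged sketch
of the effect, with an illustrative base timestamp:

    long startTime = System.currentTimeMillis();
    Long createTime = 1000L; // very old base timestamp, as in the system test

    // Each record is sent with a timestamp close to the base value, so a
    // broker configured with a short log.retention.ms rolls and deletes
    // these segments almost immediately.
    ProducerRecord<String, String> record =
            new ProducerRecord<>("test_topic", null, createTime, "key", "value");
    // After each send, shift the base by the elapsed time since startup.
    createTime += System.currentTimeMillis() - startTime;

In the system test this behavior is driven by passing --message-create-time
through the new create_time parameter of the Python VerifiableProducer
service shown earlier.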
