Repository: kafka
Updated Branches:
  refs/heads/trunk 1ce6aa550 -> bdf4cba04


http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/core/src/test/scala/unit/kafka/log/LogTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala 
b/core/src/test/scala/unit/kafka/log/LogTest.scala
index 768c073..a7af24e 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -28,8 +28,7 @@ import org.scalatest.junit.JUnitSuite
 import org.junit.{After, Before, Test}
 import kafka.utils._
 import kafka.server.KafkaConfig
-import org.apache.kafka.common.record.RecordBatch.NO_TIMESTAMP
-import org.apache.kafka.common.record.{RecordBatch, _}
+import org.apache.kafka.common.record._
 import org.apache.kafka.common.utils.Utils
 
 import scala.collection.JavaConverters._
@@ -69,7 +68,7 @@ class LogTest extends JUnitSuite {
     val set = TestUtils.singletonRecords("test".getBytes)
 
     val logProps = new Properties()
-    logProps.put(LogConfig.SegmentMsProp, (1 * 60 * 60L): java.lang.Long)
+    logProps.put(LogConfig.SegmentMsProp, 1 * 60 * 60L: java.lang.Long)
     logProps.put(LogConfig.MessageTimestampDifferenceMaxMsProp, 
Long.MaxValue.toString)
 
     // create a log
@@ -77,6 +76,7 @@ class LogTest extends JUnitSuite {
                       LogConfig(logProps),
                       logStartOffset = 0L,
                       recoveryPoint = 0L,
+                      maxPidExpirationMs = 24 * 60,
                       scheduler = time.scheduler,
                       time = time)
     assertEquals("Log begins with a single empty segment.", 1, 
log.numberOfSegments)
@@ -120,6 +120,219 @@ class LogTest extends JUnitSuite {
     assertEquals("Appending an empty message set should not roll log even if 
sufficient time has passed.", numSegments, log.numberOfSegments)
   }
 
+  @Test(expected = classOf[OutOfOrderSequenceException])
+  def testNonSequentialAppend(): Unit = {
+    val logProps = new Properties()
+
+    // create a log
+    val log = new Log(logDir,
+      LogConfig(logProps),
+      recoveryPoint = 0L,
+      scheduler = time.scheduler,
+      time = time)
+
+    val pid = 1L
+    val epoch: Short = 0
+
+    val records = TestUtils.records(List(new SimpleRecord(time.milliseconds, 
"key".getBytes, "value".getBytes)), pid = pid, epoch = epoch, sequence = 0)
+    log.append(records, assignOffsets = true)
+
+    val nextRecords = TestUtils.records(List(new 
SimpleRecord(time.milliseconds, "key".getBytes, "value".getBytes)), pid = pid, 
epoch = epoch, sequence = 2)
+    log.append(nextRecords, assignOffsets = true)
+  }
+
+  @Test
+  def testDuplicateAppends(): Unit = {
+    val logProps = new Properties()
+
+    // create a log
+    val log = new Log(logDir,
+      LogConfig(logProps),
+      recoveryPoint = 0L,
+      scheduler = time.scheduler,
+      time = time)
+
+    val pid = 1L
+    val epoch: Short = 0
+
+    var seq = 0
+    // Pad the beginning of the log.
+    for (i <- 0 to 5) {
+      val record = TestUtils.records(List(new SimpleRecord(time.milliseconds, 
"key".getBytes, "value".getBytes)),
+        pid = pid, epoch = epoch, sequence = seq)
+      log.append(record, assignOffsets = true)
+      seq = seq + 1
+    }
+    // Append an entry with multiple log records.
+    var record = TestUtils.records(List(
+      new SimpleRecord(time.milliseconds, s"key-$seq".getBytes, 
s"value-$seq".getBytes),
+      new SimpleRecord(time.milliseconds, s"key-$seq".getBytes, 
s"value-$seq".getBytes),
+      new SimpleRecord(time.milliseconds, s"key-$seq".getBytes, 
s"value-$seq".getBytes)
+    ), pid = pid, epoch = epoch, sequence = seq)
+    val multiEntryAppendInfo = log.append(record, assignOffsets = true)
+    assertEquals("should have appended 3 entries", 
multiEntryAppendInfo.lastOffset - multiEntryAppendInfo.firstOffset + 1, 3)
+    seq = seq + 3
+
+    // Append a Duplicate of the tail, when the entry at the tail has multiple 
records.
+    val dupMultiEntryAppendInfo = log.append(record, assignOffsets = true)
+    assertEquals("Somehow appended a duplicate entry with multiple log records 
to the tail",
+      multiEntryAppendInfo.firstOffset, dupMultiEntryAppendInfo.firstOffset)
+    assertEquals("Somehow appended a duplicate entry with multiple log records 
to the tail",
+      multiEntryAppendInfo.lastOffset, dupMultiEntryAppendInfo.lastOffset)
+
+    // Append a partial duplicate of the tail. This is not allowed.
+    try {
+      record = TestUtils.records(
+        List(
+          new SimpleRecord(time.milliseconds, s"key-$seq".getBytes, 
s"value-$seq".getBytes),
+          new SimpleRecord(time.milliseconds, s"key-$seq".getBytes, 
s"value-$seq".getBytes)),
+        pid = pid, epoch = epoch, sequence = seq - 2)
+      log.append(record, assignOffsets = true)
+      fail ("Should have received an OutOfOrderSequenceException since we 
attempted to append a duplicate of a record " +
+        "in the middle of the log.")
+    } catch {
+      case e: OutOfOrderSequenceException => // Good!
+    }
+
+    // Append a Duplicate of an entry in the middle of the log. This is not 
allowed.
+     try {
+      record = TestUtils.records(
+        List(new SimpleRecord(time.milliseconds, s"key-1".getBytes, 
s"value-1".getBytes)),
+        pid = pid, epoch = epoch, sequence = 1)
+      log.append(record, assignOffsets = true)
+      fail ("Should have received an OutOfOrderSequenceException since we 
attempted to append a duplicate of a record " +
+        "in the middle of the log.")
+    } catch {
+      case e: OutOfOrderSequenceException => // Good!
+    }
+
+    // Append a duplicate entry with a single record at the tail of the log. 
This should return the appendInfo of the original entry.
+    record = TestUtils.records(List(new SimpleRecord(time.milliseconds, 
"key".getBytes, "value".getBytes)),
+      pid = pid, epoch = epoch, sequence = seq)
+    val origAppendInfo = log.append(record, assignOffsets = true)
+    val newAppendInfo = log.append(record, assignOffsets = true)
+    assertEquals("Inserted a duplicate record into the log", 
origAppendInfo.firstOffset, newAppendInfo.firstOffset)
+    assertEquals("Inserted a duplicate record into the log", 
origAppendInfo.lastOffset, newAppendInfo.lastOffset)
+  }
+
+  @Test
+  def testMulitplePidsPerMemoryRecord() : Unit = {
+    val logProps = new Properties()
+
+    // create a log
+    val log = new Log(logDir,
+      LogConfig(logProps),
+      recoveryPoint = 0L,
+      scheduler = time.scheduler,
+      time = time)
+
+    val epoch: Short = 0
+
+    val buffer = ByteBuffer.allocate(512)
+
+    var builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V2, 
CompressionType.NONE, TimestampType.LOG_APPEND_TIME, 0L, time.milliseconds(), 
1L, epoch, 0)
+    builder.append(new SimpleRecord("key".getBytes, "value".getBytes))
+    builder.close()
+
+    // Append a record with other pids.
+    builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V2, 
CompressionType.NONE, TimestampType.LOG_APPEND_TIME, 1L, time.milliseconds(), 
2L, epoch, 0)
+    builder.append(new SimpleRecord("key".getBytes, "value".getBytes))
+    builder.close()
+
+    // Append a record with other pids.
+    builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V2, 
CompressionType.NONE, TimestampType.LOG_APPEND_TIME, 2L, time.milliseconds(), 
3L, epoch, 0)
+    builder.append(new SimpleRecord("key".getBytes, "value".getBytes))
+    builder.close()
+
+    // Append a record with other pids.
+    builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V2, 
CompressionType.NONE, TimestampType.LOG_APPEND_TIME, 3L, time.milliseconds(), 
4L, epoch, 0)
+    builder.append(new SimpleRecord("key".getBytes, "value".getBytes))
+    builder.close()
+
+    buffer.flip()
+    val memoryRecords = MemoryRecords.readableRecords(buffer)
+
+    log.append(memoryRecords, assignOffsets = false)
+    log.flush()
+
+    val fetchedData = log.read(0, Int.MaxValue)
+
+    val origIterator = memoryRecords.batches.iterator()
+    for (batch <- fetchedData.records.batches.asScala) {
+      assertTrue(origIterator.hasNext)
+      val origEntry = origIterator.next()
+      assertEquals(origEntry.producerId, batch.producerId)
+      assertEquals(origEntry.baseOffset, batch.baseOffset)
+      assertEquals(origEntry.baseSequence, batch.baseSequence)
+    }
+  }
+
+  @Test(expected = classOf[DuplicateSequenceNumberException])
+  def testMultiplePidsWithDuplicates() : Unit = {
+    val logProps = new Properties()
+
+    // create a log
+    val log = new Log(logDir,
+      LogConfig(logProps),
+      recoveryPoint = 0L,
+      scheduler = time.scheduler,
+      time = time)
+
+    val epoch: Short = 0
+
+    val buffer = ByteBuffer.allocate(512)
+
+    var builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V2, 
CompressionType.NONE, TimestampType.LOG_APPEND_TIME, 0L, time.milliseconds(), 
1L, epoch, 0)
+    builder.append(new SimpleRecord("key".getBytes, "value".getBytes))
+    builder.close()
+
+    // Append a record with other pids.
+    builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V2, 
CompressionType.NONE, TimestampType.LOG_APPEND_TIME, 1L, time.milliseconds(), 
2L, epoch, 0)
+    builder.append(new SimpleRecord("key".getBytes, "value".getBytes))
+    builder.close()
+
+    // Append a record with other pids.
+    builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V2, 
CompressionType.NONE, TimestampType.LOG_APPEND_TIME, 2L, time.milliseconds(), 
1L, epoch, 1)
+    builder.append(new SimpleRecord("key".getBytes, "value".getBytes))
+    builder.close()
+
+    builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V2, 
CompressionType.NONE, TimestampType.LOG_APPEND_TIME, 3L, time.milliseconds(), 
2L, epoch, 1)
+    builder.append(new SimpleRecord("key".getBytes, "value".getBytes))
+    builder.close()
+
+    builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V2, 
CompressionType.NONE, TimestampType.LOG_APPEND_TIME, 4L, time.milliseconds(), 
1L, epoch, 1)
+    builder.append(new SimpleRecord("key".getBytes, "value".getBytes))
+    builder.close()
+
+    buffer.flip()
+
+    log.append(MemoryRecords.readableRecords(buffer), assignOffsets = false)
+    // Should throw a duplicate seqeuence exception here.
+    fail("should have thrown a DuplicateSequenceNumberException.")
+  }
+
+  @Test(expected = classOf[ProducerFencedException])
+  def testOldProducerEpoch(): Unit = {
+    val logProps = new Properties()
+
+    // create a log
+    val log = new Log(logDir,
+      LogConfig(logProps),
+      recoveryPoint = 0L,
+      scheduler = time.scheduler,
+      time = time)
+
+    val pid = 1L
+    val newEpoch: Short = 1
+    val oldEpoch: Short = 0
+
+    val records = TestUtils.records(List(new SimpleRecord(time.milliseconds, 
"key".getBytes, "value".getBytes)), pid = pid, epoch = newEpoch, sequence = 0)
+    log.append(records, assignOffsets = true)
+
+    val nextRecords = TestUtils.records(List(new 
SimpleRecord(time.milliseconds, "key".getBytes, "value".getBytes)), pid = pid, 
epoch = oldEpoch, sequence = 0)
+    log.append(nextRecords, assignOffsets = true)
+  }
+
   /**
    * Test for jitter s for time based log roll. This test appends messages 
then changes the time
    * using the mock clock to force the log to roll and checks the number of 
segments.
@@ -167,7 +380,7 @@ class LogTest extends JUnitSuite {
     // We use need to use magic value 1 here because the test is message size 
sensitive.
     logProps.put(LogConfig.MessageFormatVersionProp, 
ApiVersion.latestVersion.toString)
     // create a log
-    val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, 
recoveryPoint = 0L, time.scheduler, time = time)
+    val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, 
recoveryPoint = 0L, scheduler = time.scheduler, time = time)
     assertEquals("There should be exactly 1 segment.", 1, log.numberOfSegments)
 
     // segments expire in size
@@ -183,7 +396,7 @@ class LogTest extends JUnitSuite {
   @Test
   def testLoadEmptyLog() {
     createEmptyLogs(logDir, 0)
-    val log = new Log(logDir, logConfig, logStartOffset = 0L, recoveryPoint = 
0L, time.scheduler, time = time)
+    val log = new Log(logDir, logConfig, logStartOffset = 0L, recoveryPoint = 
0L, scheduler = time.scheduler, time = time)
     log.append(TestUtils.singletonRecords(value = "test".getBytes, timestamp = 
time.milliseconds))
   }
 
@@ -196,7 +409,7 @@ class LogTest extends JUnitSuite {
     logProps.put(LogConfig.SegmentBytesProp, 71: java.lang.Integer)
     // We use need to use magic value 1 here because the test is message size 
sensitive.
     logProps.put(LogConfig.MessageFormatVersionProp, 
ApiVersion.latestVersion.toString)
-    val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, 
recoveryPoint = 0L, time.scheduler, time = time)
+    val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, 
recoveryPoint = 0L, scheduler = time.scheduler, time = time)
     val values = (0 until 100 by 2).map(id => id.toString.getBytes).toArray
 
     for(value <- values)
@@ -220,7 +433,7 @@ class LogTest extends JUnitSuite {
   def testAppendAndReadWithNonSequentialOffsets() {
     val logProps = new Properties()
     logProps.put(LogConfig.SegmentBytesProp, 72: java.lang.Integer)
-    val log = new Log(logDir,  LogConfig(logProps), logStartOffset = 0L, 
recoveryPoint = 0L, time.scheduler, time = time)
+    val log = new Log(logDir,  LogConfig(logProps), logStartOffset = 0L, 
recoveryPoint = 0L, scheduler = time.scheduler, time = time)
     val messageIds = ((0 until 50) ++ (50 until 200 by 7)).toArray
     val records = messageIds.map(id => new SimpleRecord(id.toString.getBytes))
 
@@ -245,7 +458,7 @@ class LogTest extends JUnitSuite {
   def testReadAtLogGap() {
     val logProps = new Properties()
     logProps.put(LogConfig.SegmentBytesProp, 300: java.lang.Integer)
-    val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, 
recoveryPoint = 0L, time.scheduler, time = time)
+    val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, 
recoveryPoint = 0L, scheduler = time.scheduler, time = time)
 
     // keep appending until we have two segments with only a single message in 
the second segment
     while(log.numberOfSegments == 1)
@@ -262,7 +475,7 @@ class LogTest extends JUnitSuite {
   def testReadWithMinMessage() {
     val logProps = new Properties()
     logProps.put(LogConfig.SegmentBytesProp, 72: java.lang.Integer)
-    val log = new Log(logDir,  LogConfig(logProps), logStartOffset = 0L, 
recoveryPoint = 0L, time.scheduler, time = time)
+    val log = new Log(logDir,  LogConfig(logProps), logStartOffset = 0L, 
recoveryPoint = 0L, scheduler = time.scheduler, time = time)
     val messageIds = ((0 until 50) ++ (50 until 200 by 7)).toArray
     val records = messageIds.map(id => new SimpleRecord(id.toString.getBytes))
 
@@ -290,7 +503,7 @@ class LogTest extends JUnitSuite {
   def testReadWithTooSmallMaxLength() {
     val logProps = new Properties()
     logProps.put(LogConfig.SegmentBytesProp, 72: java.lang.Integer)
-    val log = new Log(logDir,  LogConfig(logProps), logStartOffset = 0L, 
recoveryPoint = 0L, time.scheduler, time = time)
+    val log = new Log(logDir,  LogConfig(logProps), logStartOffset = 0L, 
recoveryPoint = 0L, scheduler = time.scheduler, time = time)
     val messageIds = ((0 until 50) ++ (50 until 200 by 7)).toArray
     val records = messageIds.map(id => new SimpleRecord(id.toString.getBytes))
 
@@ -326,7 +539,7 @@ class LogTest extends JUnitSuite {
 
     // set up replica log starting with offset 1024 and with one message (at 
offset 1024)
     logProps.put(LogConfig.SegmentBytesProp, 1024: java.lang.Integer)
-    val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, 
recoveryPoint = 0L, time.scheduler, time = time)
+    val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, 
recoveryPoint = 0L, scheduler = time.scheduler, time = time)
     log.append(TestUtils.singletonRecords(value = "42".getBytes))
 
     assertEquals("Reading at the log end offset should produce 0 byte read.", 
0, log.read(1025, 1000).records.sizeInBytes)
@@ -357,7 +570,7 @@ class LogTest extends JUnitSuite {
     /* create a multipart log with 100 messages */
     val logProps = new Properties()
     logProps.put(LogConfig.SegmentBytesProp, 100: java.lang.Integer)
-    val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, 
recoveryPoint = 0L, time.scheduler, time = time)
+    val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, 
recoveryPoint = 0L, scheduler = time.scheduler, time = time)
     val numMessages = 100
     val messageSets = (0 until numMessages).map(i => 
TestUtils.singletonRecords(value = i.toString.getBytes,
                                                                                
 timestamp = time.milliseconds))
@@ -395,7 +608,7 @@ class LogTest extends JUnitSuite {
     /* this log should roll after every messageset */
     val logProps = new Properties()
     logProps.put(LogConfig.SegmentBytesProp, 110: java.lang.Integer)
-    val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, 
recoveryPoint = 0L, time.scheduler, time = time)
+    val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, 
recoveryPoint = 0L, scheduler = time.scheduler, time = time)
 
     /* append 2 compressed message sets, each with two messages giving offsets 
0, 1, 2, 3 */
     log.append(MemoryRecords.withRecords(CompressionType.GZIP, new 
SimpleRecord("hello".getBytes), new SimpleRecord("there".getBytes)))
@@ -421,7 +634,7 @@ class LogTest extends JUnitSuite {
       val logProps = new Properties()
       logProps.put(LogConfig.SegmentBytesProp, 100: java.lang.Integer)
       logProps.put(LogConfig.RetentionMsProp, 0: java.lang.Integer)
-      val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, 
recoveryPoint = 0L, time.scheduler, time = time)
+      val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, 
recoveryPoint = 0L, scheduler = time.scheduler, time = time)
       for(i <- 0 until messagesToAppend)
         log.append(TestUtils.singletonRecords(value = i.toString.getBytes, 
timestamp = time.milliseconds - 10))
 
@@ -457,7 +670,7 @@ class LogTest extends JUnitSuite {
     logProps.put(LogConfig.SegmentBytesProp, configSegmentSize: 
java.lang.Integer)
     // We use need to use magic value 1 here because the test is message size 
sensitive.
     logProps.put(LogConfig.MessageFormatVersionProp, 
ApiVersion.latestVersion.toString)
-    val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, 
recoveryPoint = 0L, time.scheduler, time = time)
+    val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, 
recoveryPoint = 0L, scheduler = time.scheduler, time = time)
 
     try {
       log.append(messageSet)
@@ -484,7 +697,7 @@ class LogTest extends JUnitSuite {
     val logProps = new Properties()
     logProps.put(LogConfig.CleanupPolicyProp, LogConfig.Compact)
 
-    val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, 
recoveryPoint = 0L, time.scheduler, time)
+    val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, 
recoveryPoint = 0L, scheduler = time.scheduler, time = time)
 
     try {
       log.append(messageSetWithUnkeyedMessage)
@@ -526,7 +739,7 @@ class LogTest extends JUnitSuite {
     val maxMessageSize = second.sizeInBytes - 1
     val logProps = new Properties()
     logProps.put(LogConfig.MaxMessageBytesProp, maxMessageSize: 
java.lang.Integer)
-    val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, 
recoveryPoint = 0L, time.scheduler, time = time)
+    val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, 
recoveryPoint = 0L, scheduler = time.scheduler, time = time)
 
     // should be able to append the small message
     log.append(first)
@@ -552,7 +765,7 @@ class LogTest extends JUnitSuite {
     logProps.put(LogConfig.IndexIntervalBytesProp, indexInterval: 
java.lang.Integer)
     logProps.put(LogConfig.SegmentIndexBytesProp, 4096: java.lang.Integer)
     val config = LogConfig(logProps)
-    var log = new Log(logDir, config, logStartOffset = 0L, recoveryPoint = 0L, 
time.scheduler, time)
+    var log = new Log(logDir, config, logStartOffset = 0L, recoveryPoint = 0L, 
scheduler = time.scheduler, time = time)
     for(i <- 0 until numMessages)
       log.append(TestUtils.singletonRecords(value = 
TestUtils.randomBytes(messageSize),
         timestamp = time.milliseconds + i * 10))
@@ -578,12 +791,12 @@ class LogTest extends JUnitSuite {
       assertEquals("Should have same number of time index entries as before.", 
numTimeIndexEntries, log.activeSegment.timeIndex.entries)
     }
 
-    log = new Log(logDir, config, logStartOffset = 0L, recoveryPoint = 
lastOffset, time.scheduler, time)
+    log = new Log(logDir, config, logStartOffset = 0L, recoveryPoint = 
lastOffset, scheduler = time.scheduler, time = time)
     verifyRecoveredLog(log)
     log.close()
 
     // test recovery case
-    log = new Log(logDir, config, logStartOffset = 0L, recoveryPoint = 0L, 
time.scheduler, time)
+    log = new Log(logDir, config, logStartOffset = 0L, recoveryPoint = 0L, 
scheduler = time.scheduler, time = time)
     verifyRecoveredLog(log)
     log.close()
   }
@@ -599,7 +812,7 @@ class LogTest extends JUnitSuite {
     logProps.put(LogConfig.IndexIntervalBytesProp, 1: java.lang.Integer)
 
     val config = LogConfig(logProps)
-    val log = new Log(logDir, config, logStartOffset = 0L, recoveryPoint = 0L, 
time.scheduler, time)
+    val log = new Log(logDir, config, logStartOffset = 0L, recoveryPoint = 0L, 
scheduler = time.scheduler, time = time)
 
     val messages = (0 until numMessages).map { i =>
       MemoryRecords.withRecords(100 + i, CompressionType.NONE, new 
SimpleRecord(time.milliseconds + i, i.toString.getBytes()))
@@ -623,7 +836,7 @@ class LogTest extends JUnitSuite {
     logProps.put(LogConfig.IndexIntervalBytesProp, 1: java.lang.Integer)
 
     val config = LogConfig(logProps)
-    var log = new Log(logDir, config, logStartOffset = 0L, recoveryPoint = 0L, 
time.scheduler, time)
+    var log = new Log(logDir, config, logStartOffset = 0L, recoveryPoint = 0L, 
scheduler = time.scheduler, time = time)
     for(i <- 0 until numMessages)
       log.append(TestUtils.singletonRecords(value = TestUtils.randomBytes(10), 
timestamp = time.milliseconds + i * 10))
     val indexFiles = log.logSegments.map(_.index.file)
@@ -635,7 +848,7 @@ class LogTest extends JUnitSuite {
     timeIndexFiles.foreach(_.delete())
 
     // reopen the log
-    log = new Log(logDir, config, logStartOffset = 0L, recoveryPoint = 0L, 
time.scheduler, time)
+    log = new Log(logDir, config, logStartOffset = 0L, recoveryPoint = 0L, 
scheduler = time.scheduler, time = time)
     assertEquals("Should have %d messages when log is 
reopened".format(numMessages), numMessages, log.logEndOffset)
     assertTrue("The index should have been rebuilt", 
log.logSegments.head.index.entries > 0)
     assertTrue("The time index should have been rebuilt", 
log.logSegments.head.timeIndex.entries > 0)
@@ -662,7 +875,7 @@ class LogTest extends JUnitSuite {
     logProps.put(LogConfig.MessageFormatVersionProp, "0.9.0")
 
     val config = LogConfig(logProps)
-    var log = new Log(logDir, config, logStartOffset = 0L, recoveryPoint = 0L, 
time.scheduler, time)
+    var log = new Log(logDir, config, logStartOffset = 0L, recoveryPoint = 0L, 
scheduler = time.scheduler, time = time)
     for(i <- 0 until numMessages)
       log.append(TestUtils.singletonRecords(value = TestUtils.randomBytes(10), 
timestamp = time.milliseconds + i * 10))
     val timeIndexFiles = log.logSegments.map(_.timeIndex.file)
@@ -672,7 +885,7 @@ class LogTest extends JUnitSuite {
     timeIndexFiles.foreach(_.delete())
 
     // The rebuilt time index should be empty
-    log = new Log(logDir, config, logStartOffset = 0L, recoveryPoint = 
numMessages + 1, time.scheduler, time)
+    log = new Log(logDir, config, logStartOffset = 0L, recoveryPoint = 
numMessages + 1, scheduler = time.scheduler, time = time)
     val segArray = log.logSegments.toArray
     for (i <- 0 until segArray.size - 1) {
       assertEquals("The time index should be empty", 0, 
segArray(i).timeIndex.entries)
@@ -693,7 +906,7 @@ class LogTest extends JUnitSuite {
     logProps.put(LogConfig.IndexIntervalBytesProp, 1: java.lang.Integer)
 
     val config = LogConfig(logProps)
-    var log = new Log(logDir, config, logStartOffset = 0L, recoveryPoint = 0L, 
time.scheduler, time)
+    var log = new Log(logDir, config, logStartOffset = 0L, recoveryPoint = 0L, 
scheduler = time.scheduler, time = time)
     for(i <- 0 until numMessages)
       log.append(TestUtils.singletonRecords(value = TestUtils.randomBytes(10), 
timestamp = time.milliseconds + i * 10))
     val indexFiles = log.logSegments.map(_.index.file)
@@ -715,7 +928,7 @@ class LogTest extends JUnitSuite {
     }
 
     // reopen the log
-    log = new Log(logDir, config, logStartOffset = 0L, recoveryPoint = 200L, 
time.scheduler, time)
+    log = new Log(logDir, config, logStartOffset = 0L, recoveryPoint = 200L, 
scheduler = time.scheduler, time = time)
     assertEquals("Should have %d messages when log is 
reopened".format(numMessages), numMessages, log.logEndOffset)
     for(i <- 0 until numMessages) {
       assertEquals(i, log.read(i, 100, 
None).records.batches.iterator.next().lastOffset)
@@ -842,8 +1055,8 @@ class LogTest extends JUnitSuite {
                       LogConfig(logProps),
                       logStartOffset = 0L,
                       recoveryPoint = 0L,
-                      time.scheduler,
-                      time)
+                      scheduler = time.scheduler,
+                      time = time)
 
     assertTrue("The first index file should have been replaced with a larger 
file", bogusIndex1.length > 0)
     assertTrue("The first time index file should have been replaced with a 
larger file", bogusTimeIndex1.length > 0)
@@ -874,8 +1087,8 @@ class LogTest extends JUnitSuite {
                       config,
                       logStartOffset = 0L,
                       recoveryPoint = 0L,
-                      time.scheduler,
-                      time)
+                      scheduler = time.scheduler,
+                      time = time)
 
     // add enough messages to roll over several segments then close and 
re-open and attempt to truncate
     for (_ <- 0 until 100)
@@ -885,8 +1098,8 @@ class LogTest extends JUnitSuite {
                   config,
                   logStartOffset = 0L,
                   recoveryPoint = 0L,
-                  time.scheduler,
-                  time)
+                  scheduler = time.scheduler,
+                  time = time)
     log.truncateTo(3)
     assertEquals("All but one segment should be deleted.", 1, 
log.numberOfSegments)
     assertEquals("Log end offset should be 3.", 3, log.logEndOffset)
@@ -911,8 +1124,8 @@ class LogTest extends JUnitSuite {
                       config,
                       logStartOffset = 0L,
                       recoveryPoint = 0L,
-                      time.scheduler,
-                      time)
+                      scheduler = time.scheduler,
+                      time = time)
 
     // append some messages to create some segments
     for (_ <- 0 until 100)
@@ -952,8 +1165,8 @@ class LogTest extends JUnitSuite {
                       config,
                       logStartOffset = 0L,
                       recoveryPoint = 0L,
-                      time.scheduler,
-                      time)
+                      scheduler = time.scheduler,
+                      time = time)
 
     // append some messages to create some segments
     for (_ <- 0 until 100)
@@ -967,8 +1180,8 @@ class LogTest extends JUnitSuite {
                   config,
                   logStartOffset = 0L,
                   recoveryPoint = 0L,
-                  time.scheduler,
-                  time)
+                  scheduler = time.scheduler,
+                  time = time)
     assertEquals("The deleted segments should be gone.", 1, 
log.numberOfSegments)
   }
 
@@ -978,8 +1191,8 @@ class LogTest extends JUnitSuite {
                       LogConfig(),
                       logStartOffset = 0L,
                       recoveryPoint = 0L,
-                      time.scheduler,
-                      time)
+                      scheduler = time.scheduler,
+                      time = time)
     log.append(TestUtils.singletonRecords(value = null))
     val head = log.read(0, 4096, None).records.records.iterator.next()
     assertEquals(0, head.offset)
@@ -992,8 +1205,8 @@ class LogTest extends JUnitSuite {
       LogConfig(),
       logStartOffset = 0L,
       recoveryPoint = 0L,
-      time.scheduler,
-      time)
+      scheduler = time.scheduler,
+      time = time)
     val records = (0 until 2).map(id => new 
SimpleRecord(id.toString.getBytes)).toArray
     records.foreach(record => 
log.append(MemoryRecords.withRecords(CompressionType.NONE, record)))
     val invalidRecord = MemoryRecords.withRecords(CompressionType.NONE, new 
SimpleRecord(1.toString.getBytes))
@@ -1006,8 +1219,8 @@ class LogTest extends JUnitSuite {
       LogConfig(),
       logStartOffset = 0L,
       recoveryPoint = 0L,
-      time.scheduler,
-      time)
+      scheduler = time.scheduler,
+      time = time)
     log.append(MemoryRecords.withRecords(CompressionType.NONE,
       new SimpleRecord(RecordBatch.NO_TIMESTAMP, "key".getBytes, 
"value".getBytes)))
   }
@@ -1029,8 +1242,8 @@ class LogTest extends JUnitSuite {
                         config,
                         logStartOffset = 0L,
                         recoveryPoint = 0L,
-                        time.scheduler,
-                        time)
+                        scheduler = time.scheduler,
+                        time = time)
       val numMessages = 50 + TestUtils.random.nextInt(50)
       for (_ <- 0 until numMessages)
         log.append(set)
@@ -1072,8 +1285,8 @@ class LogTest extends JUnitSuite {
       config,
       logStartOffset = 0L,
       recoveryPoint = 0L,
-      time.scheduler,
-      time)
+      scheduler = time.scheduler,
+      time = time)
     val set1 = MemoryRecords.withRecords(0, CompressionType.NONE, new 
SimpleRecord("v1".getBytes(), "k1".getBytes()))
     val set2 = MemoryRecords.withRecords(Integer.MAX_VALUE.toLong + 2, 
CompressionType.NONE, new SimpleRecord("v3".getBytes(), "k3".getBytes()))
     val set3 = MemoryRecords.withRecords(Integer.MAX_VALUE.toLong + 3, 
CompressionType.NONE, new SimpleRecord("v4".getBytes(), "k4".getBytes()))
@@ -1121,8 +1334,8 @@ class LogTest extends JUnitSuite {
       config,
       logStartOffset = 0L,
       recoveryPoint = 0L,
-      time.scheduler,
-      time)
+      scheduler = time.scheduler,
+      time = time)
     for (_ <- 0 until 100)
       log.append(set)
     log.close()
@@ -1221,8 +1434,8 @@ class LogTest extends JUnitSuite {
       config,
       logStartOffset = 0L,
       recoveryPoint = 0L,
-      time.scheduler,
-      time)
+      scheduler = time.scheduler,
+      time = time)
 
     // append some messages to create some segments
     for (_ <- 0 until 100)
@@ -1366,8 +1579,8 @@ class LogTest extends JUnitSuite {
       config,
       logStartOffset = 0L,
       recoveryPoint = 0L,
-      time.scheduler,
-      time)
+      scheduler = time.scheduler,
+      time = time)
     log
   }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/core/src/test/scala/unit/kafka/log/ProducerIdMappingTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/ProducerIdMappingTest.scala 
b/core/src/test/scala/unit/kafka/log/ProducerIdMappingTest.scala
new file mode 100644
index 0000000..d27f0ca
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/log/ProducerIdMappingTest.scala
@@ -0,0 +1,224 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *    http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package kafka.log
+
+import java.io.File
+import java.util.Properties
+
+import kafka.utils.TestUtils
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.errors.{DuplicateSequenceNumberException, 
OutOfOrderSequenceException, ProducerFencedException}
+import org.apache.kafka.common.utils.MockTime
+import org.junit.Assert._
+import org.junit.{After, Before, Test}
+import org.scalatest.junit.JUnitSuite
+
+class ProducerIdMappingTest extends JUnitSuite {
+  var idMappingDir: File = null
+  var config: LogConfig = null
+  var idMapping: ProducerIdMapping = null
+  val partition = new TopicPartition("test", 0)
+  val pid = 1L
+  val maxPidExpirationMs = 60 * 1000
+  val time = new MockTime
+
+  @Before
+  def setUp(): Unit = {
+    // Create configuration including number of snapshots to hold
+    val props = new Properties()
+    config = LogConfig(props)
+
+    // Create temporary directory
+    idMappingDir = TestUtils.tempDir()
+
+    // Instantiate IdMapping
+    idMapping = new ProducerIdMapping(config, partition, idMappingDir, 
maxPidExpirationMs)
+  }
+
+  @After
+  def tearDown(): Unit = {
+    idMappingDir.listFiles().foreach(f => f.delete())
+    idMappingDir.deleteOnExit()
+  }
+
+  @Test
+  def testBasicIdMapping(): Unit = {
+    val epoch = 0.toShort
+
+    // First entry for id 0 added
+    checkAndUpdate(idMapping, pid, 0, epoch, 0L, 0L)
+
+    // Second entry for id 0 added
+    checkAndUpdate(idMapping, pid, 1, epoch, 0L, 1L)
+
+    // Duplicate sequence number (matches previous sequence number)
+    assertThrows[DuplicateSequenceNumberException] {
+      checkAndUpdate(idMapping, pid, 1, epoch, 0L, 1L)
+    }
+
+    // Invalid sequence number (greater than next expected sequence number)
+    assertThrows[OutOfOrderSequenceException] {
+      checkAndUpdate(idMapping, pid, 5, epoch, 0L, 2L)
+    }
+
+    // Change epoch
+    checkAndUpdate(idMapping, pid, 0, (epoch + 1).toShort, 0L, 3L)
+
+    // Incorrect epoch
+    assertThrows[ProducerFencedException] {
+      checkAndUpdate(idMapping, pid, 0, epoch, 0L, 4L)
+    }
+  }
+
+  @Test
+  def testTakeSnapshot(): Unit = {
+    val epoch = 0.toShort
+    checkAndUpdate(idMapping, pid, 0, epoch, 0L, 0L)
+    checkAndUpdate(idMapping, pid, 1, epoch, 1L, 1L)
+
+    // Take snapshot
+    idMapping.maybeTakeSnapshot()
+
+    // Check that file exists and it is not empty
+    assertEquals("Directory doesn't contain a single file as expected", 1, 
idMappingDir.list().length)
+    assertTrue("Snapshot file is empty", idMappingDir.list().head.length > 0)
+  }
+
+  @Test
+  def testRecoverFromSnapshot(): Unit = {
+    val epoch = 0.toShort
+    checkAndUpdate(idMapping, pid, 0, epoch, 0L, time.milliseconds)
+    checkAndUpdate(idMapping, pid, 1, epoch, 1L, time.milliseconds)
+    idMapping.maybeTakeSnapshot()
+    val recoveredMapping = new ProducerIdMapping(config, partition, 
idMappingDir, maxPidExpirationMs)
+    recoveredMapping.truncateAndReload(3L, time.milliseconds)
+
+    // entry added after recovery
+    checkAndUpdate(recoveredMapping, pid, 2, epoch, 2L, time.milliseconds)
+  }
+
+  @Test(expected = classOf[OutOfOrderSequenceException])
+  def testRemoveExpiredPidsOnReload(): Unit = {
+    val epoch = 0.toShort
+    checkAndUpdate(idMapping, pid, 0, epoch, 0L, 0)
+    checkAndUpdate(idMapping, pid, 1, epoch, 1L, 1)
+
+    idMapping.maybeTakeSnapshot()
+    val recoveredMapping = new ProducerIdMapping(config, partition, 
idMappingDir, maxPidExpirationMs)
+    recoveredMapping.truncateAndReload(1L, 70000)
+
+    // entry added after recovery. The pid should be expired now, and would 
not exist in the pid mapping. Hence
+    // we should get an out of order sequence exception.
+    checkAndUpdate(recoveredMapping, pid, 2, epoch, 2L, 70001)
+  }
+
+
+  @Test
+  def testRemoveOldSnapshot(): Unit = {
+    val epoch = 0.toShort
+    checkAndUpdate(idMapping, pid, 0, epoch, 0L, 1L)
+    checkAndUpdate(idMapping, pid, 1, epoch, 1L, 2L)
+
+    idMapping.maybeTakeSnapshot()
+
+    checkAndUpdate(idMapping, pid, 2, epoch, 2L, 3L)
+
+    idMapping.maybeTakeSnapshot()
+
+    assertEquals(s"number of snapshot files is incorrect: 
${idMappingDir.listFiles().length}",
+               1, idMappingDir.listFiles().length)
+  }
+
+  @Test
+  def testSkipSnapshotIfOffsetUnchanged(): Unit = {
+    val epoch = 0.toShort
+    checkAndUpdate(idMapping, pid, 0, epoch, 0L, 0L)
+
+    idMapping.maybeTakeSnapshot()
+
+    // nothing changed so there should be no new snapshot
+    idMapping.maybeTakeSnapshot()
+
+    assertEquals(s"number of snapshot files is incorrect: 
${idMappingDir.listFiles().length}",
+      1, idMappingDir.listFiles().length)
+  }
+
+  @Test
+  def testStartOffset(): Unit = {
+    val epoch = 0.toShort
+    val pid2 = 2L
+    checkAndUpdate(idMapping, pid2, 0, epoch, 0L, 1L)
+    checkAndUpdate(idMapping, pid, 0, epoch, 1L, 2L)
+    checkAndUpdate(idMapping, pid, 1, epoch, 2L, 3L)
+    checkAndUpdate(idMapping, pid, 2, epoch, 3L, 4L)
+    idMapping.maybeTakeSnapshot()
+
+    intercept[OutOfOrderSequenceException] {
+      val recoveredMapping = new ProducerIdMapping(config, partition, 
idMappingDir, maxPidExpirationMs)
+      recoveredMapping.truncateAndReload(1L, time.milliseconds)
+      checkAndUpdate(recoveredMapping, pid2, 1, epoch, 4L, 5L)
+    }
+  }
+
+  @Test(expected = classOf[OutOfOrderSequenceException])
+  def testPidExpirationTimeout() {
+    val epoch = 5.toShort
+    val sequence = 37
+    checkAndUpdate(idMapping, pid, sequence, epoch, 1L)
+    time.sleep(maxPidExpirationMs + 1)
+    idMapping.checkForExpiredPids(time.milliseconds)
+    checkAndUpdate(idMapping, pid, sequence + 1, epoch, 1L)
+  }
+
+  @Test
+  def testLoadPid() {
+    val epoch = 5.toShort
+    val sequence = 37
+    val createTimeMs = time.milliseconds
+    idMapping.load(pid, ProducerIdEntry(epoch, sequence, 0L, 1, createTimeMs), 
time.milliseconds)
+    checkAndUpdate(idMapping, pid, sequence + 1, epoch, 2L)
+  }
+
+  @Test(expected = classOf[OutOfOrderSequenceException])
+  def testLoadIgnoresExpiredPids() {
+    val epoch = 5.toShort
+    val sequence = 37
+
+    val createTimeMs = time.milliseconds
+    time.sleep(maxPidExpirationMs + 1)
+    val loadTimeMs = time.milliseconds
+    idMapping.load(pid, ProducerIdEntry(epoch, sequence, 0L, 1, createTimeMs), 
loadTimeMs)
+
+    // entry wasn't loaded, so this should fail
+    checkAndUpdate(idMapping, pid, sequence + 1, epoch, 2L)
+  }
+
+  private def checkAndUpdate(mapping: ProducerIdMapping,
+                             pid: Long,
+                             seq: Int,
+                             epoch: Short,
+                             lastOffset: Long,
+                             timestamp: Long = time.milliseconds()): Unit = {
+    val numRecords = 1
+    val incomingPidEntry = ProducerIdEntry(epoch, seq, lastOffset, numRecords, 
timestamp)
+    val producerAppendInfo = new ProducerAppendInfo(pid, 
mapping.lastEntry(pid).getOrElse(ProducerIdEntry.Empty))
+    producerAppendInfo.append(incomingPidEntry)
+    mapping.update(producerAppendInfo)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala 
b/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala
index b6e3607..f868032 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala
@@ -201,6 +201,7 @@ class ReplicationQuotasTest extends ZooKeeperTestHarness {
     val start = System.currentTimeMillis()
 
     //Start the new broker (and hence start replicating)
+    debug("Starting new broker")
     brokers = brokers :+ createServer(fromProps(createBrokerConfig(101, 
zkConnect)))
     waitForOffsetsToMatch(msgCount, 0, 101)
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala 
b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 9ae7195..8766855 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -937,9 +937,10 @@ object TestUtils extends Logging {
                    flushRecoveryOffsetCheckpointMs = 10000L,
                    flushStartOffsetCheckpointMs = 10000L,
                    retentionCheckMs = 1000L,
+                   maxPidExpirationMs = 60 * 60 * 1000,
                    scheduler = time.scheduler,
                    time = time,
-                   brokerState = new BrokerState())
+                   brokerState = BrokerState())
   }
 
   @deprecated("This method has been deprecated and it will be removed in a 
future release.", "0.10.0.0")

http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/core/src/test/scala/unit/kafka/utils/ZkUtilsTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/ZkUtilsTest.scala 
b/core/src/test/scala/unit/kafka/utils/ZkUtilsTest.scala
index 918c4b5..db62020 100755
--- a/core/src/test/scala/unit/kafka/utils/ZkUtilsTest.scala
+++ b/core/src/test/scala/unit/kafka/utils/ZkUtilsTest.scala
@@ -40,6 +40,21 @@ class ZkUtilsTest extends ZooKeeperTestHarness {
     assertTrue("Deletion should be successful", 
zkUtils.conditionalDeletePath(path, 0))
   }
 
+  // Verify behaviour of ZkUtils.createSequentialPersistentPath since 
PIDManager relies on it
+  @Test
+  def testPersistentSequentialPath() {
+    // Given an existing path
+    zkUtils.createPersistentPath(path)
+
+    var result = zkUtils.createSequentialPersistentPath(path + "/sequence_")
+
+    assertEquals("/path/sequence_0000000000", result)
+
+    result = zkUtils.createSequentialPersistentPath(path + "/sequence_")
+
+    assertEquals("/path/sequence_0000000001", result)
+  }
+
   @Test
   def testAbortedConditionalDeletePath() {
     // Given an existing path that gets updated

http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/tests/kafkatest/services/verifiable_producer.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/verifiable_producer.py 
b/tests/kafkatest/services/verifiable_producer.py
index 859e3c4..3cf3abd 100644
--- a/tests/kafkatest/services/verifiable_producer.py
+++ b/tests/kafkatest/services/verifiable_producer.py
@@ -15,25 +15,22 @@
 
 import json
 import os
-import signal
 import time
 
 from ducktape.services.background_thread import BackgroundThreadService
 from ducktape.cluster.remoteaccount import RemoteCommandError
-from ducktape.utils.util import wait_until
 
 from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin
 from kafkatest.services.verifiable_client import VerifiableClientMixin
 from kafkatest.utils import is_int, is_int_with_prefix
 from kafkatest.version import DEV_BRANCH
-from kafkatest.utils.remote_account import line_count
 
 class VerifiableProducer(KafkaPathResolverMixin, VerifiableClientMixin, 
BackgroundThreadService):
     """This service wraps org.apache.kafka.tools.VerifiableProducer for use in
-    system testing. 
+    system testing.
 
     NOTE: this class should be treated as a PUBLIC API. Downstream users use
-    this service both directly and through class extension, so care must be 
+    this service both directly and through class extension, so care must be
     taken to ensure compatibility.
     """
 
@@ -59,7 +56,7 @@ class VerifiableProducer(KafkaPathResolverMixin, 
VerifiableClientMixin, Backgrou
 
     def __init__(self, context, num_nodes, kafka, topic, max_messages=-1, 
throughput=100000,
                  message_validator=is_int, compression_types=None, 
version=DEV_BRANCH, acks=None,
-                 stop_timeout_sec=150, request_timeout_sec=30, 
log_level="INFO"):
+                 stop_timeout_sec=150, request_timeout_sec=30, 
log_level="INFO", enable_idempotence=False):
         """
         :param max_messages is a number of messages to be produced per producer
         :param message_validator checks for an expected format of messages 
produced. There are
@@ -92,6 +89,7 @@ class VerifiableProducer(KafkaPathResolverMixin, 
VerifiableClientMixin, Backgrou
         self.acks = acks
         self.stop_timeout_sec = stop_timeout_sec
         self.request_timeout_sec = request_timeout_sec
+        self.enable_idempotence = enable_idempotence
 
     def java_class_name(self):
         return "VerifiableProducer"
@@ -124,6 +122,12 @@ class VerifiableProducer(KafkaPathResolverMixin, 
VerifiableClientMixin, Backgrou
             producer_prop_file += "\nacks=%s\n" % self.acks
 
         producer_prop_file += "\nrequest.timeout.ms=%d\n" % 
(self.request_timeout_sec * 1000)
+        if self.enable_idempotence:
+            self.logger.info("Setting up an idempotent producer")
+            producer_prop_file += "\nmax.in.flight.requests.per.connection=1\n"
+            producer_prop_file += "\nretries=50\n"
+            producer_prop_file += "\nenable.idempotence=true\n"
+
         self.logger.info("verifiable_producer.properties:")
         self.logger.info(producer_prop_file)
         node.account.create_file(VerifiableProducer.CONFIG_FILE, 
producer_prop_file)

http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/tests/kafkatest/tests/core/replication_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/core/replication_test.py 
b/tests/kafkatest/tests/core/replication_test.py
index 3e17d56..5d96d7b 100644
--- a/tests/kafkatest/tests/core/replication_test.py
+++ b/tests/kafkatest/tests/core/replication_test.py
@@ -104,11 +104,12 @@ class ReplicationTest(ProduceConsumeValidateTest):
 
         self.topic = "test_topic"
         self.zk = ZookeeperService(test_context, num_nodes=1)
-        self.kafka = KafkaService(test_context, num_nodes=3, zk=self.zk, 
topics={self.topic: {
-                                                                    
"partitions": 3,
-                                                                    
"replication-factor": 3,
-                                                                    'configs': 
{"min.insync.replicas": 2}}
-                                                                })
+        self.kafka = KafkaService(test_context, num_nodes=3, zk=self.zk,
+                                  topics={self.topic: {
+                                      "partitions": 3,
+                                      "replication-factor": 3,
+                                      'configs': {"min.insync.replicas": 2}}
+                                  })
         self.producer_throughput = 1000
         self.num_producers = 1
         self.num_consumers = 1
@@ -123,6 +124,10 @@ class ReplicationTest(ProduceConsumeValidateTest):
     @cluster(num_nodes=7)
     @matrix(failure_mode=["clean_shutdown", "hard_shutdown", "clean_bounce", 
"hard_bounce"],
             broker_type=["leader"],
+            security_protocol=["PLAINTEXT"],
+            enable_idempotence=[True])
+    @matrix(failure_mode=["clean_shutdown", "hard_shutdown", "clean_bounce", 
"hard_bounce"],
+            broker_type=["leader"],
             security_protocol=["PLAINTEXT", "SASL_SSL"])
     @matrix(failure_mode=["clean_shutdown", "hard_shutdown", "clean_bounce", 
"hard_bounce"],
             broker_type=["controller"],
@@ -133,7 +138,10 @@ class ReplicationTest(ProduceConsumeValidateTest):
     @parametrize(failure_mode="hard_bounce",
             broker_type="leader",
             security_protocol="SASL_SSL", 
client_sasl_mechanism="SCRAM-SHA-256", 
interbroker_sasl_mechanism="SCRAM-SHA-512")
-    def test_replication_with_broker_failure(self, failure_mode, 
security_protocol, broker_type, client_sasl_mechanism="GSSAPI", 
interbroker_sasl_mechanism="GSSAPI"):
+    def test_replication_with_broker_failure(self, failure_mode, 
security_protocol,
+                                             broker_type, 
client_sasl_mechanism="GSSAPI",
+                                             
interbroker_sasl_mechanism="GSSAPI",
+                                             enable_idempotence=False):
         """Replication tests.
         These tests verify that replication provides simple durability 
guarantees by checking that data acked by
         brokers is still available for consumption in the face of various 
failure scenarios.
@@ -152,8 +160,8 @@ class ReplicationTest(ProduceConsumeValidateTest):
         self.kafka.client_sasl_mechanism = client_sasl_mechanism
         self.kafka.interbroker_sasl_mechanism = interbroker_sasl_mechanism
         new_consumer = False if self.kafka.security_protocol == "PLAINTEXT" 
else True
-        self.producer = VerifiableProducer(self.test_context, 
self.num_producers, self.kafka, self.topic, throughput=self.producer_throughput)
+        self.enable_idempotence = enable_idempotence
+        self.producer = VerifiableProducer(self.test_context, 
self.num_producers, self.kafka, self.topic, 
throughput=self.producer_throughput, enable_idempotence=enable_idempotence)
         self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, 
self.kafka, self.topic, new_consumer=new_consumer, consumer_timeout_ms=60000, 
message_validator=is_int)
         self.kafka.start()
-        
         self.run_produce_consume_validate(core_test_action=lambda: 
failures[failure_mode](self, broker_type))

http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/tests/kafkatest/tests/produce_consume_validate.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/produce_consume_validate.py 
b/tests/kafkatest/tests/produce_consume_validate.py
index cad9150..079305c 100644
--- a/tests/kafkatest/tests/produce_consume_validate.py
+++ b/tests/kafkatest/tests/produce_consume_validate.py
@@ -42,6 +42,7 @@ class ProduceConsumeValidateTest(Test):
         # producer begins producing messages, in which case we will miss the
         # initial set of messages and get spurious test failures.
         self.consumer_init_timeout_sec = 0
+        self.enable_idempotence = False
 
     def setup_producer_and_consumer(self):
         raise NotImplementedError("Subclasses should implement this")
@@ -67,7 +68,7 @@ class ProduceConsumeValidateTest(Test):
             remaining_time = self.consumer_init_timeout_sec - (end - start)
             if remaining_time < 0 :
                 remaining_time = 0
-            if self.consumer.new_consumer is True:
+            if self.consumer.new_consumer:
                 wait_until(lambda: 
self.consumer.has_partitions_assigned(self.consumer.nodes[0]) is True,
                            timeout_sec=remaining_time,
                            err_msg="Consumer process took more than %d s to 
have partitions assigned" %\
@@ -167,9 +168,17 @@ class ProduceConsumeValidateTest(Test):
             msg = self.annotate_data_lost(data_lost, msg, len(to_validate))
 
 
+        if self.enable_idempotence:
+            self.logger.info("Ran a test with idempotence enabled. We expect 
no duplicates")
+        else:
+            self.logger.info("Ran a test with idempotence disabled.")
+
         # Are there duplicates?
         if len(set(consumed)) != len(consumed):
-            msg += "(There are also %s duplicate messages in the log - but 
that is an acceptable outcome)\n" % abs(len(set(consumed)) - len(consumed))
+            num_duplicates = abs(len(set(consumed)) - len(consumed))
+            msg += "(There are also %s duplicate messages in the log - but 
that is an acceptable outcome)\n" % num_duplicates
+            if self.enable_idempotence:
+                assert False, "Detected %s duplicates even though idempotence 
was enabled." % num_duplicates
 
         # Collect all logs if validation fails
         if not success:

Reply via email to