Repository: kafka Updated Branches: refs/heads/trunk 1ce6aa550 -> bdf4cba04
http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/core/src/test/scala/unit/kafka/log/LogTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala index 768c073..a7af24e 100755 --- a/core/src/test/scala/unit/kafka/log/LogTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogTest.scala @@ -28,8 +28,7 @@ import org.scalatest.junit.JUnitSuite import org.junit.{After, Before, Test} import kafka.utils._ import kafka.server.KafkaConfig -import org.apache.kafka.common.record.RecordBatch.NO_TIMESTAMP -import org.apache.kafka.common.record.{RecordBatch, _} +import org.apache.kafka.common.record._ import org.apache.kafka.common.utils.Utils import scala.collection.JavaConverters._ @@ -69,7 +68,7 @@ class LogTest extends JUnitSuite { val set = TestUtils.singletonRecords("test".getBytes) val logProps = new Properties() - logProps.put(LogConfig.SegmentMsProp, (1 * 60 * 60L): java.lang.Long) + logProps.put(LogConfig.SegmentMsProp, 1 * 60 * 60L: java.lang.Long) logProps.put(LogConfig.MessageTimestampDifferenceMaxMsProp, Long.MaxValue.toString) // create a log @@ -77,6 +76,7 @@ class LogTest extends JUnitSuite { LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, + maxPidExpirationMs = 24 * 60, scheduler = time.scheduler, time = time) assertEquals("Log begins with a single empty segment.", 1, log.numberOfSegments) @@ -120,6 +120,219 @@ class LogTest extends JUnitSuite { assertEquals("Appending an empty message set should not roll log even if sufficient time has passed.", numSegments, log.numberOfSegments) } + @Test(expected = classOf[OutOfOrderSequenceException]) + def testNonSequentialAppend(): Unit = { + val logProps = new Properties() + + // create a log + val log = new Log(logDir, + LogConfig(logProps), + recoveryPoint = 0L, + scheduler = time.scheduler, + time = time) + + val pid = 1L + val epoch: Short = 0 + + val records = TestUtils.records(List(new SimpleRecord(time.milliseconds, "key".getBytes, "value".getBytes)), pid = pid, epoch = epoch, sequence = 0) + log.append(records, assignOffsets = true) + + val nextRecords = TestUtils.records(List(new SimpleRecord(time.milliseconds, "key".getBytes, "value".getBytes)), pid = pid, epoch = epoch, sequence = 2) + log.append(nextRecords, assignOffsets = true) + } + + @Test + def testDuplicateAppends(): Unit = { + val logProps = new Properties() + + // create a log + val log = new Log(logDir, + LogConfig(logProps), + recoveryPoint = 0L, + scheduler = time.scheduler, + time = time) + + val pid = 1L + val epoch: Short = 0 + + var seq = 0 + // Pad the beginning of the log. + for (i <- 0 to 5) { + val record = TestUtils.records(List(new SimpleRecord(time.milliseconds, "key".getBytes, "value".getBytes)), + pid = pid, epoch = epoch, sequence = seq) + log.append(record, assignOffsets = true) + seq = seq + 1 + } + // Append an entry with multiple log records. + var record = TestUtils.records(List( + new SimpleRecord(time.milliseconds, s"key-$seq".getBytes, s"value-$seq".getBytes), + new SimpleRecord(time.milliseconds, s"key-$seq".getBytes, s"value-$seq".getBytes), + new SimpleRecord(time.milliseconds, s"key-$seq".getBytes, s"value-$seq".getBytes) + ), pid = pid, epoch = epoch, sequence = seq) + val multiEntryAppendInfo = log.append(record, assignOffsets = true) + assertEquals("should have appended 3 entries", multiEntryAppendInfo.lastOffset - multiEntryAppendInfo.firstOffset + 1, 3) + seq = seq + 3 + + // Append a Duplicate of the tail, when the entry at the tail has multiple records. + val dupMultiEntryAppendInfo = log.append(record, assignOffsets = true) + assertEquals("Somehow appended a duplicate entry with multiple log records to the tail", + multiEntryAppendInfo.firstOffset, dupMultiEntryAppendInfo.firstOffset) + assertEquals("Somehow appended a duplicate entry with multiple log records to the tail", + multiEntryAppendInfo.lastOffset, dupMultiEntryAppendInfo.lastOffset) + + // Append a partial duplicate of the tail. This is not allowed. + try { + record = TestUtils.records( + List( + new SimpleRecord(time.milliseconds, s"key-$seq".getBytes, s"value-$seq".getBytes), + new SimpleRecord(time.milliseconds, s"key-$seq".getBytes, s"value-$seq".getBytes)), + pid = pid, epoch = epoch, sequence = seq - 2) + log.append(record, assignOffsets = true) + fail ("Should have received an OutOfOrderSequenceException since we attempted to append a duplicate of a record " + + "in the middle of the log.") + } catch { + case e: OutOfOrderSequenceException => // Good! + } + + // Append a Duplicate of an entry in the middle of the log. This is not allowed. + try { + record = TestUtils.records( + List(new SimpleRecord(time.milliseconds, s"key-1".getBytes, s"value-1".getBytes)), + pid = pid, epoch = epoch, sequence = 1) + log.append(record, assignOffsets = true) + fail ("Should have received an OutOfOrderSequenceException since we attempted to append a duplicate of a record " + + "in the middle of the log.") + } catch { + case e: OutOfOrderSequenceException => // Good! + } + + // Append a duplicate entry with a single record at the tail of the log. This should return the appendInfo of the original entry. + record = TestUtils.records(List(new SimpleRecord(time.milliseconds, "key".getBytes, "value".getBytes)), + pid = pid, epoch = epoch, sequence = seq) + val origAppendInfo = log.append(record, assignOffsets = true) + val newAppendInfo = log.append(record, assignOffsets = true) + assertEquals("Inserted a duplicate record into the log", origAppendInfo.firstOffset, newAppendInfo.firstOffset) + assertEquals("Inserted a duplicate record into the log", origAppendInfo.lastOffset, newAppendInfo.lastOffset) + } + + @Test + def testMulitplePidsPerMemoryRecord() : Unit = { + val logProps = new Properties() + + // create a log + val log = new Log(logDir, + LogConfig(logProps), + recoveryPoint = 0L, + scheduler = time.scheduler, + time = time) + + val epoch: Short = 0 + + val buffer = ByteBuffer.allocate(512) + + var builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V2, CompressionType.NONE, TimestampType.LOG_APPEND_TIME, 0L, time.milliseconds(), 1L, epoch, 0) + builder.append(new SimpleRecord("key".getBytes, "value".getBytes)) + builder.close() + + // Append a record with other pids. + builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V2, CompressionType.NONE, TimestampType.LOG_APPEND_TIME, 1L, time.milliseconds(), 2L, epoch, 0) + builder.append(new SimpleRecord("key".getBytes, "value".getBytes)) + builder.close() + + // Append a record with other pids. + builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V2, CompressionType.NONE, TimestampType.LOG_APPEND_TIME, 2L, time.milliseconds(), 3L, epoch, 0) + builder.append(new SimpleRecord("key".getBytes, "value".getBytes)) + builder.close() + + // Append a record with other pids. + builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V2, CompressionType.NONE, TimestampType.LOG_APPEND_TIME, 3L, time.milliseconds(), 4L, epoch, 0) + builder.append(new SimpleRecord("key".getBytes, "value".getBytes)) + builder.close() + + buffer.flip() + val memoryRecords = MemoryRecords.readableRecords(buffer) + + log.append(memoryRecords, assignOffsets = false) + log.flush() + + val fetchedData = log.read(0, Int.MaxValue) + + val origIterator = memoryRecords.batches.iterator() + for (batch <- fetchedData.records.batches.asScala) { + assertTrue(origIterator.hasNext) + val origEntry = origIterator.next() + assertEquals(origEntry.producerId, batch.producerId) + assertEquals(origEntry.baseOffset, batch.baseOffset) + assertEquals(origEntry.baseSequence, batch.baseSequence) + } + } + + @Test(expected = classOf[DuplicateSequenceNumberException]) + def testMultiplePidsWithDuplicates() : Unit = { + val logProps = new Properties() + + // create a log + val log = new Log(logDir, + LogConfig(logProps), + recoveryPoint = 0L, + scheduler = time.scheduler, + time = time) + + val epoch: Short = 0 + + val buffer = ByteBuffer.allocate(512) + + var builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V2, CompressionType.NONE, TimestampType.LOG_APPEND_TIME, 0L, time.milliseconds(), 1L, epoch, 0) + builder.append(new SimpleRecord("key".getBytes, "value".getBytes)) + builder.close() + + // Append a record with other pids. + builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V2, CompressionType.NONE, TimestampType.LOG_APPEND_TIME, 1L, time.milliseconds(), 2L, epoch, 0) + builder.append(new SimpleRecord("key".getBytes, "value".getBytes)) + builder.close() + + // Append a record with other pids. + builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V2, CompressionType.NONE, TimestampType.LOG_APPEND_TIME, 2L, time.milliseconds(), 1L, epoch, 1) + builder.append(new SimpleRecord("key".getBytes, "value".getBytes)) + builder.close() + + builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V2, CompressionType.NONE, TimestampType.LOG_APPEND_TIME, 3L, time.milliseconds(), 2L, epoch, 1) + builder.append(new SimpleRecord("key".getBytes, "value".getBytes)) + builder.close() + + builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V2, CompressionType.NONE, TimestampType.LOG_APPEND_TIME, 4L, time.milliseconds(), 1L, epoch, 1) + builder.append(new SimpleRecord("key".getBytes, "value".getBytes)) + builder.close() + + buffer.flip() + + log.append(MemoryRecords.readableRecords(buffer), assignOffsets = false) + // Should throw a duplicate seqeuence exception here. + fail("should have thrown a DuplicateSequenceNumberException.") + } + + @Test(expected = classOf[ProducerFencedException]) + def testOldProducerEpoch(): Unit = { + val logProps = new Properties() + + // create a log + val log = new Log(logDir, + LogConfig(logProps), + recoveryPoint = 0L, + scheduler = time.scheduler, + time = time) + + val pid = 1L + val newEpoch: Short = 1 + val oldEpoch: Short = 0 + + val records = TestUtils.records(List(new SimpleRecord(time.milliseconds, "key".getBytes, "value".getBytes)), pid = pid, epoch = newEpoch, sequence = 0) + log.append(records, assignOffsets = true) + + val nextRecords = TestUtils.records(List(new SimpleRecord(time.milliseconds, "key".getBytes, "value".getBytes)), pid = pid, epoch = oldEpoch, sequence = 0) + log.append(nextRecords, assignOffsets = true) + } + /** * Test for jitter s for time based log roll. This test appends messages then changes the time * using the mock clock to force the log to roll and checks the number of segments. @@ -167,7 +380,7 @@ class LogTest extends JUnitSuite { // We use need to use magic value 1 here because the test is message size sensitive. logProps.put(LogConfig.MessageFormatVersionProp, ApiVersion.latestVersion.toString) // create a log - val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, time.scheduler, time = time) + val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, time = time) assertEquals("There should be exactly 1 segment.", 1, log.numberOfSegments) // segments expire in size @@ -183,7 +396,7 @@ class LogTest extends JUnitSuite { @Test def testLoadEmptyLog() { createEmptyLogs(logDir, 0) - val log = new Log(logDir, logConfig, logStartOffset = 0L, recoveryPoint = 0L, time.scheduler, time = time) + val log = new Log(logDir, logConfig, logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, time = time) log.append(TestUtils.singletonRecords(value = "test".getBytes, timestamp = time.milliseconds)) } @@ -196,7 +409,7 @@ class LogTest extends JUnitSuite { logProps.put(LogConfig.SegmentBytesProp, 71: java.lang.Integer) // We use need to use magic value 1 here because the test is message size sensitive. logProps.put(LogConfig.MessageFormatVersionProp, ApiVersion.latestVersion.toString) - val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, time.scheduler, time = time) + val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, time = time) val values = (0 until 100 by 2).map(id => id.toString.getBytes).toArray for(value <- values) @@ -220,7 +433,7 @@ class LogTest extends JUnitSuite { def testAppendAndReadWithNonSequentialOffsets() { val logProps = new Properties() logProps.put(LogConfig.SegmentBytesProp, 72: java.lang.Integer) - val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, time.scheduler, time = time) + val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, time = time) val messageIds = ((0 until 50) ++ (50 until 200 by 7)).toArray val records = messageIds.map(id => new SimpleRecord(id.toString.getBytes)) @@ -245,7 +458,7 @@ class LogTest extends JUnitSuite { def testReadAtLogGap() { val logProps = new Properties() logProps.put(LogConfig.SegmentBytesProp, 300: java.lang.Integer) - val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, time.scheduler, time = time) + val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, time = time) // keep appending until we have two segments with only a single message in the second segment while(log.numberOfSegments == 1) @@ -262,7 +475,7 @@ class LogTest extends JUnitSuite { def testReadWithMinMessage() { val logProps = new Properties() logProps.put(LogConfig.SegmentBytesProp, 72: java.lang.Integer) - val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, time.scheduler, time = time) + val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, time = time) val messageIds = ((0 until 50) ++ (50 until 200 by 7)).toArray val records = messageIds.map(id => new SimpleRecord(id.toString.getBytes)) @@ -290,7 +503,7 @@ class LogTest extends JUnitSuite { def testReadWithTooSmallMaxLength() { val logProps = new Properties() logProps.put(LogConfig.SegmentBytesProp, 72: java.lang.Integer) - val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, time.scheduler, time = time) + val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, time = time) val messageIds = ((0 until 50) ++ (50 until 200 by 7)).toArray val records = messageIds.map(id => new SimpleRecord(id.toString.getBytes)) @@ -326,7 +539,7 @@ class LogTest extends JUnitSuite { // set up replica log starting with offset 1024 and with one message (at offset 1024) logProps.put(LogConfig.SegmentBytesProp, 1024: java.lang.Integer) - val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, time.scheduler, time = time) + val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, time = time) log.append(TestUtils.singletonRecords(value = "42".getBytes)) assertEquals("Reading at the log end offset should produce 0 byte read.", 0, log.read(1025, 1000).records.sizeInBytes) @@ -357,7 +570,7 @@ class LogTest extends JUnitSuite { /* create a multipart log with 100 messages */ val logProps = new Properties() logProps.put(LogConfig.SegmentBytesProp, 100: java.lang.Integer) - val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, time.scheduler, time = time) + val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, time = time) val numMessages = 100 val messageSets = (0 until numMessages).map(i => TestUtils.singletonRecords(value = i.toString.getBytes, timestamp = time.milliseconds)) @@ -395,7 +608,7 @@ class LogTest extends JUnitSuite { /* this log should roll after every messageset */ val logProps = new Properties() logProps.put(LogConfig.SegmentBytesProp, 110: java.lang.Integer) - val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, time.scheduler, time = time) + val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, time = time) /* append 2 compressed message sets, each with two messages giving offsets 0, 1, 2, 3 */ log.append(MemoryRecords.withRecords(CompressionType.GZIP, new SimpleRecord("hello".getBytes), new SimpleRecord("there".getBytes))) @@ -421,7 +634,7 @@ class LogTest extends JUnitSuite { val logProps = new Properties() logProps.put(LogConfig.SegmentBytesProp, 100: java.lang.Integer) logProps.put(LogConfig.RetentionMsProp, 0: java.lang.Integer) - val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, time.scheduler, time = time) + val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, time = time) for(i <- 0 until messagesToAppend) log.append(TestUtils.singletonRecords(value = i.toString.getBytes, timestamp = time.milliseconds - 10)) @@ -457,7 +670,7 @@ class LogTest extends JUnitSuite { logProps.put(LogConfig.SegmentBytesProp, configSegmentSize: java.lang.Integer) // We use need to use magic value 1 here because the test is message size sensitive. logProps.put(LogConfig.MessageFormatVersionProp, ApiVersion.latestVersion.toString) - val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, time.scheduler, time = time) + val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, time = time) try { log.append(messageSet) @@ -484,7 +697,7 @@ class LogTest extends JUnitSuite { val logProps = new Properties() logProps.put(LogConfig.CleanupPolicyProp, LogConfig.Compact) - val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, time.scheduler, time) + val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, time = time) try { log.append(messageSetWithUnkeyedMessage) @@ -526,7 +739,7 @@ class LogTest extends JUnitSuite { val maxMessageSize = second.sizeInBytes - 1 val logProps = new Properties() logProps.put(LogConfig.MaxMessageBytesProp, maxMessageSize: java.lang.Integer) - val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, time.scheduler, time = time) + val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, time = time) // should be able to append the small message log.append(first) @@ -552,7 +765,7 @@ class LogTest extends JUnitSuite { logProps.put(LogConfig.IndexIntervalBytesProp, indexInterval: java.lang.Integer) logProps.put(LogConfig.SegmentIndexBytesProp, 4096: java.lang.Integer) val config = LogConfig(logProps) - var log = new Log(logDir, config, logStartOffset = 0L, recoveryPoint = 0L, time.scheduler, time) + var log = new Log(logDir, config, logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, time = time) for(i <- 0 until numMessages) log.append(TestUtils.singletonRecords(value = TestUtils.randomBytes(messageSize), timestamp = time.milliseconds + i * 10)) @@ -578,12 +791,12 @@ class LogTest extends JUnitSuite { assertEquals("Should have same number of time index entries as before.", numTimeIndexEntries, log.activeSegment.timeIndex.entries) } - log = new Log(logDir, config, logStartOffset = 0L, recoveryPoint = lastOffset, time.scheduler, time) + log = new Log(logDir, config, logStartOffset = 0L, recoveryPoint = lastOffset, scheduler = time.scheduler, time = time) verifyRecoveredLog(log) log.close() // test recovery case - log = new Log(logDir, config, logStartOffset = 0L, recoveryPoint = 0L, time.scheduler, time) + log = new Log(logDir, config, logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, time = time) verifyRecoveredLog(log) log.close() } @@ -599,7 +812,7 @@ class LogTest extends JUnitSuite { logProps.put(LogConfig.IndexIntervalBytesProp, 1: java.lang.Integer) val config = LogConfig(logProps) - val log = new Log(logDir, config, logStartOffset = 0L, recoveryPoint = 0L, time.scheduler, time) + val log = new Log(logDir, config, logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, time = time) val messages = (0 until numMessages).map { i => MemoryRecords.withRecords(100 + i, CompressionType.NONE, new SimpleRecord(time.milliseconds + i, i.toString.getBytes())) @@ -623,7 +836,7 @@ class LogTest extends JUnitSuite { logProps.put(LogConfig.IndexIntervalBytesProp, 1: java.lang.Integer) val config = LogConfig(logProps) - var log = new Log(logDir, config, logStartOffset = 0L, recoveryPoint = 0L, time.scheduler, time) + var log = new Log(logDir, config, logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, time = time) for(i <- 0 until numMessages) log.append(TestUtils.singletonRecords(value = TestUtils.randomBytes(10), timestamp = time.milliseconds + i * 10)) val indexFiles = log.logSegments.map(_.index.file) @@ -635,7 +848,7 @@ class LogTest extends JUnitSuite { timeIndexFiles.foreach(_.delete()) // reopen the log - log = new Log(logDir, config, logStartOffset = 0L, recoveryPoint = 0L, time.scheduler, time) + log = new Log(logDir, config, logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, time = time) assertEquals("Should have %d messages when log is reopened".format(numMessages), numMessages, log.logEndOffset) assertTrue("The index should have been rebuilt", log.logSegments.head.index.entries > 0) assertTrue("The time index should have been rebuilt", log.logSegments.head.timeIndex.entries > 0) @@ -662,7 +875,7 @@ class LogTest extends JUnitSuite { logProps.put(LogConfig.MessageFormatVersionProp, "0.9.0") val config = LogConfig(logProps) - var log = new Log(logDir, config, logStartOffset = 0L, recoveryPoint = 0L, time.scheduler, time) + var log = new Log(logDir, config, logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, time = time) for(i <- 0 until numMessages) log.append(TestUtils.singletonRecords(value = TestUtils.randomBytes(10), timestamp = time.milliseconds + i * 10)) val timeIndexFiles = log.logSegments.map(_.timeIndex.file) @@ -672,7 +885,7 @@ class LogTest extends JUnitSuite { timeIndexFiles.foreach(_.delete()) // The rebuilt time index should be empty - log = new Log(logDir, config, logStartOffset = 0L, recoveryPoint = numMessages + 1, time.scheduler, time) + log = new Log(logDir, config, logStartOffset = 0L, recoveryPoint = numMessages + 1, scheduler = time.scheduler, time = time) val segArray = log.logSegments.toArray for (i <- 0 until segArray.size - 1) { assertEquals("The time index should be empty", 0, segArray(i).timeIndex.entries) @@ -693,7 +906,7 @@ class LogTest extends JUnitSuite { logProps.put(LogConfig.IndexIntervalBytesProp, 1: java.lang.Integer) val config = LogConfig(logProps) - var log = new Log(logDir, config, logStartOffset = 0L, recoveryPoint = 0L, time.scheduler, time) + var log = new Log(logDir, config, logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, time = time) for(i <- 0 until numMessages) log.append(TestUtils.singletonRecords(value = TestUtils.randomBytes(10), timestamp = time.milliseconds + i * 10)) val indexFiles = log.logSegments.map(_.index.file) @@ -715,7 +928,7 @@ class LogTest extends JUnitSuite { } // reopen the log - log = new Log(logDir, config, logStartOffset = 0L, recoveryPoint = 200L, time.scheduler, time) + log = new Log(logDir, config, logStartOffset = 0L, recoveryPoint = 200L, scheduler = time.scheduler, time = time) assertEquals("Should have %d messages when log is reopened".format(numMessages), numMessages, log.logEndOffset) for(i <- 0 until numMessages) { assertEquals(i, log.read(i, 100, None).records.batches.iterator.next().lastOffset) @@ -842,8 +1055,8 @@ class LogTest extends JUnitSuite { LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, - time.scheduler, - time) + scheduler = time.scheduler, + time = time) assertTrue("The first index file should have been replaced with a larger file", bogusIndex1.length > 0) assertTrue("The first time index file should have been replaced with a larger file", bogusTimeIndex1.length > 0) @@ -874,8 +1087,8 @@ class LogTest extends JUnitSuite { config, logStartOffset = 0L, recoveryPoint = 0L, - time.scheduler, - time) + scheduler = time.scheduler, + time = time) // add enough messages to roll over several segments then close and re-open and attempt to truncate for (_ <- 0 until 100) @@ -885,8 +1098,8 @@ class LogTest extends JUnitSuite { config, logStartOffset = 0L, recoveryPoint = 0L, - time.scheduler, - time) + scheduler = time.scheduler, + time = time) log.truncateTo(3) assertEquals("All but one segment should be deleted.", 1, log.numberOfSegments) assertEquals("Log end offset should be 3.", 3, log.logEndOffset) @@ -911,8 +1124,8 @@ class LogTest extends JUnitSuite { config, logStartOffset = 0L, recoveryPoint = 0L, - time.scheduler, - time) + scheduler = time.scheduler, + time = time) // append some messages to create some segments for (_ <- 0 until 100) @@ -952,8 +1165,8 @@ class LogTest extends JUnitSuite { config, logStartOffset = 0L, recoveryPoint = 0L, - time.scheduler, - time) + scheduler = time.scheduler, + time = time) // append some messages to create some segments for (_ <- 0 until 100) @@ -967,8 +1180,8 @@ class LogTest extends JUnitSuite { config, logStartOffset = 0L, recoveryPoint = 0L, - time.scheduler, - time) + scheduler = time.scheduler, + time = time) assertEquals("The deleted segments should be gone.", 1, log.numberOfSegments) } @@ -978,8 +1191,8 @@ class LogTest extends JUnitSuite { LogConfig(), logStartOffset = 0L, recoveryPoint = 0L, - time.scheduler, - time) + scheduler = time.scheduler, + time = time) log.append(TestUtils.singletonRecords(value = null)) val head = log.read(0, 4096, None).records.records.iterator.next() assertEquals(0, head.offset) @@ -992,8 +1205,8 @@ class LogTest extends JUnitSuite { LogConfig(), logStartOffset = 0L, recoveryPoint = 0L, - time.scheduler, - time) + scheduler = time.scheduler, + time = time) val records = (0 until 2).map(id => new SimpleRecord(id.toString.getBytes)).toArray records.foreach(record => log.append(MemoryRecords.withRecords(CompressionType.NONE, record))) val invalidRecord = MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord(1.toString.getBytes)) @@ -1006,8 +1219,8 @@ class LogTest extends JUnitSuite { LogConfig(), logStartOffset = 0L, recoveryPoint = 0L, - time.scheduler, - time) + scheduler = time.scheduler, + time = time) log.append(MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord(RecordBatch.NO_TIMESTAMP, "key".getBytes, "value".getBytes))) } @@ -1029,8 +1242,8 @@ class LogTest extends JUnitSuite { config, logStartOffset = 0L, recoveryPoint = 0L, - time.scheduler, - time) + scheduler = time.scheduler, + time = time) val numMessages = 50 + TestUtils.random.nextInt(50) for (_ <- 0 until numMessages) log.append(set) @@ -1072,8 +1285,8 @@ class LogTest extends JUnitSuite { config, logStartOffset = 0L, recoveryPoint = 0L, - time.scheduler, - time) + scheduler = time.scheduler, + time = time) val set1 = MemoryRecords.withRecords(0, CompressionType.NONE, new SimpleRecord("v1".getBytes(), "k1".getBytes())) val set2 = MemoryRecords.withRecords(Integer.MAX_VALUE.toLong + 2, CompressionType.NONE, new SimpleRecord("v3".getBytes(), "k3".getBytes())) val set3 = MemoryRecords.withRecords(Integer.MAX_VALUE.toLong + 3, CompressionType.NONE, new SimpleRecord("v4".getBytes(), "k4".getBytes())) @@ -1121,8 +1334,8 @@ class LogTest extends JUnitSuite { config, logStartOffset = 0L, recoveryPoint = 0L, - time.scheduler, - time) + scheduler = time.scheduler, + time = time) for (_ <- 0 until 100) log.append(set) log.close() @@ -1221,8 +1434,8 @@ class LogTest extends JUnitSuite { config, logStartOffset = 0L, recoveryPoint = 0L, - time.scheduler, - time) + scheduler = time.scheduler, + time = time) // append some messages to create some segments for (_ <- 0 until 100) @@ -1366,8 +1579,8 @@ class LogTest extends JUnitSuite { config, logStartOffset = 0L, recoveryPoint = 0L, - time.scheduler, - time) + scheduler = time.scheduler, + time = time) log } } http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/core/src/test/scala/unit/kafka/log/ProducerIdMappingTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/log/ProducerIdMappingTest.scala b/core/src/test/scala/unit/kafka/log/ProducerIdMappingTest.scala new file mode 100644 index 0000000..d27f0ca --- /dev/null +++ b/core/src/test/scala/unit/kafka/log/ProducerIdMappingTest.scala @@ -0,0 +1,224 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.log + +import java.io.File +import java.util.Properties + +import kafka.utils.TestUtils +import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.errors.{DuplicateSequenceNumberException, OutOfOrderSequenceException, ProducerFencedException} +import org.apache.kafka.common.utils.MockTime +import org.junit.Assert._ +import org.junit.{After, Before, Test} +import org.scalatest.junit.JUnitSuite + +class ProducerIdMappingTest extends JUnitSuite { + var idMappingDir: File = null + var config: LogConfig = null + var idMapping: ProducerIdMapping = null + val partition = new TopicPartition("test", 0) + val pid = 1L + val maxPidExpirationMs = 60 * 1000 + val time = new MockTime + + @Before + def setUp(): Unit = { + // Create configuration including number of snapshots to hold + val props = new Properties() + config = LogConfig(props) + + // Create temporary directory + idMappingDir = TestUtils.tempDir() + + // Instantiate IdMapping + idMapping = new ProducerIdMapping(config, partition, idMappingDir, maxPidExpirationMs) + } + + @After + def tearDown(): Unit = { + idMappingDir.listFiles().foreach(f => f.delete()) + idMappingDir.deleteOnExit() + } + + @Test + def testBasicIdMapping(): Unit = { + val epoch = 0.toShort + + // First entry for id 0 added + checkAndUpdate(idMapping, pid, 0, epoch, 0L, 0L) + + // Second entry for id 0 added + checkAndUpdate(idMapping, pid, 1, epoch, 0L, 1L) + + // Duplicate sequence number (matches previous sequence number) + assertThrows[DuplicateSequenceNumberException] { + checkAndUpdate(idMapping, pid, 1, epoch, 0L, 1L) + } + + // Invalid sequence number (greater than next expected sequence number) + assertThrows[OutOfOrderSequenceException] { + checkAndUpdate(idMapping, pid, 5, epoch, 0L, 2L) + } + + // Change epoch + checkAndUpdate(idMapping, pid, 0, (epoch + 1).toShort, 0L, 3L) + + // Incorrect epoch + assertThrows[ProducerFencedException] { + checkAndUpdate(idMapping, pid, 0, epoch, 0L, 4L) + } + } + + @Test + def testTakeSnapshot(): Unit = { + val epoch = 0.toShort + checkAndUpdate(idMapping, pid, 0, epoch, 0L, 0L) + checkAndUpdate(idMapping, pid, 1, epoch, 1L, 1L) + + // Take snapshot + idMapping.maybeTakeSnapshot() + + // Check that file exists and it is not empty + assertEquals("Directory doesn't contain a single file as expected", 1, idMappingDir.list().length) + assertTrue("Snapshot file is empty", idMappingDir.list().head.length > 0) + } + + @Test + def testRecoverFromSnapshot(): Unit = { + val epoch = 0.toShort + checkAndUpdate(idMapping, pid, 0, epoch, 0L, time.milliseconds) + checkAndUpdate(idMapping, pid, 1, epoch, 1L, time.milliseconds) + idMapping.maybeTakeSnapshot() + val recoveredMapping = new ProducerIdMapping(config, partition, idMappingDir, maxPidExpirationMs) + recoveredMapping.truncateAndReload(3L, time.milliseconds) + + // entry added after recovery + checkAndUpdate(recoveredMapping, pid, 2, epoch, 2L, time.milliseconds) + } + + @Test(expected = classOf[OutOfOrderSequenceException]) + def testRemoveExpiredPidsOnReload(): Unit = { + val epoch = 0.toShort + checkAndUpdate(idMapping, pid, 0, epoch, 0L, 0) + checkAndUpdate(idMapping, pid, 1, epoch, 1L, 1) + + idMapping.maybeTakeSnapshot() + val recoveredMapping = new ProducerIdMapping(config, partition, idMappingDir, maxPidExpirationMs) + recoveredMapping.truncateAndReload(1L, 70000) + + // entry added after recovery. The pid should be expired now, and would not exist in the pid mapping. Hence + // we should get an out of order sequence exception. + checkAndUpdate(recoveredMapping, pid, 2, epoch, 2L, 70001) + } + + + @Test + def testRemoveOldSnapshot(): Unit = { + val epoch = 0.toShort + checkAndUpdate(idMapping, pid, 0, epoch, 0L, 1L) + checkAndUpdate(idMapping, pid, 1, epoch, 1L, 2L) + + idMapping.maybeTakeSnapshot() + + checkAndUpdate(idMapping, pid, 2, epoch, 2L, 3L) + + idMapping.maybeTakeSnapshot() + + assertEquals(s"number of snapshot files is incorrect: ${idMappingDir.listFiles().length}", + 1, idMappingDir.listFiles().length) + } + + @Test + def testSkipSnapshotIfOffsetUnchanged(): Unit = { + val epoch = 0.toShort + checkAndUpdate(idMapping, pid, 0, epoch, 0L, 0L) + + idMapping.maybeTakeSnapshot() + + // nothing changed so there should be no new snapshot + idMapping.maybeTakeSnapshot() + + assertEquals(s"number of snapshot files is incorrect: ${idMappingDir.listFiles().length}", + 1, idMappingDir.listFiles().length) + } + + @Test + def testStartOffset(): Unit = { + val epoch = 0.toShort + val pid2 = 2L + checkAndUpdate(idMapping, pid2, 0, epoch, 0L, 1L) + checkAndUpdate(idMapping, pid, 0, epoch, 1L, 2L) + checkAndUpdate(idMapping, pid, 1, epoch, 2L, 3L) + checkAndUpdate(idMapping, pid, 2, epoch, 3L, 4L) + idMapping.maybeTakeSnapshot() + + intercept[OutOfOrderSequenceException] { + val recoveredMapping = new ProducerIdMapping(config, partition, idMappingDir, maxPidExpirationMs) + recoveredMapping.truncateAndReload(1L, time.milliseconds) + checkAndUpdate(recoveredMapping, pid2, 1, epoch, 4L, 5L) + } + } + + @Test(expected = classOf[OutOfOrderSequenceException]) + def testPidExpirationTimeout() { + val epoch = 5.toShort + val sequence = 37 + checkAndUpdate(idMapping, pid, sequence, epoch, 1L) + time.sleep(maxPidExpirationMs + 1) + idMapping.checkForExpiredPids(time.milliseconds) + checkAndUpdate(idMapping, pid, sequence + 1, epoch, 1L) + } + + @Test + def testLoadPid() { + val epoch = 5.toShort + val sequence = 37 + val createTimeMs = time.milliseconds + idMapping.load(pid, ProducerIdEntry(epoch, sequence, 0L, 1, createTimeMs), time.milliseconds) + checkAndUpdate(idMapping, pid, sequence + 1, epoch, 2L) + } + + @Test(expected = classOf[OutOfOrderSequenceException]) + def testLoadIgnoresExpiredPids() { + val epoch = 5.toShort + val sequence = 37 + + val createTimeMs = time.milliseconds + time.sleep(maxPidExpirationMs + 1) + val loadTimeMs = time.milliseconds + idMapping.load(pid, ProducerIdEntry(epoch, sequence, 0L, 1, createTimeMs), loadTimeMs) + + // entry wasn't loaded, so this should fail + checkAndUpdate(idMapping, pid, sequence + 1, epoch, 2L) + } + + private def checkAndUpdate(mapping: ProducerIdMapping, + pid: Long, + seq: Int, + epoch: Short, + lastOffset: Long, + timestamp: Long = time.milliseconds()): Unit = { + val numRecords = 1 + val incomingPidEntry = ProducerIdEntry(epoch, seq, lastOffset, numRecords, timestamp) + val producerAppendInfo = new ProducerAppendInfo(pid, mapping.lastEntry(pid).getOrElse(ProducerIdEntry.Empty)) + producerAppendInfo.append(incomingPidEntry) + mapping.update(producerAppendInfo) + } + +} http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala b/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala index b6e3607..f868032 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala @@ -201,6 +201,7 @@ class ReplicationQuotasTest extends ZooKeeperTestHarness { val start = System.currentTimeMillis() //Start the new broker (and hence start replicating) + debug("Starting new broker") brokers = brokers :+ createServer(fromProps(createBrokerConfig(101, zkConnect))) waitForOffsetsToMatch(msgCount, 0, 101) http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/core/src/test/scala/unit/kafka/utils/TestUtils.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index 9ae7195..8766855 100755 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -937,9 +937,10 @@ object TestUtils extends Logging { flushRecoveryOffsetCheckpointMs = 10000L, flushStartOffsetCheckpointMs = 10000L, retentionCheckMs = 1000L, + maxPidExpirationMs = 60 * 60 * 1000, scheduler = time.scheduler, time = time, - brokerState = new BrokerState()) + brokerState = BrokerState()) } @deprecated("This method has been deprecated and it will be removed in a future release.", "0.10.0.0") http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/core/src/test/scala/unit/kafka/utils/ZkUtilsTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/utils/ZkUtilsTest.scala b/core/src/test/scala/unit/kafka/utils/ZkUtilsTest.scala index 918c4b5..db62020 100755 --- a/core/src/test/scala/unit/kafka/utils/ZkUtilsTest.scala +++ b/core/src/test/scala/unit/kafka/utils/ZkUtilsTest.scala @@ -40,6 +40,21 @@ class ZkUtilsTest extends ZooKeeperTestHarness { assertTrue("Deletion should be successful", zkUtils.conditionalDeletePath(path, 0)) } + // Verify behaviour of ZkUtils.createSequentialPersistentPath since PIDManager relies on it + @Test + def testPersistentSequentialPath() { + // Given an existing path + zkUtils.createPersistentPath(path) + + var result = zkUtils.createSequentialPersistentPath(path + "/sequence_") + + assertEquals("/path/sequence_0000000000", result) + + result = zkUtils.createSequentialPersistentPath(path + "/sequence_") + + assertEquals("/path/sequence_0000000001", result) + } + @Test def testAbortedConditionalDeletePath() { // Given an existing path that gets updated http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/tests/kafkatest/services/verifiable_producer.py ---------------------------------------------------------------------- diff --git a/tests/kafkatest/services/verifiable_producer.py b/tests/kafkatest/services/verifiable_producer.py index 859e3c4..3cf3abd 100644 --- a/tests/kafkatest/services/verifiable_producer.py +++ b/tests/kafkatest/services/verifiable_producer.py @@ -15,25 +15,22 @@ import json import os -import signal import time from ducktape.services.background_thread import BackgroundThreadService from ducktape.cluster.remoteaccount import RemoteCommandError -from ducktape.utils.util import wait_until from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin from kafkatest.services.verifiable_client import VerifiableClientMixin from kafkatest.utils import is_int, is_int_with_prefix from kafkatest.version import DEV_BRANCH -from kafkatest.utils.remote_account import line_count class VerifiableProducer(KafkaPathResolverMixin, VerifiableClientMixin, BackgroundThreadService): """This service wraps org.apache.kafka.tools.VerifiableProducer for use in - system testing. + system testing. NOTE: this class should be treated as a PUBLIC API. Downstream users use - this service both directly and through class extension, so care must be + this service both directly and through class extension, so care must be taken to ensure compatibility. """ @@ -59,7 +56,7 @@ class VerifiableProducer(KafkaPathResolverMixin, VerifiableClientMixin, Backgrou def __init__(self, context, num_nodes, kafka, topic, max_messages=-1, throughput=100000, message_validator=is_int, compression_types=None, version=DEV_BRANCH, acks=None, - stop_timeout_sec=150, request_timeout_sec=30, log_level="INFO"): + stop_timeout_sec=150, request_timeout_sec=30, log_level="INFO", enable_idempotence=False): """ :param max_messages is a number of messages to be produced per producer :param message_validator checks for an expected format of messages produced. There are @@ -92,6 +89,7 @@ class VerifiableProducer(KafkaPathResolverMixin, VerifiableClientMixin, Backgrou self.acks = acks self.stop_timeout_sec = stop_timeout_sec self.request_timeout_sec = request_timeout_sec + self.enable_idempotence = enable_idempotence def java_class_name(self): return "VerifiableProducer" @@ -124,6 +122,12 @@ class VerifiableProducer(KafkaPathResolverMixin, VerifiableClientMixin, Backgrou producer_prop_file += "\nacks=%s\n" % self.acks producer_prop_file += "\nrequest.timeout.ms=%d\n" % (self.request_timeout_sec * 1000) + if self.enable_idempotence: + self.logger.info("Setting up an idempotent producer") + producer_prop_file += "\nmax.in.flight.requests.per.connection=1\n" + producer_prop_file += "\nretries=50\n" + producer_prop_file += "\nenable.idempotence=true\n" + self.logger.info("verifiable_producer.properties:") self.logger.info(producer_prop_file) node.account.create_file(VerifiableProducer.CONFIG_FILE, producer_prop_file) http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/tests/kafkatest/tests/core/replication_test.py ---------------------------------------------------------------------- diff --git a/tests/kafkatest/tests/core/replication_test.py b/tests/kafkatest/tests/core/replication_test.py index 3e17d56..5d96d7b 100644 --- a/tests/kafkatest/tests/core/replication_test.py +++ b/tests/kafkatest/tests/core/replication_test.py @@ -104,11 +104,12 @@ class ReplicationTest(ProduceConsumeValidateTest): self.topic = "test_topic" self.zk = ZookeeperService(test_context, num_nodes=1) - self.kafka = KafkaService(test_context, num_nodes=3, zk=self.zk, topics={self.topic: { - "partitions": 3, - "replication-factor": 3, - 'configs': {"min.insync.replicas": 2}} - }) + self.kafka = KafkaService(test_context, num_nodes=3, zk=self.zk, + topics={self.topic: { + "partitions": 3, + "replication-factor": 3, + 'configs': {"min.insync.replicas": 2}} + }) self.producer_throughput = 1000 self.num_producers = 1 self.num_consumers = 1 @@ -123,6 +124,10 @@ class ReplicationTest(ProduceConsumeValidateTest): @cluster(num_nodes=7) @matrix(failure_mode=["clean_shutdown", "hard_shutdown", "clean_bounce", "hard_bounce"], broker_type=["leader"], + security_protocol=["PLAINTEXT"], + enable_idempotence=[True]) + @matrix(failure_mode=["clean_shutdown", "hard_shutdown", "clean_bounce", "hard_bounce"], + broker_type=["leader"], security_protocol=["PLAINTEXT", "SASL_SSL"]) @matrix(failure_mode=["clean_shutdown", "hard_shutdown", "clean_bounce", "hard_bounce"], broker_type=["controller"], @@ -133,7 +138,10 @@ class ReplicationTest(ProduceConsumeValidateTest): @parametrize(failure_mode="hard_bounce", broker_type="leader", security_protocol="SASL_SSL", client_sasl_mechanism="SCRAM-SHA-256", interbroker_sasl_mechanism="SCRAM-SHA-512") - def test_replication_with_broker_failure(self, failure_mode, security_protocol, broker_type, client_sasl_mechanism="GSSAPI", interbroker_sasl_mechanism="GSSAPI"): + def test_replication_with_broker_failure(self, failure_mode, security_protocol, + broker_type, client_sasl_mechanism="GSSAPI", + interbroker_sasl_mechanism="GSSAPI", + enable_idempotence=False): """Replication tests. These tests verify that replication provides simple durability guarantees by checking that data acked by brokers is still available for consumption in the face of various failure scenarios. @@ -152,8 +160,8 @@ class ReplicationTest(ProduceConsumeValidateTest): self.kafka.client_sasl_mechanism = client_sasl_mechanism self.kafka.interbroker_sasl_mechanism = interbroker_sasl_mechanism new_consumer = False if self.kafka.security_protocol == "PLAINTEXT" else True - self.producer = VerifiableProducer(self.test_context, self.num_producers, self.kafka, self.topic, throughput=self.producer_throughput) + self.enable_idempotence = enable_idempotence + self.producer = VerifiableProducer(self.test_context, self.num_producers, self.kafka, self.topic, throughput=self.producer_throughput, enable_idempotence=enable_idempotence) self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, self.kafka, self.topic, new_consumer=new_consumer, consumer_timeout_ms=60000, message_validator=is_int) self.kafka.start() - self.run_produce_consume_validate(core_test_action=lambda: failures[failure_mode](self, broker_type)) http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/tests/kafkatest/tests/produce_consume_validate.py ---------------------------------------------------------------------- diff --git a/tests/kafkatest/tests/produce_consume_validate.py b/tests/kafkatest/tests/produce_consume_validate.py index cad9150..079305c 100644 --- a/tests/kafkatest/tests/produce_consume_validate.py +++ b/tests/kafkatest/tests/produce_consume_validate.py @@ -42,6 +42,7 @@ class ProduceConsumeValidateTest(Test): # producer begins producing messages, in which case we will miss the # initial set of messages and get spurious test failures. self.consumer_init_timeout_sec = 0 + self.enable_idempotence = False def setup_producer_and_consumer(self): raise NotImplementedError("Subclasses should implement this") @@ -67,7 +68,7 @@ class ProduceConsumeValidateTest(Test): remaining_time = self.consumer_init_timeout_sec - (end - start) if remaining_time < 0 : remaining_time = 0 - if self.consumer.new_consumer is True: + if self.consumer.new_consumer: wait_until(lambda: self.consumer.has_partitions_assigned(self.consumer.nodes[0]) is True, timeout_sec=remaining_time, err_msg="Consumer process took more than %d s to have partitions assigned" %\ @@ -167,9 +168,17 @@ class ProduceConsumeValidateTest(Test): msg = self.annotate_data_lost(data_lost, msg, len(to_validate)) + if self.enable_idempotence: + self.logger.info("Ran a test with idempotence enabled. We expect no duplicates") + else: + self.logger.info("Ran a test with idempotence disabled.") + # Are there duplicates? if len(set(consumed)) != len(consumed): - msg += "(There are also %s duplicate messages in the log - but that is an acceptable outcome)\n" % abs(len(set(consumed)) - len(consumed)) + num_duplicates = abs(len(set(consumed)) - len(consumed)) + msg += "(There are also %s duplicate messages in the log - but that is an acceptable outcome)\n" % num_duplicates + if self.enable_idempotence: + assert False, "Detected %s duplicates even though idempotence was enabled." % num_duplicates # Collect all logs if validation fails if not success:
