Repository: kafka
Updated Branches:
  refs/heads/0.11.0 ab554caee -> 3c96f9128


KAFKA-6003; Accept appends on replicas and when rebuilding the log 
unconditionally

This is a port of #4004 for the 0.11.0 branch.

With this patch, we _only_ validate appends which originate
from the client. In general, once the append is validated and written to
the leader the first time, revalidating it is undesirable since we can't
do anything if validation fails, and also because it is hard to maintain
the correct assumptions during validation, leading to spurious
validation failures.

For example, when we have compacted topics, it is possible for batches
to be compacted on the follower but not on the leader. This case would
also lead to an OutOfOrderSequenceException during replication. The same
applies when we rebuild state from compacted topics: we would get
gaps in the sequence numbers, causing the OutOfOrderSequence.

Author: Apurva Mehta <apu...@confluent.io>

Reviewers: Jason Gustafson <ja...@confluent.io>

Closes #4020 from apurvam/KAKFA-6003-0.11.0-handle-unknown-producer-on-replica


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/3c96f912
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/3c96f912
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/3c96f912

Branch: refs/heads/0.11.0
Commit: 3c96f9128af8ddc5d50c64af380827f7a0038287
Parents: ab554ca
Author: Apurva Mehta <apu...@confluent.io>
Authored: Mon Oct 9 10:34:43 2017 -0700
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Mon Oct 9 10:34:43 2017 -0700

----------------------------------------------------------------------
 core/src/main/scala/kafka/log/Log.scala         |  8 +--
 core/src/main/scala/kafka/log/LogSegment.scala  |  2 +-
 .../scala/kafka/log/ProducerStateManager.scala  | 25 ++++----
 .../scala/unit/kafka/log/LogSegmentTest.scala   |  2 +-
 .../src/test/scala/unit/kafka/log/LogTest.scala | 15 +++--
 .../kafka/log/ProducerStateManagerTest.scala    | 62 +++++++++++++++++---
 6 files changed, 84 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/3c96f912/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/Log.scala 
b/core/src/main/scala/kafka/log/Log.scala
index f8b5d82..170e9cb 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -501,7 +501,7 @@ class Log(@volatile var dir: File,
     val completedTxns = ListBuffer.empty[CompletedTxn]
     records.batches.asScala.foreach { batch =>
       if (batch.hasProducerId) {
-        val maybeCompletedTxn = updateProducers(batch, loadedProducers, 
loadingFromLog = true)
+        val maybeCompletedTxn = updateProducers(batch, loadedProducers, 
isFromClient = false)
         maybeCompletedTxn.foreach(completedTxns += _)
       }
     }
@@ -762,7 +762,7 @@ class Log(@volatile var dir: File,
       if (isFromClient && maybeLastEntry.exists(_.isDuplicate(batch)))
         return (updatedProducers, completedTxns.toList, maybeLastEntry)
 
-      val maybeCompletedTxn = updateProducers(batch, updatedProducers, 
loadingFromLog = false)
+      val maybeCompletedTxn = updateProducers(batch, updatedProducers, 
isFromClient = isFromClient)
       maybeCompletedTxn.foreach(completedTxns += _)
     }
     (updatedProducers, completedTxns.toList, None)
@@ -849,9 +849,9 @@ class Log(@volatile var dir: File,
 
   private def updateProducers(batch: RecordBatch,
                               producers: mutable.Map[Long, ProducerAppendInfo],
-                              loadingFromLog: Boolean): Option[CompletedTxn] = 
{
+                              isFromClient: Boolean): Option[CompletedTxn] = {
     val producerId = batch.producerId
-    val appendInfo = producers.getOrElseUpdate(producerId, 
producerStateManager.prepareUpdate(producerId, loadingFromLog))
+    val appendInfo = producers.getOrElseUpdate(producerId, 
producerStateManager.prepareUpdate(producerId, isFromClient))
     appendInfo.append(batch)
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/3c96f912/core/src/main/scala/kafka/log/LogSegment.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogSegment.scala 
b/core/src/main/scala/kafka/log/LogSegment.scala
index 4e0834c..53eafa1 100755
--- a/core/src/main/scala/kafka/log/LogSegment.scala
+++ b/core/src/main/scala/kafka/log/LogSegment.scala
@@ -147,7 +147,7 @@ class LogSegment(val log: FileRecords,
   private def updateProducerState(producerStateManager: ProducerStateManager, 
batch: RecordBatch): Unit = {
     if (batch.hasProducerId) {
       val producerId = batch.producerId
-      val appendInfo = producerStateManager.prepareUpdate(producerId, 
loadingFromLog = true)
+      val appendInfo = producerStateManager.prepareUpdate(producerId, 
isFromClient = false)
       val maybeCompletedTxn = appendInfo.append(batch)
       producerStateManager.update(appendInfo)
       maybeCompletedTxn.foreach { completedTxn =>

http://git-wip-us.apache.org/repos/asf/kafka/blob/3c96f912/core/src/main/scala/kafka/log/ProducerStateManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/ProducerStateManager.scala 
b/core/src/main/scala/kafka/log/ProducerStateManager.scala
index 7a1962a..24530da 100644
--- a/core/src/main/scala/kafka/log/ProducerStateManager.scala
+++ b/core/src/main/scala/kafka/log/ProducerStateManager.scala
@@ -91,16 +91,17 @@ private[log] case class ProducerIdEntry(producerId: Long, 
producerEpoch: Short,
  *                                of this is the consumer offsets topic which 
uses producer ids from incoming
  *                                TxnOffsetCommit, but has no sequence number 
to validate and does not depend
  *                                on the deduplication which sequence numbers 
provide.
- * @param loadingFromLog This parameter indicates whether the new append is 
being loaded directly from the log.
- *                       This is used to repopulate producer state when the 
broker is initialized. The only
- *                       difference in behavior is that we do not validate the 
sequence number of the first append
- *                       since we may have lost previous sequence numbers when 
segments were removed due to log
- *                       retention enforcement.
+ * @param isFromClient The parameter indicates whether the write is coming 
from a client or not. If it is not coming
+ *                     from a client, it could be due to replication traffic, 
or when rebuilding producer state on
+ *                     from the log. In the latter two cases, we should not 
validate the append, but accept the
+ *                     incoming append unconditionally. This is for two 
reasons: first, the write was already
+ *                     validated when received from the client. Second, the 
data is already the log, so it is not
+ *                     clear what would be achieved by validating it again.
  */
 private[log] class ProducerAppendInfo(val producerId: Long,
                                       initialEntry: ProducerIdEntry,
                                       validateSequenceNumbers: Boolean,
-                                      loadingFromLog: Boolean) {
+                                      isFromClient: Boolean) {
   private var producerEpoch = initialEntry.producerEpoch
   private var firstSeq = initialEntry.firstSeq
   private var lastSeq = initialEntry.lastSeq
@@ -108,6 +109,7 @@ private[log] class ProducerAppendInfo(val producerId: Long,
   private var maxTimestamp = initialEntry.timestamp
   private var currentTxnFirstOffset = initialEntry.currentTxnFirstOffset
   private var coordinatorEpoch = initialEntry.coordinatorEpoch
+
   private val transactions = ListBuffer.empty[TxnMetadata]
 
   private def validateAppend(producerEpoch: Short, firstSeq: Int, lastSeq: 
Int) = {
@@ -161,9 +163,10 @@ private[log] class ProducerAppendInfo(val producerId: Long,
              lastTimestamp: Long,
              lastOffset: Long,
              isTransactional: Boolean): Unit = {
-    if (epoch != RecordBatch.NO_PRODUCER_EPOCH && !loadingFromLog)
-      // skip validation if this is the first entry when loading from the log. 
Log retention
-      // will generally have removed the beginning entries from each producer 
id
+    if (isFromClient)
+      // We should only validate appends coming from the client. In 
particular, this means that we don't validate
+      // appends for sequence numbers and epochs when building producer state 
from the log or for writes on a replica.
+      // So validation only happens on the first write from the client to the 
partition leader.
       validateAppend(epoch, firstSeq, lastSeq)
 
     this.producerEpoch = epoch
@@ -507,9 +510,9 @@ class ProducerStateManager(val topicPartition: 
TopicPartition,
     }
   }
 
-  def prepareUpdate(producerId: Long, loadingFromLog: Boolean): 
ProducerAppendInfo =
+  def prepareUpdate(producerId: Long, isFromClient: Boolean): 
ProducerAppendInfo =
     new ProducerAppendInfo(producerId, 
lastEntry(producerId).getOrElse(ProducerIdEntry.Empty), validateSequenceNumbers,
-      loadingFromLog)
+      isFromClient)
 
   /**
    * Update the mapping with the given append information

http://git-wip-us.apache.org/repos/asf/kafka/blob/3c96f912/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala 
b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
index 79fe220..6ef5e19 100644
--- a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
@@ -274,7 +274,7 @@ class LogSegmentTest {
     val segment = createSegment(100)
     val producerEpoch = 0.toShort
     val partitionLeaderEpoch = 15
-    val sequence = 0
+    val sequence = 100
 
     val pid1 = 5L
     val pid2 = 10L

http://git-wip-us.apache.org/repos/asf/kafka/blob/3c96f912/core/src/test/scala/unit/kafka/log/LogTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala 
b/core/src/test/scala/unit/kafka/log/LogTest.scala
index 3c80bff..d2d1d6c 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -891,22 +891,27 @@ class LogTest {
     }
   }
 
-  @Test(expected = classOf[DuplicateSequenceNumberException])
+  @Test
   def testDuplicateAppendToFollower() : Unit = {
     val log = createLog(1024*1024)
     val epoch: Short = 0
     val pid = 1L
     val baseSequence = 0
     val partitionLeaderEpoch = 0
+    // The point of this test is to ensure that validation isn't performed on 
the follower.
     // this is a bit contrived. to trigger the duplicate case for a follower 
append, we have to append
     // a batch with matching sequence numbers, but valid increasing offsets
+    assertEquals(0L, log.logEndOffset)
     log.appendAsFollower(MemoryRecords.withIdempotentRecords(0L, 
CompressionType.NONE, pid, epoch, baseSequence,
       partitionLeaderEpoch, new SimpleRecord("a".getBytes), new 
SimpleRecord("b".getBytes)))
     log.appendAsFollower(MemoryRecords.withIdempotentRecords(2L, 
CompressionType.NONE, pid, epoch, baseSequence,
       partitionLeaderEpoch, new SimpleRecord("a".getBytes), new 
SimpleRecord("b".getBytes)))
+
+    // Ensure that even the duplicate sequences are accepted on the follower.
+    assertEquals(4L, log.logEndOffset)
   }
 
-  @Test(expected = classOf[DuplicateSequenceNumberException])
+  @Test
   def testMultipleProducersWithDuplicatesInSingleAppend() : Unit = {
     val log = createLog(1024*1024)
 
@@ -950,9 +955,11 @@ class LogTest {
 
     val records = MemoryRecords.readableRecords(buffer)
     records.batches.asScala.foreach(_.setPartitionLeaderEpoch(0))
+
+    // Ensure that batches with duplicates are accepted on the follower.
+    assertEquals(0L, log.logEndOffset)
     log.appendAsFollower(records)
-    // Should throw a duplicate sequence exception here.
-    fail("should have thrown a DuplicateSequenceNumberException.")
+    assertEquals(5L, log.logEndOffset)
   }
 
   @Test(expected = classOf[ProducerFencedException])

http://git-wip-us.apache.org/repos/asf/kafka/blob/3c96f912/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala 
b/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
index 9a324aa..3c7c07b 100644
--- a/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
@@ -86,7 +86,7 @@ class ProducerStateManagerTest extends JUnitSuite {
     val epoch = 15.toShort
     val sequence = Int.MaxValue
     val offset = 735L
-    append(stateManager, producerId, epoch, sequence, offset, isLoadingFromLog 
= true)
+    append(stateManager, producerId, epoch, sequence, offset, isFromClient = 
false)
 
     append(stateManager, producerId, epoch, 0, offset + 500)
 
@@ -104,7 +104,7 @@ class ProducerStateManagerTest extends JUnitSuite {
     val epoch = 15.toShort
     val sequence = Int.MaxValue
     val offset = 735L
-    append(stateManager, producerId, epoch, sequence, offset, isLoadingFromLog 
= true)
+    append(stateManager, producerId, epoch, sequence, offset, isFromClient = 
false)
     append(stateManager, producerId, epoch, 1, offset + 500)
   }
 
@@ -113,7 +113,7 @@ class ProducerStateManagerTest extends JUnitSuite {
     val epoch = 5.toShort
     val sequence = 16
     val offset = 735L
-    append(stateManager, producerId, epoch, sequence, offset, isLoadingFromLog 
= true)
+    append(stateManager, producerId, epoch, sequence, offset, isFromClient = 
false)
 
     val maybeLastEntry = stateManager.lastEntry(producerId)
     assertTrue(maybeLastEntry.isDefined)
@@ -159,7 +159,7 @@ class ProducerStateManagerTest extends JUnitSuite {
     val offset = 992342L
     val seq = 0
     val producerAppendInfo = new ProducerAppendInfo(producerId, 
ProducerIdEntry.Empty, validateSequenceNumbers = true,
-      loadingFromLog = false)
+      isFromClient = true)
     producerAppendInfo.append(producerEpoch, seq, seq, time.milliseconds(), 
offset, isTransactional = true)
 
     val logOffsetMetadata = new LogOffsetMetadata(messageOffset = offset, 
segmentBaseOffset = 990000L,
@@ -176,7 +176,7 @@ class ProducerStateManagerTest extends JUnitSuite {
     val offset = 992342L
     val seq = 0
     val producerAppendInfo = new ProducerAppendInfo(producerId, 
ProducerIdEntry.Empty, validateSequenceNumbers = true,
-      loadingFromLog = false)
+      isFromClient = true)
     producerAppendInfo.append(producerEpoch, seq, seq, time.milliseconds(), 
offset, isTransactional = true)
 
     // use some other offset to simulate a follower append where the log 
offset metadata won't typically
@@ -196,7 +196,7 @@ class ProducerStateManagerTest extends JUnitSuite {
     val offset = 9L
     append(stateManager, producerId, producerEpoch, 0, offset)
 
-    val appendInfo = stateManager.prepareUpdate(producerId, loadingFromLog = 
false)
+    val appendInfo = stateManager.prepareUpdate(producerId, isFromClient = 
true)
     appendInfo.append(producerEpoch, 1, 5, time.milliseconds(), 20L, 
isTransactional = true)
     var lastEntry = appendInfo.lastEntry
     assertEquals(producerEpoch, lastEntry.producerEpoch)
@@ -319,6 +319,50 @@ class ProducerStateManagerTest extends JUnitSuite {
   }
 
   @Test
+  def testAcceptAppendWithoutProducerStateOnReplica(): Unit = {
+    val epoch = 0.toShort
+    append(stateManager, producerId, epoch, 0, 0L, 0)
+    append(stateManager, producerId, epoch, 1, 1L, 1)
+
+    stateManager.takeSnapshot()
+    val recoveredMapping = new ProducerStateManager(partition, logDir, 
maxPidExpirationMs)
+    recoveredMapping.truncateAndReload(0L, 1L, 70000)
+
+    val sequence = 2
+    // entry added after recovery. The pid should be expired now, and would 
not exist in the pid mapping. Nonetheless
+    // the append on a replica should be accepted with the local producer 
state updated to the appended value.
+    assertFalse(recoveredMapping.activeProducers.contains(producerId))
+    append(recoveredMapping, producerId, epoch, sequence, 2L, 70001, 
isFromClient = false)
+    assertTrue(recoveredMapping.activeProducers.contains(producerId))
+    val producerIdEntry = recoveredMapping.activeProducers.get(producerId).head
+    assertEquals(epoch, producerIdEntry.producerEpoch)
+    assertEquals(sequence, producerIdEntry.firstSeq)
+    assertEquals(sequence, producerIdEntry.lastSeq)
+  }
+
+  @Test
+  def testAcceptAppendWithSequenceGapsOnReplica(): Unit = {
+    val epoch = 0.toShort
+    append(stateManager, producerId, epoch, 0, 0L, 0)
+    val outOfOrderSequence = 3
+
+    // First we ensure that we raise an OutOfOrderSequenceException is raised 
when the append comes from a client.
+    try {
+      append(stateManager, producerId, epoch, outOfOrderSequence, 1L, 1, 
isFromClient = true)
+      fail("Expected an OutOfOrderSequenceException to be raised.")
+    } catch {
+      case _ : OutOfOrderSequenceException =>
+      // Good!
+      case _ : Exception =>
+        fail("Expected an OutOfOrderSequenceException to be raised.")
+    }
+
+    assertEquals(0L, stateManager.activeProducers(producerId).lastSeq)
+    append(stateManager, producerId, epoch, outOfOrderSequence, 1L, 1, 
isFromClient = false)
+    assertEquals(outOfOrderSequence, 
stateManager.activeProducers(producerId).lastSeq)
+  }
+
+  @Test
   def testDeleteSnapshotsBefore(): Unit = {
     val epoch = 0.toShort
     append(stateManager, producerId, epoch, 0, 0L)
@@ -673,7 +717,7 @@ class ProducerStateManagerTest extends JUnitSuite {
                                  offset: Long,
                                  coordinatorEpoch: Int = 0,
                                  timestamp: Long = time.milliseconds()): 
(CompletedTxn, Long) = {
-    val producerAppendInfo = stateManager.prepareUpdate(producerId, 
loadingFromLog = false)
+    val producerAppendInfo = stateManager.prepareUpdate(producerId, 
isFromClient = true)
     val endTxnMarker = new EndTransactionMarker(controlType, coordinatorEpoch)
     val completedTxn = producerAppendInfo.appendEndTxnMarker(endTxnMarker, 
producerEpoch, offset, timestamp)
     mapping.update(producerAppendInfo)
@@ -689,8 +733,8 @@ class ProducerStateManagerTest extends JUnitSuite {
                      offset: Long,
                      timestamp: Long = time.milliseconds(),
                      isTransactional: Boolean = false,
-                     isLoadingFromLog: Boolean = false): Unit = {
-    val producerAppendInfo = stateManager.prepareUpdate(producerId, 
isLoadingFromLog)
+                     isFromClient : Boolean = true): Unit = {
+    val producerAppendInfo = stateManager.prepareUpdate(producerId, 
isFromClient)
     producerAppendInfo.append(producerEpoch, seq, seq, timestamp, offset, 
isTransactional)
     stateManager.update(producerAppendInfo)
     stateManager.updateMapEndOffset(offset + 1)

Reply via email to